协议配置
ActiveMQ 支持的协议有 TCP 、 UDP、NIO、SSL、HTTP(S) 、VM 这是activemq 的activemq.xml 中配置文件设置协议的地方
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumCon nections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnect ions=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConn ections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnect ions=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnection s=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
默认是使用 openwire 也就是 tcp 连接
默认的Broker 配置,TCP 的Client 监听端口 61616 ,在网络上传输数据,必须序列化数据,消息是通过一个 write protocol 来序列化为字节流。默认情况 ActiveMQ 会把 wire protocol 叫做 Open Wire ,它的目的是促使网络上的效率和数据快速交互
- NIO 协议为ActiveMQ 提供更好的性能
适合NIO 使用的场景:
1 当有大量的Client 连接到Broker 上 , 使用NIO 比使用 tcp 需要更少的线程数量,所以使用 NIO
2 可能对于 Broker 有一个很迟钝的网络传输, NIO 的性能高于 TCP
连接形式:
nio://hostname:port?key=value
https://activemq.apache.org/configuring-version-5-transports.html
修改 activemq.xml 使之支持 NIO 协议:
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://0.0.0.0:5671?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
ActiveMQ 的可持久化
将MQ 收到的消息存储到文件、硬盘、数据库 等、 则叫MQ 的持久化,这样即使服务器宕机,消息在本地还是有,仍就可以访问到。
官网 : http://activemq.apache.org/persistence
ActiveMQ 支持的消息持久化机制: 带赋值功能的 LeavelDB 、 KahaDB 、 AMQ 、 JDBC
持久化就是高可用的机制,即使服务器宕机了,消息也不会丢失
AMQ 是文件存储形式,写入快、易恢复 默认 32M 在 ActiveMQ 5.3 之后不再适用
KahaDB : 5.4 之后基于日志文件的持久化插件,默认持久化插件,提高了性能和恢复能力
KahaDB 的属性配置 : http://activemq.apache.org/kahadb
它使用一个事务日志和 索引文件来存储所有的地址
db-<数字>.log 存储数据,一个存满会再次创建 db-2 db-3 …… ,当不会有引用到数据文件的内容时,文件会被删除或归档
db.data 是一个BTree 索引,索引了消息数据记录的消息,是消息索引文件,它作为索引指向了 db-.log 里的消息
一点题外话:就像mysql 数据库,新建一张表,就有这个表对应的 .MYD 文件,作为它的数据文件,就有一个 .MYI 作为索引文件。
db.free 存储空闲页 ID 有时会被清除
db.redo 当 KahaDB 消息存储在强制退出后启动,用于恢复 BTree 索引
lock 顾名思义就是锁
四类文件+一把锁 ==》 KahaDB
LeavelDB : 希望作为以后的存储引擎,5.8 以后引进,也是基于文件的本地数据存储形式,但是比 KahaDB 更快
它比KahaDB 更快的原因是她不使用BTree 索引,而是使用本身自带的 LeavelDB 索引
题外话:为什么LeavelDB 更快,并且5.8 以后就支持,为什么还是默认 KahaDB 引擎,因为 activemq 官网本身没有定论,LeavelDB 之后又出了可复制的LeavelDB 比LeavelDB 更性能更优越,但需要基于 Zookeeper 所以这些官方还没有定论,任就使用 KahaDB
JDBC : 有一部分数据会真实的存储到数据库中
使用JDBC 的持久化,
①修改配置文件,默认 kahaDB
修改之前:
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
修改之后:
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-xql" createTablesOnStartup="false"/>
</persistenceAdapter>
<bean id="mysql-xql" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://xxxxxxx:3307/activemq?relaxAutoCommit=true&useSSL=false&serverTimezone=GMT"/>
<property name="username" value="root"/>
<property name="password" value="xxxx"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
④ 让linux 上activemq 可以访问到 mysql ,之后产生消息。
ActiveMQ 启动后会自动在 mysql 的activemq 数据库下创建三张表:activemq_msgs 、activemq_acks、activemq_lock
activemq_acks:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存
activemq_lock:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker
activemq_msgs:用于存储消息,Queue和Topic都存储在这个表中
点对点会在数据库的数据表 ACTIVEMQ_MSGS 中加入消息的数据,且在点对点时,消息被消费就会从数据库中删除
但是对于主题,订阅方式接受到的消息,会在 ACTIVEMQ_MSGS 存储消息,即使MQ 服务器下线,并在 ACTIVEMQ_ACKS 中存储消费者信息 。 并且存储以 activemq 为主,当activemq 中的消息被删除后,数据库中的也会自动被删除。
持久化消息是指:
MQ 所在的服务器down 了消息也不会丢失
持久化机制演化过程:
从最初的AMQ Message Store 方案到 ActiveMQ V4版本推出的High performance journal (高性能事务)附件,并且同步推出了关系型数据库的存储方案, ActiveMQ 5.3 版本有推出了KahaDB 的支持,(也是5.4之后的默认持久化方案),后来ActiveMQ 从5.8开始支持LevelDB ,现在5.9 提供了 Zookeeper + LevelDB 的集群化方案。
ActiveMQ 消息持久化机制有:
AMQ 基于日志文件
KahaDB 基于日志文件,5.4 之后的默认持久化
JDBC 基于第三方数据库
LevelDB : 基于文件的本地数据库存储,从5.8 之后推出了LevelDB 性能高于 KahaDB
ReplicatedLevelDB Store 从5.8之后提供了基于LevelDB 和Zookeeper 的数据复制方式,用于Master-slave方式的首数据复制选方案
但是无论使用哪种持久化方式,消息的存储逻辑都一样
小结
- 1> 引入消息队列后 如何保证高可用性
持久化、事务、签收、 以及带复制的 Leavel DB + zookeeper 主从集群搭建 - 2> 异步投递 Async send
对于一个慢消费者,使用同步有可能造成堵塞,消息消费较慢时适合用异步发送消息
activemq 支持同步异步 发送的消息,默认异步。当你设定同步发送的方式和 未使用事务的情况下发持久化消息,这时是同步的。
如果没有使用事务,且发送的是持久化消息,每次发送都会阻塞一个生产者直到 broker 发回一个确认,这样做保证了消息的安全送达,但是会阻塞客户端,造成很大延时 。
在高性能要求下,可以使用异步提高producer 的性能。但会消耗较多的client 端内存,也不能完全保证消息发送成功。在 useAsyncSend = true 情况下容忍消息丢失。
// 开启异步投递
activeMQConnectionFactory.setUseAsyncSend(true);
如何在投递快还可以保证消息不丢失 ?
异步发送消息丢失的情况场景是: UseAsyncSend 为 true 使用 producer(send)持续发送消息,消息不会阻塞,生产者会认为所有的 send 消息均会被发送到 MQ ,如果MQ 突然宕机,此时生产者端尚未同步到 MQ 的消息均会丢失 。
故 正确的异步发送方法需要接收回调
同步发送和异步发送的区别就在于 :
同步发送send 不阻塞就代表消息发送成功
异步发送需要接收回执并又客户端在判断一次是否发送
在代码中接收回调的函数 :
activeMQConnectionFactory.setUseAsyncSend(true);
……
for (int i = 1; i < 4 ; i++) {
textMessage = session.createTextMessage("msg--" + i);
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"-- orderr");
String msgid = textMessage.getJMSMessageID();
messageProducer.send(textMessage, new AsyncCallback() {
@Override
public void onSuccess() {
// 发送成功怎么样
System.out.println(msgid+"has been successful send ");
}
@Override
public void onException(JMSException e) {
// 发送失败怎么样
System.out.println(msgid+" has been failure send ");
}
});
}
long delay = 3 * 1000 ;
long perid = 4 * 1000 ;
int repeat = 7 ;
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("delay msg--" + i);
// 消息每过 3 秒投递,每 4 秒重复投递一次 ,一共重复投递 7 次
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY,delay);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD,perid);
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT,repeat);
messageProducer.send(textMessage);
}