ActiveMQ的传输协议
一、是什么 二、协议 1.TCP(默认) 2.NIO 3.AMQP 4.STOMP 5.SSL 6.MQTT 7 WS
三、NIO配置案例 1.修改activemq.xml 2.重启 3.生产者/消费者 4.性能提升 4.1 配置 4.2 生产者/消费者
一、是什么
官网地址:http://activemq.apache.org/configuring-version-5-transports.html ActiveMQ支持的client-broker通讯协议有:TVP、NIO、UDP、SSL、Http(s)、VM。
其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的<transportConnectors>标签之内。 URI描述信息的头部都是采用协议名称,唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire。
二、协议
1.TCP(默认)
Transmission Control Protocol(TCP)
1.这是默认的Broker配置,TCP的Client监听端口61616 2.在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。 3.TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。 4.TCP传输的的优点:
TCP协议传输可靠性高,稳定性强 高效率:字节流方式传递,效率很高 有效性、可用性:应用广泛,支持任何平台 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/tcp-transport-reference
2.NIO
New I/O API Protocol(NIO)
1.NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
2.适合使用NIO协议的场景:
可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。 3.NIO连接的URI形式:nio://hostname:port?key=value&key=value 协议参数文档地址:https://activemq.apache.org/components/classic/documentation/nio-transport-reference 默认端口:61618 NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
3.AMQP
Advanced Message Queuing Protocol(AMQP)
一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件限制。 默认端口:5672 注意:编码和TCP不一样
4.STOMP
Streaming Text Orientation Message Protocol(STOMP)
是流文本定向消息协议
,是一种为MOM(Message Oriented Middleware,面向消息中间件)设计的简单文本协议。 默认端口:61613
5.SSL
Secure Sockets Layer Protocol(SSL)
6.MQTT
Message Queuing Telemetry Transport(MQTT):消息队列遥测传输)
IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被用来当作传感器和致动器(比如通过Twitter让房屋联网)的通信协议。 默认端口:1883 扩展:https://github.com/fusesource/mqtt-client
7 WS
三、NIO配置案例
1.修改activemq.xml
打开activemq的配置文件,在active安装目录下:conf/activemq.xml 将下面的内容复制到<transportConnectors>标签内
< transportConnector name = " nio" uri = " nio://0.0.0.0:61618?trace=true" />
如果你不特别指定ActiveMQ的网络监听端口,那么这些端口都讲使用BIO网络IO模型 所以为了首先提高单节点的网络吞吐性能,我们需要明确指定ActiveMQ网络IO模型。 如上所示:URI格式头以“nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。
2.重启
./bin/active
去控制台查看,是否加成功了。
3.生产者/消费者
NIO和TCP协议的编码是一样的,所以只需要替换URL的协议即可实现切换。
package com. qingsi. activemq ;
import org. apache. activemq. ActiveMQConnectionFactory ;
import javax. jms. * ;
public class JmsProduce {
public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61618" ;
public static final String QUEUE_NAME = "transport_nio" ;
public static void main ( String [ ] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory ( ACTIVEMQ_URL ) ;
Connection connection = activeMQConnectionFactory. createConnection ( ) ;
connection. start ( ) ;
Session session = connection. createSession ( false , Session . AUTO_ACKNOWLEDGE ) ;
Queue queue = session. createQueue ( QUEUE_NAME ) ;
MessageProducer producer = session. createProducer ( queue) ;
for ( int i = 0 ; i < 3 ; i++ ) {
TextMessage textMessage = session. createTextMessage ( "tx msg--" + i) ;
producer. send ( textMessage) ;
}
producer. close ( ) ;
session. close ( ) ;
connection. close ( ) ;
}
}
4.性能提升
问题:URI格式以"nio"开头,代表这个端口使用TCP协议为基础的NIO网络模型。但是这样的设置方式,只能使这个端口支持Openwire协议。 需要将所有的BIO模型,都替换成NIO模型,性能得到提升。
4.1 配置
官网地址:https://activemq.apache.org/components/classic/documentation/auto 将下面的配置,写入到activemq.xml
< transportConnector name = " auto+nio" uri = " auto+nio://0.0.0.0:61608?maximumConnections=1000& wireFormat.maxFrameSize=104857600& org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20& org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50" />
./bin/activemq restart
4.2 生产者/消费者
都是只改动了URL,其他的代码一样。所以下面举例了生产者的URL 只要
package com. qingsi. activemq ;
import org. apache. activemq. ActiveMQConnectionFactory ;
import javax. jms. * ;
public class JmsProduce {
public static final String ACTIVEMQ_URL = "nio://192.168.86.128:61608" ;
public static final String QUEUE_NAME = "nio_auto" ;
public static void main ( String [ ] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory ( ACTIVEMQ_URL ) ;
Connection connection = activeMQConnectionFactory. createConnection ( ) ;
connection. start ( ) ;
Session session = connection. createSession ( false , Session . AUTO_ACKNOWLEDGE ) ;
Queue queue = session. createQueue ( QUEUE_NAME ) ;
MessageProducer producer = session. createProducer ( queue) ;
for ( int i = 0 ; i < 3 ; i++ ) {
TextMessage textMessage = session. createTextMessage ( "tx msg--" + i) ;
producer. send ( textMessage) ;
}
producer. close ( ) ;
session. close ( ) ;
connection. close ( ) ;
}
}