目录
一、主从和集群架构的特点
1.1 主从架构的-Master/slave模式特点
1.2 集群架构-Cluster模式特点
二、ActiveMQ的主从架构
2.1 架构图
2.2 特点
2.3 实现方式(3种)
2.4 实现 (基于LevelDB复制)
2.4.1 准备环境
2.4.2 启动
2.4.3 宕机测试
三、ActiveMq的集群架构
3.1 架构图
3.2 集群的配置实现
3.2.1 静态的发现
3.2.3 动态的发现
3.3 集群的具体实现(动态的发现)
3.4.1 准备环境 (同2.4.1)
3.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
3.4.3 启动三台服务
3.4.4 集群成功,查看页面
3.4.5 代码测试
一、主从和集群架构的特点
1.1 主从架构的-Master/slave模式特点
读写分离,纵向扩展,所有的写操作一般在master上完成,slave只提供一个热备
1.2 集群架构-Cluster模式特点
分布式的一种存储,水平的扩展,消息的分布式共享
二、ActiveMQ的主从架构
2.1 架构图
2.2 特点
- 只有master对外提供服务,也就是说,producer和consumer智能连接master
- 一个master下面可以有一个或者多个slave,slave不对外提供服务
- 一个slave只能属于一个master
- 整个主从架构中只有一个master,否则容易造成数据复制到混乱
- master、slave之间的数据是同步的(共享方式有3种)是一个同步的复制
2.3 实现方式(3种)
- 基于文件(Shared File System),需要创建一个共享的持久化文件
<persistenceAdapter>
<kahaDB directory="/data/kahadb"/> # 自定义的地址
</persistenceAdapter>
主要是通过共享目录存储目录来实现master和slave的热备,谁先启动,谁就可以最终取得共享目录的控制权成为master,其它的应用就只能作为slave
- 基于数据库(JDBC Master Slave),需要创建一个共享的数据库
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destory-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root" />
<property name="poolPreparedStatements" value="true"/>
</bean>
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>
与shared filesystem方式类似,知识共享的存储介质由文件系统改成数据库而已
- 基于LevelDB复制(Replicated LevelDB Store),需要zookeeper的支持,例如使用activemq本身就支持的LevelDB持久化。
2.4 实现 (基于LevelDB复制)
2.4.1 准备环境
这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202
我这里三台主机名分别设置了 master node1 node2
三台服务器均下载安装activemq
[root@master /]# mkdir myactivemq
[root@master /]# cd myactivemq
[root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
[root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
2.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
zkAddress 为zookeeper的地址
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.190.200:2181"
hostname="192.168.190.201" <!-- 集群中任意一台服务器ip或通过host文件配置的主机名 -->
sync="local_disk"
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>
如果是单机需要修改下边配置,61616端口修改,其它全部注掉,防止端口冲突,我们这里是多服务器配置,可以不用更改,本配置忽略下边配置
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<!--
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
-->
</transportConnectors>
2.4.2 启动
先启动zookeeper,然后分别启动三台服务器的activemq
[root@master bin]# pwd
/myactivemq/apache-activemq-5.15.9/bin
[root@master bin]# ./activemq start
我这里先启动了241的节点,所以241为主master节点
因此我们只能访问241的控制界面
2.4.3 宕机测试
选择master节点, ./activemq stop
我们发现其中一台从节点被选举为主节点
重新启动原来master,其会作为slave服务器继续提供服务
三、ActiveMq的集群架构
3.1 架构图
3.2 集群的配置实现
3.2.1 静态的发现
需要在节点的配置文件中,显示的配置其他节点的IP地址和服务端口号,例如:
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.190.200:61616,tcp://192.168.190.201:61616,tcp://192.168.190.202:61616)" />
</networkConnectors>
3.2.3 动态的发现
通过广播的方式,动态的发现其它节点。例如:
<networkConnectors>
<networkConnector uri="multicast://default" /> <!-- 这里defalut广播名可以是随意取的 -->
</networkConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
/>
<!--
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
-->
</transportConnectors>
在每一个节点中activemq.xml配置文件中配置上述代码。其中defalut是我们自定义的名称,在discoveryUri属性中进行引用即可。有利于集群节点的动态变更。
注意,如单机部署多个activemq示例,jetty.xml文件中的8161端口也需要更改
3.3 集群的具体实现(动态的发现)
3.4.1 准备环境 (同2.4.1)
这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202
三台服务器均下载安装activemq
[root@master /]# mkdir myactivemq
[root@master /]# cd myactivemq
[root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
[root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
3.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
<networkConnectors>
<networkConnector uri="multicast://default" /> <!-- 这里defalut广播名可以是随意取的 -->
</networkConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
每台brokerName不能重名,名字随意
3.4.3 启动三台服务
我这里启动2台的能成功加入集群,启动三台失败,...当启动第三台的时候,查看日志报错如下
2024-04-15 12:06:18,843 | WARN | Failed to add Connection id=localhost->localhost-36779-1713153927901-42:2, clientId=NC_localhost_outbound due to {} | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///192.168.190.202:50616@61616
javax.jms.InvalidClientIDException: Broker: localhost - Client: NC_localhost_outbound already connected from tcp://192.168.190.200:47904
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:247)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-client-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.transport.MutexTransport.onComm
我们打开2台集群中的其中一台可以看到,连接的名字都是NC:localhost:outbound,
其中localhost是brokerName,所以这里需要修改每一台的brokerName不能重名
启动第一台服务器activemq
这里查看的都是一台服务器的日志
加入第二台服务器的日志
加入第三台服务器的日志
3.4.4 集群成功,查看页面
随便查看一台服务器的地址的可视页面,可以看到集群成功
3.4.5 代码测试
生产者连接其中一个activemq实例生产消息
package com.dolphin;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.jms.core.MessagePostProcessor;
import javax.jms.*;
public class JmsProduce {
public static final String ACTIVEMQ_URL = "tcp://192.168.190.200:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//1 创建连接工厂,按照规定的url地址,采用默认用户名和密码 admin/admin
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2 通过连接工厂,获得链接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话session
//两个参数,第一个叫事务/第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
//6、 通过使用messageProducer生产3条消息发送到MQ的队列里面
for (int i = 1;i<=6;i++) {
//7 创建消息
TextMessage textMessage = session.createTextMessage("message---" + i);//理解为一个字符串
textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
//8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
//9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println("*****消息发布完成");
}
}
消费者连接另一台服务实例
package com.dolphin;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer1 {
public static final String ACTIVEMQ_URL = "tcp://192.168.190.201:61616";
public static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException, IOException {
System.out.println("1号消费者");
//1 创建连接工厂,按照规定的url地址,采用默认用户名和密码 admin/admin
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//2 通过连接工厂,获得链接connection并启动访问
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
//3、创建会话session
//两个参数,第一个叫事务/第二个叫签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4、创建目的地(具体是队列还是主题topic)
Queue queue = session.createQueue(QUEUE_NAME);
//5、创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
//6、 通过使用messageProducer生产3条消息发送到MQ的队列里面
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("*****消费者接受到消息"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read(); //进程处于运行状态
messageConsumer.close();
session.close();
connection.close();
System.out.println("*****消息消费完成");
}
}
运行生产者生产了6条消息,消息发送到指定的服务实例上,其它集群实例中看不到消息
运行连接了另一台服务的消费者
运行结果,这里消费了6条上述生产者的消息