文章目录
- 前言
- 一、ActiveMQ、 MQTT是什么?
- 1.ActiveMQ介绍
- 2.MQTT介绍
- 二、集群搭建步骤
- 1.下载apache-activemq-5.15.12-bin.tar.gz
- 2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)
- 3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:
- 4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:
- 5.分别到bin目录下运行activemq文件启动activemq,命令如下:
- 6.全部节点启动后,进入web端界面查询集群配置情况:
- 三、项目连接配置步骤
- 1.Activemq连接服务配置:
- 2.MQTT生产者连接配置:
- 3.MQTT消费者连接配置:
- 4.项目使用:
- 总结
前言
随着技术的不断迭代,在分布式系统中应用消息组件进行通信已经是非常常见的方式,而为了保障消息中间件的高可用性就需要对中间件进行集群化部署,这是应用程序发展的必经之路。
一、ActiveMQ、 MQTT是什么?
1.ActiveMQ介绍
ActiveMQ官网
ActiveMQ是一个开源的、基于Java的消息中间件(Message Oriented Middleware,MOM)实现。它提供了可靠的异步消息传递的功能,用于在分布式系统中进行应用程序之间的通信。
以下是ActiveMQ的一些主要特点和功能:
1.1、 异步消息传递:ActiveMQ支持发布-订阅和点对点模式的消息传递。应用程序可以通过发送和接收消息来进行异步通信。
1.2、持久化和持久订阅:ActiveMQ可以将消息持久化到磁盘,以确保即使在消息发送者和接收者之间的断开连接或重启后,消息也能被正确接收。
1.3、 多种消息传递模式:ActiveMQ支持多种消息传递模式,包括点对点队列、主题订阅和点对点回复等。
1.4、基于JMS标准:ActiveMQ完全支持Java消息服务(JMS)规范,是JMS的一种实现。JMS提供了一系列的API和协议,用于在Java应用程序之间进行消息传递。
1.5、高可用性和故障转移:ActiveMQ支持故障转移和高可用性,可以通过配置多个broker实现自动故障转移和消息备份。
1.6、多种协议支持:ActiveMQ支持多种协议,如AMQP、STOMP、OpenWire、MQTT等。这使得ActiveMQ可以与不同的客户端和应用程序进行集成和通信。
1.7、 插件体系结构:ActiveMQ具有可扩展的插件体系结构,允许开发人员根据需求添加自定义功能和扩展。
1.8、可视化管理工具:ActiveMQ提供了可视化的管理界面,用于监控和管理消息队列、主题、连接等。
作为一种成熟而强大的消息中间件解决方案,ActiveMQ被广泛用于构建可靠的分布式系统、实现异步通信、实现解耦和提高应用程序的可伸缩性等场景。
2.MQTT介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级、开放、简单的消息传输协议,专门针对物联网(IoT)领域设计。它具有低带宽和低功耗的特点,适用于在资源受限的设备上进行可靠的通信。
以下是MQTT协议的一些关键特点:
1.1、 轻量级:MQTT协议设计简单,通信报文开销小,传输数据量较小,适用于带宽有限的网络环境,能够满足物联网设备的资源限制。
1.2、发布/订阅模式:MQTT采用发布/订阅模式,包含发送消息的发布者和接收消息的订阅者。发布者将消息发布到特定的主题上,而订阅者通过订阅感兴趣的主题来接收消息。
1.3、QoS支持:MQTT支持三种不同的服务质量(QoS)级别:QoS 0(至多一次传输)、QoS 1(至少一次传输)和QoS 2(恰好一次传输)。这种级别的支持确保了消息的可靠性和传递保证。
1.4、消息保留:MQTT支持在特定主题上保留最新的消息。这意味着当订阅者订阅一个主题时,它将立即接收到最新的保留消息,而不仅仅是实时发送的消息。
1.5、心跳机制:MQTT协议定义了心跳机制,通过发送心跳报文,保持客户端和代理服务器之间的连接有效性。如果客户端长时间没有发送心跳,代理服务器将断开连接。
1.6、安全性支持:MQTT提供了基于TLS/SSL的加密和身份验证机制,以确保消息的机密性和安全性。
1.7、广泛的应用:MQTT广泛应用于物联网领域,例如传感器网络、远程监测、智能家居、工业自动化等。
MQTT协议的轻量级和简单性使得它成为连接大量设备和传输数据的理想选择,尤其是在资源受限的物联网环境中。它以其可靠性、灵活性和互通性在物联网行业得到了广泛应用。
二、集群搭建步骤
1.下载apache-activemq-5.15.12-bin.tar.gz
官网下载地址
2.上传apache-activemq-5.15.12-bin.tar.gz到服务器并解压文件到文件夹clusters、master、slave三个文件夹下面形成三个节点(平等节点)
注意:三个节点是公平节点。一开始我是想做集群节点clusters做数据分发,然后master和slave做主从的。后面发现存在问题,在资源较少情况下clusters、clusters、slave都为单节点的情况下,clusters一挂掉,集群关系就破裂了,没有节点给master和slave做数据分发了,这样的配置不友好。
于是我就把三个节点配置成了平等节点,任何节点宕机都能正常运行。
3.分别修改三个节点的配置文件:activemq.xml,修改连接协议端口号和配置网络代理,修改如下:
3.1、配置默认的传输协议OpenWire 和 支持硬件的传输协议MQTT;
3.2、配置网络代理networkConnectors,做节点间数据传输;
3.3、duplex设置为true,则一个连接上可以双向流动消息(双工连接),默认值为false,默认情况下,在两个提供者之间的连接上的消息流动方向是单向(单工连接);
3.4、修改三个节点的brokerName为localhost_clusters、localhost_master、localhost_slave;
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=10000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=10000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- 配置网络代理,cluster 节点需要与 master 跟 slave 进行穿透 -->
<networkConnectors>
<networkConnector name="network-clusters" uri="static:(tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)" duplex="true" />
</networkConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=10000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:2884?maximumConnections=10000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- 配置网络代理,master 节点需要与 cluster 跟 slave 进行穿透 -->
<networkConnectors>
<networkConnector name="network-master" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61618)" duplex="true" />
</networkConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61618?maximumConnections=10000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:2885?maximumConnections=10000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<!-- 配置网络代理,slave 节点需要与 master 跟 cluster 进行穿透 -->
<networkConnectors>
<networkConnector name="network-slave" uri="static:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617)" duplex="true" />
</networkConnectors>
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_clusters" dataDirectory="${activemq.data}">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_master" dataDirectory="${activemq.data}">
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost_slave" dataDirectory="${activemq.data}">
4.分别修改三个节点的WEB配置文件:jetty.xml,修改端口号,修改如下:
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8161"/>
</bean>
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8171"/>
</bean>
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0"/>
<property name="port" value="8181"/>
</bean>
5.分别到bin目录下运行activemq文件启动activemq,命令如下:
sh activemq start
6.全部节点启动后,进入web端界面查询集群配置情况:
6.1、进入web端界面http://192.168.10.41:8161、http://192.168.10.41:8171、http://192.168.10.41:8181,登录账号密码admin/admin,到Network查看是否有另外两个节点的连接情况,若有另外两个节点的连接信息并且Remote Address为true,则集群建立完毕;
6.2、图片如下:
三、项目连接配置步骤
1.Activemq连接服务配置:
ActiveMQ连接配置开箱即用
failover
是一种连接URL配置选项,用于指定多个ActiveMQ broker的连接地址。当一个broker发生故障或不可用时,客户端会自动尝试连接配置中的其他broker。以此机制来实现多节点的集群连接模式。
spring:
activemq:
broker-url: failover:(tcp://192.168.10.41:61616,tcp://192.168.10.41:61617,tcp://192.168.10.41:61618)
user: admin
password: admin
pool:
enabled: true
packages:
trust-all: true
2.MQTT生产者连接配置:
注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;
mqtt:
brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883
clientIds: dig-producer1,dig-producer2
qos: 1
userName: admin
password: admin
3.MQTT消费者连接配置:
注意:MQTT的连接配置是我自定义的,在项目里有相关代码配合使用;
mqtt:
topics: V5008Upload/#,V6800Upload/#
qoss: 1,2
brokers: tcp://192.168.10.41:2884,tcp://192.168.10.41:2885,tcp://192.168.10.41:1883
clientIds: dig-consumer1,dig-consumer2
userNames: admin
words: admin
4.项目使用:
4.1、ActiveMQ配置使用
activeMQ配置使用比较简单,也不是本文的重点,简单贴一点代码
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.JmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
@Configuration
public class ActiveMqConfig {
@Value("${spring.activemq.broker-url}")
private String brokerUrl;
/**
* 队列模式(消息将按顺序一个一个地被消费,每个消息只能被一个消费者接收)
*/
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
// SimpleJmsListenerContainerFactory适用于JMS 1.1规范
// 消息监听容器工厂
SimpleJmsListenerContainerFactory factory = new SimpleJmsListenerContainerFactory();
// 关闭事务
factory.setSessionTransacted(false);
// 手动确认消息
factory.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
// 设置监听容器工厂的发布订阅域为队列模式,即采用点对点消息传递模式
factory.setPubSubDomain(false);
factory.setConnectionFactory(activeMQConnectionFactory);
return factory;
}
/**
* 配置名字为givenConnectionFactory的连接工厂
*
* @return
*/
@Bean("givenConnectionFactory")
public ActiveMQConnectionFactory connectionFactory() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
// 自定义消费重试机制
RedeliveryPolicy policy = new RedeliveryPolicy();
// 消息处理失败重新处理次数,默认为5次
policy.setMaximumRedeliveries(5);
// 启用指数退避策略,以延长每次重试的间隔时间
policy.setUseExponentialBackOff(Boolean.TRUE);
// 设置初始重试延迟时间为0毫秒,意味着消息处理失败时立即进行重试
policy.setInitialRedeliveryDelay(0);
// 设置每次重试之间的延迟时间为3秒
policy.setRedeliveryDelay(3000L);
// 设置指数退避的增加倍数,每次重试的延迟时间将按比例增加
policy.setBackOffMultiplier(2);
// 设置最大重试延迟时间为20秒
policy.setMaximumRedeliveryDelay(20000L);
factory.setRedeliveryPolicy(policy);
Connection connection = factory.createConnection();
connection.start();
return factory;
}
// /**
// * 发布-订阅模式(消息会被广播给所有订阅该主题的消费者)
// */
// @Bean("topicListener")
// public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ActiveMQConnectionFactory givenConnectionFactory) {
// // 设置为发布订阅模式, 默认情况下使用生产消费者方式
// // DefaultJmsListenerContainerFactory 2.0规范
// DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
// bean.setSessionTransacted(false);
// bean.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
// bean.setPubSubDomain(true);
// bean.setConnectionFactory(givenConnectionFactory);
// return bean;
// }
}
import org.apache.activemq.command.ActiveMQQueue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
@Service
public class ActivimqProducer {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
/**
* 发送队列模式
*
* @param queueName
* @param message
*/
public void sendMqQueue(String queueName, String message) {
this.jmsMessagingTemplate.convertAndSend(new ActiveMQQueue(queueName), message);
}
}
import com.test.common.enums.QueueType;
import com.test.local.mqtt.process.EquipmentAssetsProcess;
import com.test.local.mqtt.process.MqttDataProcessing;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import javax.jms.Session;
@Slf4j
@Component
public class ActivimqConsumer {
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private EquipmentAssetsProcess equipmentAssetsProcess;
@JmsListener(destination = QueueType.LABEL_STATE, containerFactory = "jmsListenerContainerQueue")
public void consumerLabelState(ActiveMQMessage activeMQMessage, String message, Session session) {
if (StringUtils.isNotEmpty(message)) {
threadPoolTaskExecutor.execute(new MqttDataProcessing(
equipmentAssetsProcess,
message,
QueueType.LABEL_STATE,
activeMQMessage,
session
));
}
}
}
import com.test.fastjson.JSON;
import com.test.common.constants.Constants;
import com.test.common.enums.PatternStatusEnum;
import com.test.common.enums.QueueType;
import com.test.common.redis.RedisCache;
import com.test.entity.TagInfo;
import com.test.local.entity.*;
import com.test.service.RegionCheckRecordService;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.beans.factory.annotation.Autowired;
import javax.jms.Session;
import java.util.UUID;
@Slf4j
public class MqttDataProcessing implements Runnable {
@Autowired
private RedisCache redisCache;
@Autowired
private RegionCheckRecordService regionCheckRecordService;
private String topic;
private String message;
private EquipmentAssetsProcess equipmentAssetsProcess;
private ActiveMQMessage activeMQMessage;
private Session session;
public MqttDataProcessing(
EquipmentAssetsProcess equipmentAssetsProcess,
String message,
String topic,
ActiveMQMessage activeMQMessage,
Session session
) {
this.topic = topic;
this.message = message;
this.equipmentAssetsProcess = equipmentAssetsProcess;
this.activeMQMessage = activeMQMessage;
this.session = session;
}
@SneakyThrows
@Override
public void run() {
String logId = UUID.randomUUID().toString().replace("-", "");
try {
if (QueueType.LABEL_STATE.equals(topic)) {
LabelState labelState = JSON.parseObject(message, LabelState.class);
if (labelState.getData() != null && labelState.getData().size() > 0) {
equipmentAssetsProcess.processLabelState(labelState, logId);
}
activeMQMessage.acknowledge();
}
} catch (Exception e) {
// 重发
session.recover();
log.error("异常,重新消费!logId={},topic={},message={}", logId, topic, message, e);
}
}
}
4.2、MQTT配置使用
对于MQTT的分布式我是这么理解的:
在消费端,同时连接多个节点进行消费,硬件发送的消息定义一个唯一id,此时会有ABC三个消费者等待硬件发送过来的消息,于是使用redisson的分布式锁lock.tryLock来限制消息只被消费一次。
在生产端,同时连接多个节点进行消息发送,因为我们的硬件只能连接到一个节点上面(硬件不能支持多节点代理消费)在一个节点宕机后才会去尝试连接备选节点,所有我们对所有节点都发送消息,保证该消息能被硬件接收并消费到,另外两个节点多发送的消息也不会造成问题(直接无视了),因为硬件同一时刻只能连接一个节点进行消费。
消费者配置
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;
@RefreshScope
@Repository
@Data
public class ConfigMqtt {
private String[] topics = new String[]{"test1Upload/#","test2Upload/#","test3Upload/#"};
// @Value("${mqtt.qoss}")
private int[] qoss = new int[]{2,2,2};
@Value("${mqtt.brokers}")
private String[] brokers;
@Value("${mqtt.clientIds}")
private String[] clientIds;
@Value("${mqtt.userNames}")
private String userNames;
@Value("${mqtt.words}")
private String words;
}
import com.alibaba.fastjson.JSON;
import com.test.common.redis.RedisCache;
import com.test.config.ConfigMqtt;
import com.test.util.HexConvert;
import com.google.common.base.Strings;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
* 订阅者:订阅硬件mqtt主题信息,消费硬件发送消息,转换硬件消息发送到ActiveMQ队列,最终到其他微服务处理ActiveMQ队列的消息
*/
@Slf4j
@Service
public class MqttSubscription {
@Autowired
private ConfigMqtt configMqtt;
@Autowired
private SubscriptionJSON subscriptionJSON;
@Autowired
private SubscriptionHEX subscriptionHEX;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Resource
private RedissonClient redissonClient;
@Resource
private RedisCache redisCache;
@Bean
public void client() throws Exception {
String[] hosts = configMqtt.getBrokers();
String[] clientIds = configMqtt.getClientIds();
// 多个
for (int i = 0; i < hosts.length; i++) {
String host = hosts[i];
String clientId = clientIds[i];
try {
InetAddress ip4 = Inet4Address.getLocalHost();
clientId = clientId + "-" + ip4.getHostAddress();
} catch (UnknownHostException e) {
log.error("MqttSubscription-client-configMqtt" + configMqtt);
log.error("MqttSubscription-client-e" + e);
}
String finalClientId = clientId;
threadPoolTaskExecutor.execute(() -> this.myClient(host, finalClientId));
}
}
private void myClient(String host, String clientId) {
try {
String[] topics = configMqtt.getTopics();
int[] qos = configMqtt.getQoss();
String userName = configMqtt.getUserNames();
String passWord = configMqtt.getWords();
// host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
MqttClient client = new MqttClient(host, clientId, new MemoryPersistence());
// MQTT的连接设置
MqttConnectOptions options = new MqttConnectOptions();
// todo:ch:设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,断线重连会消费断线期间的消息
options.setCleanSession(true);
// 设置连接的用户名
options.setUserName(userName);
// 设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
// 自动重连
options.setAutomaticReconnect(true);
// todo:ch:设置遗嘱消息的话题,若客户端和服务器之间的连接意外断开,服务器将发布客户端的遗嘱信息
options.setWill("willTopic", (clientId + "与服务器断开连接").getBytes(), 0, false);
// 设置回调函数
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
while (true) {
try {
client.connect(options);
client.subscribe(topics, qos);
break;
} catch (Exception e) {
e.printStackTrace();
log.error("mqtt客户端id-clientId:" + clientId);
log.error("mqtt连接异常-e", e);
log.error("mqtt连接异常-cause" + cause);
try {
Thread.sleep(5000);
} catch (InterruptedException ex) {
ex.printStackTrace();
}
}
}
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
String key = "MQTT-";
String uuid = "";
// 消息存在
if (message.getPayload().length > 0) {
byte[] req = message.getPayload();
if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {
String data = new String(req);
Map<String, Object> map = JSON.parseObject(data);
if (map.containsKey("uuid") && map.get("uuid") != null) {
uuid = map.get("uuid").toString();
}else {
uuid = clientId;
}
} else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {
String data = HexConvert.convertStringToHex(req);
uuid = data.subSequence(data.length() - 8, data.length()).toString();
} else if (topic.contains("test2") || topic.contains("test3")) {
String data = HexConvert.convertStringToHex(req);
uuid = data.subSequence(data.length() - 8, data.length()).toString();
}
if (!Strings.isNullOrEmpty(uuid)) {
key += uuid;
}
}
// 分布式锁,防止多应用节点产生重复消息
RLock lock = redissonClient.getLock(key);
try {
// 加锁,等待30秒锁自动释放, 不在finally手动释放了,给予30秒的缓冲时间
boolean resultLock = lock.tryLock(0, 30, TimeUnit.SECONDS);
if (resultLock) {
String data = new String(message.getPayload());
log.info("mqtt-clientId:" + clientId);
log.info("mqtt-key:" + key);
// log.info("message-ID:" + message.getId());
log.info("messageArrived-topic" + topic);
log.info("messageArrived-message" + data);
if (topic.contains("test1") && configMqtt.getDataFormat().equals("JSON")) {
// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费
......
} else if (topic.contains("test1") && configMqtt.getDataFormat().equals("HEX")) {
// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费
......
} else if (topic.contains("test2") || topic.contains("test3")
) {
// 处理硬件的消息,发送到ActiveMQ,最终在别的微服务进行消息消费
......
}
}
} catch (Exception e) {
e.printStackTrace();
log.error("mqtt客户端id-clientId:" + clientId);
log.error("mqtt发布信息异常-e", e);
log.error("mqtt发布信息异常-topic" + topic);
log.error("mqtt发布信息异常-message" + message.toString());
}
// finally {
// if (lock.isLocked()) {
// if (lock.isHeldByCurrentThread()) {
// lock.unlock();
// }
// }
// }
}
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
// todo:是否需要永久重新连接,能否设定固定重连次数 或者固定多少秒重连一次(类似心跳机制)
int retryCount = 0;
while (!client.isConnected()) {
try {
Thread.sleep(getBackoffTime(retryCount));
client.connect(options);
client.subscribe(topics, qos);
} catch (Exception e) {
log.error("Reconnect attempt failed", e);
retryCount++;
}
}
} catch (MqttException e) {
log.error("mqtt客户端id-clientId:" + clientId);
log.error("mqtt连接错误:", e);
}
}
private long getBackoffTime(int retryCount) {
// 使用指数退避算法计算重连时间
long waitTime = Math.min(1000 * (1 << retryCount), 60000); // 最大等待时间为60秒
return waitTime;
}
}
生产者配置
import lombok.Data;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.stereotype.Repository;
@RefreshScope
@Repository
@Data
public class ConfigMqtt {
@Value("${mqtt.brokers}")
private String[] brokers;
@Value("${mqtt.clientIds}")
private String[] clientIds;
@Value("${mqtt.qos}")
private int qos;
@Value("${mqtt.userName}")
private String userName;
@Value("${mqtt.password}")
private String password;
}
import com.test.local.config.ConfigMqtt;
import com.test.local.utils.HexConvert;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;
/**
* MQTT生产者,生产消息发送到硬件
*/
@Slf4j
@Service
public class MqttConnect {
private volatile static MqttClient mqttClientSingleton;
private volatile static List<MqttClient> mqttClientSingletonList = new ArrayList<>();
@Autowired
private ConfigMqtt configMqtt;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
private MqttConnect() {
}
/**
* 创建 多个 mqtt 实例
*/
public static List<MqttClient> getMoreInstance(ConfigMqtt configMqtt) {
String[] hosts = configMqtt.getBrokers();
String[] clientIds = configMqtt.getClientIds();
// 创建多个节点连接实例
if (mqttClientSingletonList == null || mqttClientSingletonList.size() < hosts.length) {
if (mqttClientSingletonList != null && !mqttClientSingletonList.isEmpty() && mqttClientSingletonList.size() < hosts.length) {
mqttClientSingletonList.forEach(re -> {
try {
re.disconnect();
re.close();
} catch (Exception e) {
}
});
// 清除原有实例
mqttClientSingletonList.clear();
}
// 多个
for (int i = 0; i < hosts.length; i++) {
String clientIdRe = clientIds[i];
String broker = hosts[i];
String userName = configMqtt.getUserName();
String password = configMqtt.getPassword();
StringBuffer clientId = new StringBuffer();
try {
InetAddress ip4 = Inet4Address.getLocalHost();
clientId.append(clientIdRe).append("-").append(ip4.getHostAddress()).append("-").append(HexConvert.getStringRandom(11));
} catch (UnknownHostException e) {
log.error("MqttClient-getInstance-e" + e);
}
MemoryPersistence persistence = new MemoryPersistence();
synchronized (MqttConnect.class) {
MqttClient mqttClient = null;
try {
// 创建客户端
mqttClient = new MqttClient(broker, clientId.toString(), persistence);
// 创建链接参数
MqttConnectOptions connOpts = new MqttConnectOptions();
// 在重新启动和重新连接时记住状态
connOpts.setCleanSession(true);
// 设置连接的用户名
connOpts.setUserName(userName);
connOpts.setPassword(password.toCharArray());
// 建立连接
mqttClient.connect(connOpts);
mqttClientSingletonList.add(mqttClient);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
log.error("发送连接mqtt异常" + me);
try {
mqttClient.disconnect();
mqttClient.close();
} catch (Exception e) {
}
//将 mqtt 置空
mqttClient = null;
me.printStackTrace();
}
}
}
}
return mqttClientSingletonList;
}
/**
* 发布消息给硬件
*/
public void publish(String version, String gateway, String content) {
StringBuffer topic = new StringBuffer();
topic.append(version).append("Download/").append(gateway);
int qos = configMqtt.getQos();
// mqtt多节点消息发送 -- 每个节点都发送一份消息让硬件消费
List<MqttClient> clientList = MqttConnect.getMoreInstance(configMqtt);
if (!clientList.isEmpty()) {
clientList.forEach(client -> {
threadPoolTaskExecutor.execute(() -> {
try {
// 创建消息
MqttMessage message = new MqttMessage(content.getBytes());
// 设置消息的服务质量
message.setQos(qos);
log.info("发送消息到MQTT供硬件消费");
log.info("================client:"+client.getClientId());
log.info("================topic:"+topic);
log.info("================message:"+message);
// 发布消息
client.publish(topic.toString(), message);
} catch (MqttException me) {
log.error("reason " + me.getReasonCode());
log.error("msg " + me.getMessage());
log.error("loc " + me.getLocalizedMessage());
log.error("cause " + me.getCause());
log.error("excep " + me);
log.error("发送连接mqtt异常" + me);
}
});
});
}
}
}
总结
近期有时间总结了一下前段时间搭建ActiveMQ + MQTT集群并且在微服务中使用的流程。经此,牛马小陈同学巩固了中间件和分布式概念知识。MQTT的分布式使用是出于自己对分布式的理解然后手写的,目前能正常进行分布式消费,对于MQTT的理解还不是很深,很多处理非常的粗糙,欢迎各位新手同学一起学习、各路大佬批评指正,谢谢!
ActiveMQ + MQTT使用docker方式部署如下:
ActiveMQ + MQTT 集群搭建(docker版本)