1)RabbitMQ启用MQTT插件
root@mq:/# rabbitmq-plugins enable rabbitmq_mqtt
Enabling plugins on node rabbit@mq:
rabbitmq_mqtt
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_mqtt
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@mq...
The following plugins have been enabled:
rabbitmq_mqtt
started 1 plugins.
root@mq:/# rabbitmq-plugins enable rabbitmq_web_mqtt
Enabling plugins on node rabbit@mq:
rabbitmq_web_mqtt
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_mqtt
rabbitmq_web_dispatch
rabbitmq_web_mqtt
Applying plugin configuration to rabbit@mq...
The following plugins have been enabled:
rabbitmq_web_mqtt
started 1 plugins.
root@mq:/#
2)RabbitMQ管理控制台查看
如果插件启动成功,rabbitmq会打开1883和15675端口:
3)用MQTTX工具测试
4)用eclipse paho客户端测试
添加依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
收发消息测试
@RestController
public class DemoController {
@GetMapping("/publish")
public String publish() throws MqttException {
MqttClientPersistence persistence = new MemoryPersistence();;//内存持久化
MqttClient client = new MqttClient("tcp://192.168.137.138:1883", "abc", persistence);
//连接选项中定义用户名密码和其它配置
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(true);//参数为true表示清除缓存,也就是非持久化订阅者,这个时候只要参数设为true,一定是非持久化订阅者。而参数设为false时,表示服务器保留客户端的连接记录
options.setAutomaticReconnect(true);//是否自动重连
options.setConnectionTimeout(30);//连接超时时间 秒
options.setKeepAliveInterval(10);//连接保持检查周期 秒
options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1); //版本
options.setUserName("xjs1919");
options.setPassword("123321".toCharArray());
// client.setManualAcks(true);
client.connect(options);//连接
//订阅topic
client.subscribe("demoTopic", 2);
// 设置回调,将来收到消息的时候会被回调
client.setCallback(new MqttCallbackExtended() {
@Override
public void connectComplete(boolean reconnect, String serverURI) {
System.out.println("连接完成");
}
@Override
public void connectionLost(Throwable cause) {
System.out.println("连接丢失");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("收到消息,topic:"+topic + ", msg:" + new String(message.getPayload()));
//client.messageArrivedComplete(message.getId(),message.getQos());
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// Qos0: 当消息发送出去就回调
// Qos1: 当发送者收到了puback的时候的回调
// Qos2: 当发送者收到了pubcomp的时候的回调
System.out.println("消息发送完成");
}
});
client.publish("demoTopic", "hello,这是一个测试消息!".getBytes(), 2, false);
return "ok";
}
}
参考文章:
https://www.cnblogs.com/motion/p/14974024.html
https://blog.csdn.net/u013615903/article/details/131395264