# emqx开启CoAP网关
请参考【https://blog.csdn.net/chenhz2284/article/details/139562749?spm=1001.2014.3001.5502】
# 写一个emqx的客户端程序,不断地往topic【server/1】发消息
【pom.xml】
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.3.12.RELEASE</version>
</dependency>
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.mqttv5.client</artifactId>
<version>1.2.5</version>
</dependency>
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.49</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.11</version>
</dependency>
【MyDemo2MqttCallback.java】
package com.chz.myMqttV5.demo2;
@Slf4j
public class MyDemo2MqttCallback implements MqttCallback
{
private String clientId;
public MyDemo2MqttCallback(String clientId)
{
this.clientId = clientId;
}
public void connectComplete(boolean reconnect, String serverURI) {
log.info("{}::MyMqttCallback, reconnect={}, serverURI={}", clientId, reconnect, serverURI);
}
public void disconnected(MqttDisconnectResponse disconnectResponse) {
log.info("{}::disconnected, disconnectResponse={}", clientId, disconnectResponse.getReasonString());
}
public void deliveryComplete(IMqttToken token) {
log.info("{}::deliveryComplete, disconnectResponse={}", clientId, token.isComplete());
}
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("{}::messageArrived, topic={}, qos={}, message={}", clientId, topic, message.getQos(), new String(message.getPayload()));
}
public void mqttErrorOccurred(MqttException exception) {
log.info("{}::mqttErrorOccurred, disconnectResponse={}", clientId, exception.getMessage());
}
public void authPacketArrived(int reasonCode, MqttProperties properties) {
log.info("{}::authPacketArrived, reasonCode={}", clientId, reasonCode);
}
}
【MyDemo2MqttV5Sender.java】
package com.chz.myMqttV5.demo2;
@Slf4j
public class MyDemo2MqttV5Sender {
private static String clientId = MyDemo2MqttV5Sender.class.getSimpleName();
public static void main(String[] args) throws InterruptedException {
String broker = "tcp://192.168.44.228:1883";
int subQos = 1;
int pubQos = 1;
String msg;
try {
MqttClient client = new MqttClient(broker, clientId);
MqttConnectionOptions options = new MqttConnectionOptions();
client.setCallback(new MyDemo2MqttCallback(clientId));
client.connect(options);
client.subscribe("device/#", subQos);
for(int i=0; i<10000; i++){
msg = "I am "+clientId+":" + i + "\r\n";
MqttMessage message = new MqttMessage(msg.getBytes());
message.setQos(pubQos);
client.publish("server/1", message);
Thread.sleep(3000L);
}
client.disconnect();
client.close();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
启动【MyDemo2MqttV5Sender】,每隔3秒不断地往topic【server/1】发消息。
# 下面使用libcoap对emqx进行测试
创建与emqx的连接,执行:
coap-client -m post -e "" "coap://192.168.44.228/mqtt/connection?clientid=123&username=admin&password=public"
注意url里面带有clientid,下面还会继续用到。
返回的那个数据是token,下面也会用到。
向emqx发送消息,执行:
coap-client -m post -e "Hi, this is libcoap" "coap://192.168.44.228/ps/device/1?clientid=123&token=3167748641"
注意url参数里面带有token,clientid也必须跟上面建立连接时候的clientid一样
从截图可以看到mqtt的消息订阅者接收到libcoap发出的消息了。
使用libcoap从emqx订阅消息,执行:
coap-client -m get -s 6000 -O 6,0x00 -o - -T "obstoken" "coap://192.168.44.228/ps/server/%23?clientid=123&token=3167748641"
注意订阅的主题【server/%23】是经过url编码的,编码之前的主题名是【server/#】
从截图可以看到消费到消息了
# 总结
从上面的测试可以看出,只要emqx安装了CoAP网关,通过CoAP协议也是可以跟emqx进行通讯,发布和订阅消息的。不过尽量使用mqtt协议比较好