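Basic approach:
- Use EMQX as the MQTT broker.
- The mqtt-receive-server service receives the data reported by devices.
- The mqtt-sender-service service sends data down to devices.
- Kafka decouples the data flow: mqtt-receive-server does only light processing on what it receives and then writes it straight to Kafka.
- The business systems in the cloud consume data from Kafka; when one of them needs to send data to a device, it calls the mqtt-sender-service API.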
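To make the data flow concrete, here is a toy, in-memory sketch of the three roles. It is only an illustration under stated assumptions: `PipelineSketch`, `Envelope`, and the `ack:` reply are hypothetical, a `BlockingQueue` stands in for Kafka, and a plain list stands in for messages published to the broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Toy model of the data flow. The queue stands in for Kafka, the list for the broker. */
class PipelineSketch {

    /** What the receiving side hands off: who sent it, on which topic, and the raw payload. */
    static final class Envelope {
        final String clientId;
        final String topic;
        final String payload;

        Envelope(String clientId, String topic, String payload) {
            this.clientId = clientId;
            this.topic = topic;
            this.payload = payload;
        }
    }

    private final BlockingQueue<Envelope> queue = new LinkedBlockingQueue<>(); // stand-in for Kafka
    final List<String> published = new ArrayList<>();                          // stand-in for downlink publishes

    /** Receiving side (mqtt-receive-server): light processing only, then hand off. */
    void onDeviceMessage(String clientId, String topic, String payload) {
        queue.add(new Envelope(clientId, topic, payload.trim()));
    }

    /** Business side: drain the queue and ask the sending side to reply to each device. */
    int consumeAll() {
        int handled = 0;
        Envelope e;
        while ((e = queue.poll()) != null) {
            send(e.clientId, "ack:" + e.payload);
            handled++;
        }
        return handled;
    }

    /** Sending side (mqtt-sender-service): publish on the device's downlink topic. */
    void send(String clientId, String data) {
        published.add(String.format("%s/2/0", clientId) + " <- " + data);
    }
}
```

The receiving side never calls the business logic directly; it only enqueues. That is the decoupling the list above describes.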
Basic flow
Define the authentication users in the EMQX Dashboard
Define the MQTT topics
// Device activation
public static final String ACTIVATE = "mqtt/0/1";
// Device reset
public static final String RESET = "mqtt/0/0";
// Device online
public static final String ONLINE = "mqtt/1/1";
// Device offline
public static final String OFFLINE = "mqtt/1/0";
// Uplink: a device reports data to the platform
public static final String REPORT = "mqtt/2/1";
// Downlink: the platform sends data to a device; %s is replaced by the device's clientId
public static final String ISSUED = "%s/2/0";
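Unlike the fixed uplink topics, the downlink topic is a format string that is filled in per device. A small sketch of building the topic and recovering the clientId from it follows; `DownlinkTopics` and its methods are hypothetical helpers, not part of the project.

```java
/** Helper around the downlink pattern "%s/2/0" (hypothetical class, not from the project). */
final class DownlinkTopics {
    private static final String ISSUED = "%s/2/0";
    private static final String SUFFIX = "/2/0";

    private DownlinkTopics() {}

    /** Builds the per-device downlink topic, e.g. "dev-001/2/0". */
    static String forClient(String clientId) {
        return String.format(ISSUED, clientId);
    }

    /** Extracts the clientId from a downlink topic, or returns null if the topic does not match. */
    static String clientIdOf(String topic) {
        if (topic == null || !topic.endsWith(SUFFIX) || topic.length() == SUFFIX.length()) {
            return null;
        }
        return topic.substring(0, topic.length() - SUFFIX.length());
    }
}
```

For example, `DownlinkTopics.forClient("dev-001")` yields `dev-001/2/0`, and `DownlinkTopics.clientIdOf("mqtt/2/1")` yields `null` because the uplink topic does not end in `/2/0`.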
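Device authentication flow
First create a product on the cloud platform; this generates the PK/PS used to authenticate connections to the MQTT broker.
Burn the PK/PS into the device.
On first boot the device connects to the platform carrying PK/PS/DK. Once the MQTT connection succeeds, the cloud service sends the DS down to the device and marks the device as activated.
On every later connection, after the MQTT connection succeeds, the cloud service checks whether the DK/DS pair is valid; if it is not, the device is kicked offline.
The device subscribes to the ${clientId}/2/0 topic.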
@PostConstruct
public void init() throws MqttException {
    client.setCallback(new MqttCallbackHandler());
    client.subscribe(String.format(MqttTopicConstant.ISSUED, client.getClientId()));
}
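The activate-then-verify steps of the authentication flow can be sketched with an in-memory registry. This is a minimal illustration, not the article's implementation: `DeviceRegistry`, its method names, and the 16-byte random DS are all assumptions, and a real service would persist the DK/DS pairs.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory sketch of the activate-then-verify flow; every name here is hypothetical. */
class DeviceRegistry {
    private final String productKey;
    private final String productSecret;
    private final Map<String, String> secretByDeviceKey = new ConcurrentHashMap<>(); // DK -> DS
    private final SecureRandom random = new SecureRandom();

    DeviceRegistry(String productKey, String productSecret) {
        this.productKey = productKey;
        this.productSecret = productSecret;
    }

    /** First connection: check PK/PS, then issue a DS for this DK. Returns null if rejected or already activated. */
    String activate(String pk, String ps, String dk) {
        if (dk == null || !productKey.equals(pk) || !productSecret.equals(ps)) {
            return null;
        }
        String ds = newSecret();
        return secretByDeviceKey.putIfAbsent(dk, ds) == null ? ds : null;
    }

    /** Later connections: false means the DK/DS pair is invalid and the device should be kicked offline. */
    boolean verify(String dk, String ds) {
        if (dk == null || ds == null) {
            return false;
        }
        String expected = secretByDeviceKey.get(dk);
        return expected != null
                && MessageDigest.isEqual(expected.getBytes(StandardCharsets.UTF_8),
                                         ds.getBytes(StandardCharsets.UTF_8));
    }

    /** Generates a random 32-character hex string to serve as the DS. */
    private String newSecret() {
        byte[] bytes = new byte[16];
        random.nextBytes(bytes);
        StringBuilder sb = new StringBuilder(32);
        for (byte b : bytes) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }
}
```

In this sketch a second activation attempt for the same DK is refused rather than answered with the existing DS, so knowing PK/PS/DK alone is not enough to read back an already issued DS.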
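The mqtt-receive-server service
Connects to the MQTT broker with an EMQX built-in user, clientId=mqtt_receive_server.
Subscribes to the ACTIVATE, RESET, ONLINE, OFFLINE, and REPORT topics.
Does light processing on the received data and forwards it to Kafka.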
mqtt:
  broker-url: tcp://42.194.132.44:1883
  client-id: mqtt_receive_server
  username: mqtt_server
  password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
@PostConstruct
public void init() throws MqttException {
    client.setCallback(new MqttCallbackHandler(kafkaService));
    subscribe(MqttTopicConstant.ACTIVATE);
    subscribe(MqttTopicConstant.RESET);
    subscribe(MqttTopicConstant.ONLINE);
    subscribe(MqttTopicConstant.OFFLINE);
    subscribe(MqttTopicConstant.REPORT);
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
    // Decode explicitly as UTF-8 instead of relying on the platform default charset
    String data = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
    log.info("Received message, topic: {}, QoS: {}, payload: {}", topic, message.getQos(), data);
    UpData upData = JSONObject.parseObject(data, UpData.class);
    // Wrap the MQTT topic and the raw payload so downstream consumers can dispatch on the topic
    UpKafKaData upKafKaData = new UpKafKaData(topic, data);
    log.info("upKafKaData: {}", JSON.toJSONString(upKafKaData));
    kafkaService.sendData(UP_DATA_TOPIC, upData.getClientId(), JSON.toJSONString(upKafKaData));
}
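If the second argument of `kafkaService.sendData` is used as the Kafka record key (an assumption, since that method is not shown), then all messages from one device share a key and land in the same partition, which keeps them in order per device. The sketch below illustrates the idea with a simplified hash; Kafka's real default partitioner hashes the serialized key with murmur2, not `String.hashCode()`, and `KeyPartitioning` is a hypothetical name.

```java
/** Simplified stand-in for key-based partitioning; not Kafka's actual algorithm. */
final class KeyPartitioning {
    private KeyPartitioning() {}

    /** Maps a record key to a partition index in [0, numPartitions). */
    static int partitionFor(String key, int numPartitions) {
        // floorMod keeps the result non-negative even when hashCode() is negative
        return Math.floorMod(key.hashCode(), numPartitions);
    }
}
```

The mapping depends only on the key and the partition count, so a given clientId always maps to the same partition as long as the number of partitions does not change.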
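The mqtt-sender-service service
Connects to the MQTT broker with an EMQX built-in user, clientId=mqtt_sender_server.
Subscribes to no topics; it only publishes, and the downlink topic is ${clientId}/2/0.
Exposes an API that the business subsystems call to send data to devices.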
mqtt:
  broker-url: tcp://42.194.132.44:1883
  client-id: mqtt_sender_server
  username: mqtt_server
  password: 9b31fa798e16532b0285e130b004836d33391f908f043f2ce0897eea0a669fa0
package com.angel.ocean.listener;

import com.alibaba.fastjson2.JSONObject;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.domain.UpKafKaData;
import com.angel.ocean.domain.client.ActivateData;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.List;

import static com.angel.ocean.contants.KafkaTopicConstant.UP_DATA_TOPIC;

@Slf4j
@Component
public class UpDataConsumerListener {

    @Resource
    private MqttService mqttService;

    /**
     * Batch consumption.
     */
    @KafkaListener(topics = UP_DATA_TOPIC, containerFactory = "batchFactory")
    public void batchListen(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        try {
            log.info("UpDataConsumerListener.batchListen(), records.size: {}", records.size());
            for (ConsumerRecord<String, String> record : records) {
                UpKafKaData data = JSONObject.parseObject(record.value(), UpKafKaData.class);
                log.info("{}", record.value());
                handler(data.getTopic(), data.getData());
            }
        } catch (Exception e) {
            log.error("UpDataConsumerListener.batchListen() Exception:{}", e.getMessage(), e);
        } finally {
            // Manual acknowledgment; it runs in finally, so the batch is acknowledged even when processing failed
            ack.acknowledge();
        }
    }

    // Dispatch on the original MQTT topic carried in the envelope
    private void handler(String topic, String data) {
        switch (topic) {
            case MqttTopicConstant.ACTIVATE:
                activateHandler(data);
                break;
            case MqttTopicConstant.RESET:
                otherHandler(data);
                break;
            case MqttTopicConstant.OFFLINE:
                otherHandler(data);
                break;
            case MqttTopicConstant.ONLINE:
                otherHandler(data);
                break;
            case MqttTopicConstant.REPORT:
                otherHandler(data);
                break;
            default:
                otherHandler(data);
        }
    }

    // Reply "200" on the device's downlink topic once an activation message arrives
    private void activateHandler(String data) {
        ActivateData activateData = JSONObject.parseObject(data, ActivateData.class);
        String clientId = activateData.getClientId();
        mqttService.publish(String.format(MqttTopicConstant.ISSUED, clientId), "200");
    }

    // Placeholder for the remaining topics: only logs the payload
    private void otherHandler(String data) {
        log.info("{}", data);
    }
}
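In the listener above, an exception thrown for one record leaves the loop, and the finally block still acknowledges the whole batch, so the remaining records of that batch are skipped. One possible alternative, sketched here without Kafka or Spring, is to dispatch through a map and isolate failures per record. `BatchDispatcher` and `Envelope` are hypothetical names, and what to do with the failed records (retry, dead-letter topic) is left open.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Sketch: dispatch by topic through a map and isolate failures per record. */
class BatchDispatcher {

    /** Minimal stand-in for the topic/payload envelope read from Kafka. */
    static final class Envelope {
        final String topic;
        final String data;

        Envelope(String topic, String data) {
            this.topic = topic;
            this.data = data;
        }
    }

    private final Map<String, Consumer<String>> handlers = new HashMap<>();
    private final Consumer<String> fallback;

    BatchDispatcher(Consumer<String> fallback) {
        this.fallback = fallback;
    }

    /** Registers the handler for one MQTT topic. */
    void register(String topic, Consumer<String> handler) {
        handlers.put(topic, handler);
    }

    /** Processes every record; a failing record is collected instead of aborting the rest of the batch. */
    List<Envelope> process(List<Envelope> batch) {
        List<Envelope> failed = new ArrayList<>();
        for (Envelope e : batch) {
            try {
                handlers.getOrDefault(e.topic, fallback).accept(e.data);
            } catch (RuntimeException ex) {
                failed.add(e);
            }
        }
        return failed;
    }
}
```

The map replaces the switch, so adding a topic means registering one more handler, and the returned list makes the failed records visible to the caller before the batch is acknowledged.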
package com.angel.ocean.controller;

import com.angel.ocean.common.ApiResult;
import com.angel.ocean.contants.MqttTopicConstant;
import com.angel.ocean.mqtt.MqttService;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.nio.charset.StandardCharsets;
import javax.annotation.Resource;

@Slf4j
@RestController
@RequestMapping("/mqtt/server")
public class MqttController {

    @Resource
    private MqttClient server;

    @Resource
    private MqttService mqttService;

    /**
     * Downlink endpoint: sends data to a device.
     *
     * @param clientId clientId of the target device
     * @param data     payload to send
     * @return success if the message was published, error otherwise
     */
    @RequestMapping("/sender")
    public ApiResult<?> publish(String clientId, String data) {
        String topic = String.format(MqttTopicConstant.ISSUED, clientId);
        // Note: the same payload is published again through the raw client below
        mqttService.publish(topic, data);
        if (server.isConnected()) {
            // Encode explicitly as UTF-8 instead of relying on the platform default charset
            MqttMessage message = new MqttMessage(data.getBytes(StandardCharsets.UTF_8));
            message.setQos(0);
            try {
                server.publish(topic, message);
                log.info("Message published, topic:{}, data:{}", topic, data);
            } catch (MqttException e) {
                log.error("Message publish failed, topic:{}", topic, e);
                return ApiResult.error();
            }
            return ApiResult.success();
        }
        log.warn("Message publish failed, not online.");
        return ApiResult.error();
    }
}
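Because the endpoint interpolates the caller-supplied clientId straight into the topic, it may be worth rejecting values that would change the topic's shape. In MQTT, `/` separates topic levels, `+` and `#` are wildcards that are not allowed in a topic name used for publishing, and the null character is forbidden. The guard below is a hypothetical helper, not part of the project.

```java
/** Hypothetical guard for values that are interpolated into an MQTT topic. */
final class ClientIdValidator {
    private ClientIdValidator() {}

    /** Returns true if clientId is non-empty and free of level separators, wildcards, and NUL. */
    static boolean isSafeForTopic(String clientId) {
        if (clientId == null || clientId.isEmpty()) {
            return false;
        }
        for (int i = 0; i < clientId.length(); i++) {
            char c = clientId.charAt(i);
            if (c == '/' || c == '+' || c == '#' || c == 0) {
                return false;
            }
        }
        return true;
    }
}
```

A controller could call `ClientIdValidator.isSafeForTopic(clientId)` before building the topic and return an error result when it is false.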
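Verification
Scenarios: a device reports a message and the cloud service replies to it; the cloud service sends data to a device on its own initiative.
Simulating a device report and receiving the cloud platform's reply
The message was sent twice:
mqtt-client local client log:
mqtt-receive-server cloud service log:
mqtt-sender-server cloud service log:
Simulating data sent by the cloud platform on its own initiative
Log of mqtt-sender-server sending the data:
Log of mqtt-client receiving the data: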