介绍
我们可以想象这么一个场景,我们java应用想要采集到电表a的每小时的用电信息,我们怎么拿到电表的数据?一般我们会想 直接 java 后台发送请求给电表,然后让电表返回数据就可以了,事实上,我们java应用发送请求请求电表的数据信息并不是发到电表上,而是发送到 服务端 (broker)上,请求服务器 给我们电表的信息,而电表会把数据 按照mqtt协议 源源不断的发送到服务端,服务端可以把数据存储到物联网数据库上,也可以由我们java应用手动存储到物联网数据库上
而我们怎么知道电表发送到服务端的哪里,java应用又怎么请求到该电表发送的位置?
这就 引出了 一个概念,主题 (topic) ,这个topic在mqtt中不需要手动的创建,只要又客户端订阅或者发布消息,主题就会被自动创建出来
而我们服务端用的最多的就是 集成好的emqx服务器,本文我们也用的是集成好的emqx的服务端
,我们先是 一个电表 订阅好一个固定的主题,然后 源源不断的往服务端发消息,然后我们java应用订阅这个主题,这样 java应用就能持续的拿到电表的数据了
具体什么是主题,主题怎么设置的 ,mqtt协议的具体协议内容,直接登录emqx官网查看即可
MQTT 最全教程:从入门到精通 | EMQ
而emqx服务器是怎么在linux系统上搭建的呢,具体直接看文档即可,输入文档对应的yum命令就可以直接 在linux服务器上安装了
在 CentOS/RHEL 上安装 EMQX | EMQX文档
文本主要书写代码的实现
代码实现
我们的yml文件如下
我们后续 java应用订阅消息 都要到服务端 emqx 的1883端口
实体类如下
这里解释一下 clientid 是不固定的,随机的每一个发布/订阅消息的客户端都有一个唯一的clientid
而username 和password 是 客户端连接到 服务端的认证账户,多个客户端可以使用一个 账号密码
客户端代码实现
@Slf4j
@Component
@RequiredArgsConstructor
public class EMQXClient {
private final MqttDefaultProperties mqttDefaultProperties;
private final IMessageCallbackImpl mqttCallback;
private IMqttClient mqttClient;
/**
* 初始化客户端对象
*/
public boolean initMqttClient(String clientId,String serverUrl) {
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
if(Objects.isNull(clientId)){
clientId= mqttDefaultProperties.getDefaultClientId();
}
if(Objects.isNull(serverUrl)){
serverUrl= mqttDefaultProperties.getServerUrl();
}
mqttClient = new MqttClient(serverUrl, clientId, memoryPersistence);
} catch (MqttException e) {
log.info("mqtt创建异常:{}", e.getMessage());
return false;
}
return true;
}
public boolean initMqttClient() {
MemoryPersistence memoryPersistence = new MemoryPersistence();
try {
mqttClient = new MqttClient(mqttDefaultProperties.getServerUrl(),mqttDefaultProperties.getDefaultClientId(), memoryPersistence);
} catch (MqttException e) {
log.info("mqtt创建异常:{}", e.getMessage());
return false;
}
return true;
}
/**
* 获取连接
* @return
*/
public boolean connect() {
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//当客户端会话关闭的时候 对应的broker也关闭
mqttConnectOptions.setCleanSession(true);
//自动重连
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(mqttDefaultProperties.getDefaultUserName());
mqttConnectOptions.setPassword(mqttDefaultProperties.getDefaultPassword().toCharArray());
mqttClient.setCallback(mqttCallback);
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
log.info("客户端连接异常:{}",e.getMessage());
return false;
}
return true;
}
public boolean connect(String username,String password) {
if(Objects.isNull(username)){
username=mqttDefaultProperties.getDefaultUserName();
}
if(Objects.isNull(password)){
password= mqttDefaultProperties.getDefaultPassword();
}
MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
//当客户端会话关闭的时候 对应的broker也关闭
mqttConnectOptions.setCleanSession(true);
//自动重连
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setUserName(username);
mqttConnectOptions.setPassword(password.toCharArray());
mqttClient.setCallback(mqttCallback);
try {
mqttClient.connect(mqttConnectOptions);
} catch (MqttException e) {
log.info("客户端连接异常:{}",e.getMessage());
return false;
}
return true;
}
/**
* 断开连接
* @return
*/
public boolean disConnect(){
try {
mqttClient.disconnect();
} catch (MqttException e) {
log.info("客户端断开连接异常:{}",e.getMessage());
return false;
}
return true;
}
/**
*
* @param topic 主题
* @param msg 消息内容
* @param qosEnum
* @param retain 新的订阅者来了是否能拿到之前的 最新的一次消息
* @return
*/
public boolean publish(String topic, String msg, QosEnum qosEnum,boolean retain){
int uniqueInt = (int) (System.nanoTime() & 0xFFFFFFFFL);//取纳秒时间戳低32位
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(msg.getBytes());
mqttMessage.setQos(qosEnum.getType());
mqttMessage.setRetained(retain);
mqttMessage.setId(uniqueInt);
try {
mqttClient.publish(topic,mqttMessage);
} catch (MqttException e) {
log.info("客户端发送消息失败:{}",e.getMessage());
return false;
}
return true;
}
/**
*
* @param topicFilters 要订阅的主题 例子 testtopic/#
* @param qosEnum
* @return
*/
public boolean subscribe(String topicFilters,QosEnum qosEnum){
try {
mqttClient.subscribe(topicFilters,qosEnum.getType());
} catch (MqttException e) {
log.info("订阅主题失败:{}",e.getMessage());
return false;
}
return true;
}
public boolean unSubscribe(String topicFilter){
try {
mqttClient.unsubscribe( topicFilter);
} catch (MqttException e) {
log.info("取消订阅主题失败:{}",e.getMessage());
return false;
}
return true;
}
}
我们着重关注的是
我们想连接 服务端 是不是得有 一个client ,那这个client就对应IMqttclient
,我们java应用客户端连接上服务端之后,是不是得订阅主题,订阅之后的逻辑在哪里,就在
IMessageCallbackImpl
这里面就是 书写的 客户端收到服务端发来的消息之后的处理情况
@Slf4j
@Component
public class IMessageCallbackImpl implements MessageCallback {
@Override
public void connectionLost(Throwable cause) {
//丢失对服务端的连接后触发该方法回调,此处可以做一些特殊处理,比如重连 或者记录 日志之类的
log.info("丢失了对broker的连接");
}
/**
* 订阅到消息后的回调
* 该方法由mqtt客户端同步调用,在此方法未正确返回之前,不会发送ack确认消息到broker
* 一旦该方法向外抛出了异常客户端将异常关闭,当再次连接时;所有QoS1,QoS2且客户端未进行ack确认的
消息都将由broker服务器再次发送到客户端
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
log.info("订阅到了消息;topic={},messageid={},qos={},msg={}",
topic,
message.getId(),
message.getQos(),
new String(message.getPayload()));
}
/**
* 消息发布完成且收到ack确认后的回调
* QoS0:消息被服务端发出后触发一次
* QoS1:当收到broker的PUBACK消息后触发
* QoS2:当收到broer的PUBCOMP消息后触发
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
int messageId = token.getMessageId();
String[] topics = token.getTopics();
log.info("消息发送完成,messageId={},topics={}",messageId,topics);
}
}
我们用一个 bean 在初始化的时候就订阅一个主题,这样 只要有 客户端往主题上发消息,我们就能收到了
而我们这个时候 没有硬件,怎么办呢,很简单,直接下载一个mqttx 模拟硬件发送消息到主题,启动springboot,就能看到消息的发送与接收了
当然 这实现的紧紧是最简单的协议的发送接收,后面还有许多的高级功能等我们使用,具体的可以查阅官方文档