spring boot+netty 搭建MQTT broken

一、项目结构
在这里插入图片描述
二、安装依赖

<!--      netty包  -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.75.Final</version>
        </dependency>
        <!--   常用JSON工具包 -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.80</version>
        </dependency>
        <!--mqtt服务端-->
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

三、配置文件

server:
  port: 8880
netty:
  mqtt:
    port: 1883
#    mqtt账号
    username: admin
#mqtt密码
    password: 123456
# 日记配置
logging:
  level:
    # 开启debug日记打印
    com.hyx: debug

四、新建具体类
1、新建BootNettyMqttMsgBack类

package com.example.springnettymqtt.MQTTServer.callback;

import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import io.netty.channel.Channel;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import io.netty.channel.ChannelId;
import io.netty.handler.codec.mqtt.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceAdd;
import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;


@Component
@RequiredArgsConstructor
public class BootNettyMqttMsgBack {
    private static final Logger log =  LoggerFactory.getLogger(BootNettyMqttMsgBack.class);
    private final MQTTServerProperties MQTTserverProperties;

    /**
     * 	确认连接请求
     * @param channel
     * @param mqttMessage
     */
    public void connack (Channel channel, MqttMessage mqttMessage) {
        MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
        MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
        MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();

        //	构建返回报文, 可变报头
        MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
        //	构建返回报文, 固定报头
        MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
        //	构建CONNACK消息体
        MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
        //log.info("back--"+connAck.toString());
        log.debug("设备上线,channelId:{}", channel.id());
        MQTTdeviceAdd(channel);
        channel.writeAndFlush(connAck);
    }
    public void disconnack (Channel channel, MqttMessage mqttMessage) {
        MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
        MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
        MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
        //	构建返回报文, 可变报头
        MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_REFUSED_BANNED, mqttConnectVariableHeaderInfo.isCleanSession());
        //	构建返回报文, 固定报头
        MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.DISCONNECT,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
        //	构建CONNACK消息体
        MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
        //log.info("back--"+connAck.toString());
        channel.writeAndFlush(connAck);
        log.debug("设备下线,channelId:{}", channel.id());
        MQTTdeviceRemove(channel);
    }

    /**
     * 	根据qos发布确认
     * @param channel
     * @param mqttMessage
     */
    public  void puback (Channel channel, MqttMessage mqttMessage) {
        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
        MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
        MqttQoS qos =  mqttFixedHeaderInfo.qosLevel();
        //注意:	readableBytes会改变写指针位置,使后续推送数据时,读取数据为空,需要重置	读指针
        byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
        mqttPublishMessage.payload().readBytes(headBytes);
        String data = new String(headBytes);
        System.out.println("publish data-->"+data);
        //重置读取的指针
        mqttPublishMessage.payload().resetReaderIndex();
        switch (qos) {
            case AT_MOST_ONCE: 		//	至多一次
                //推送到订阅的客户端
                subscribSend(mqttMessage);
                break;
            case AT_LEAST_ONCE:		//	至少一次
                //	构建返回报文, 可变报头
                MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
                //	构建返回报文, 固定报头
                MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
                //	构建PUBACK消息体
                MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
                log.info("back--"+pubAck.toString());
                channel.writeAndFlush(pubAck);
                //推送到订阅的客户端
                subscribSend(mqttMessage);
                break;
            case EXACTLY_ONCE:		//	刚好一次
                //	构建返回报文, 固定报头
                MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
                //	构建返回报文, 可变报头
                MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
                MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
                //服务端收到publis的QoS2的消息之后,服务端需要保存一个msgid的记录,并且进入一个状态,
                // 即之后不管来了几个这个msgid的消息,都不管他,认为是重复的,丢弃。
                //接收到publish的QoS2消息之后,不能马上投递给上层,而是在本地做持久化,将消息保存起来。
                int mqttMessageId=mqttPublishMessage.variableHeader().packetId();
                if(!mqttMessageIdMap.containsKey(mqttMessageId)){
                    //不存在此消息,将此消息暂存
                    mqttMessageIdMap.put(mqttMessageId, mqttMessage);
                    log.info("消息ID"+mqttMessageId+"-->Qos2级别消息,消息缓存");
                }else{
                    //重复发送消息,直接返回
                    log.info(mqttPublishMessage.variableHeader().packetId()+"消息重复:"+mqttPublishMessage.fixedHeader().isDup());
                    return;
                }
                channel.writeAndFlush(mqttMessageBack);
                break;
            default:
                break;
        }
    }

    /**
     * 	发布完成 qos2
     * @param channel
     * @param mqttMessage
     */
    public  void pubcomp (Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        //	构建返回报文, 固定报头
        MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
        //	构建返回报文, 可变报头
        MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
        //log.info("back--"+mqttMessageBack.toString());
        channel.writeAndFlush(mqttMessageBack);
    }

    /**
     * 	订阅确认
     * @param channel
     * @param mqttMessage
     */
    public  void suback(Channel channel, MqttMessage mqttMessage) {
        MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
        MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader();
        //	构建返回报文, 可变报头
        MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
        //log.info(topics.toString());
        List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
        for (int i = 0; i < topics.size(); i++) {
            grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
        }
        //	构建返回报文	有效负载
        MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
        //	构建返回报文	固定报头
        MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
        //	构建返回报文	订阅确认
        MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
        channel.writeAndFlush(subAck);
    }

    /**
     * 	取消订阅确认
     * @param channel
     * @param mqttMessage
     */
    public  void unsuback(Channel channel, MqttMessage mqttMessage) {
        MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();
        //	构建返回报文	可变报头
        MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
        //	构建返回报文	固定报头
        MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
        //	构建返回报文	取消订阅确认
        MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
        channel.writeAndFlush(unSubAck);
    }

    /**
     * 	心跳响应
     * @param channel
     * @param mqttMessage
     */
    public  void pingresp (Channel channel, MqttMessage mqttMessage) {
        //	心跳响应报文	11010000 00000000  固定报文
        MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
        channel.writeAndFlush(mqttMessageBack);
    }

    /**
     * 订阅推送
     */
    public  void subscribSend(MqttMessage mqttMessage){
        MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
        Object obj=mqttMessage.variableHeader();
        MqttPublishVariableHeader variableHeader=(MqttPublishVariableHeader)obj;
        String topicName=variableHeader.topicName();
        int packetId=variableHeader.packetId();
        //固定消息头 注意此处的消息类型PUBLISH mqtt协议
        MqttFixedHeader FixedHeader = new MqttFixedHeader(MqttMessageType.PUBLISH,false, MqttQoS.AT_LEAST_ONCE,false,0);
        //可变消息头
        MqttPublishVariableHeader mqttPublishVariableHeader=new MqttPublishVariableHeader(topicName,packetId);
        //推送消息体
        MqttPublishMessage mqttPublishMessageResult=new MqttPublishMessage(FixedHeader,mqttPublishVariableHeader, mqttPublishMessage.content());
        log.info("推送地址————》"+topicName);
        if(subscribeMap.containsKey(topicName)){
            List<ChannelId> channelList=subscribeMap.get(topicName);
            for (int i = 0; i < channelList.size(); i++) {
                //订阅次此topic的Mqtt客户端搜到此消息,
                Channel channelSub=MQTTdeviceChannelGroup.find(channelList.get(i));
                //writeAndFlush会将ByteBuf的引用释放refCnt会减去1,使用retain加1
                mqttPublishMessageResult.retain();
                channelSub.writeAndFlush(mqttPublishMessageResult);
            }
            mqttPublishMessageResult.release();

        }
    }

    /**
     * 用户鉴权
     */
    public boolean authentication(MqttConnectPayload payload){

        String username=MQTTserverProperties.getUsername();
        String password=MQTTserverProperties.getPassword();
        //无账号或者无密码通过
        if(stringEmptyCheck(password)||stringEmptyCheck(username)){
            return true;
        }else {
            //消息中账号密码为空
            if(payload.passwordInBytes()==null||payload.userName()==null){
                return false;
            }
            String passwordAuthen=new String(payload.passwordInBytes());
            String usernameAuthen=payload.userName();
            if(password.equals(passwordAuthen)&&username.equals(usernameAuthen)){
                return true;
            }else {
                return false;
            }
        }
    }
    //判断字符字符为空
    private boolean stringEmptyCheck(String str){
        if(str==null||"".equals(str)){
            return true;
        }else {
            return false;
        }
    }
}

2、新建MqttChannelInit类

package com.example.springnettymqtt.MQTTServer.channel;

import com.example.springnettymqtt.MQTTServer.handler.MQTTMessageHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.RequiredArgsConstructor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@RequiredArgsConstructor
public class MqttChannelInit extends ChannelInitializer<SocketChannel> {
    @Autowired
    private MQTTMessageHandler MQTTmessageHandler;

    @Override
    protected void initChannel(SocketChannel channel) {
        channel.pipeline()
                // 心跳时间
                .addLast("idle", new IdleStateHandler(600, 600, 1200, TimeUnit.SECONDS))
                .addLast("encoder", MqttEncoder.INSTANCE)
                .addLast("decoder", new MqttDecoder())
                .addLast(MQTTmessageHandler);
    }
}

3、新建MQTTDeviceManerger类

package com.example.springnettymqtt.MQTTServer.channel;

import lombok.extern.slf4j.Slf4j;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import lombok.extern.slf4j.Slf4j;

import java.util.List;
import java.util.Map;

import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;

@Slf4j
public class MQTTDeviceManerger {
    /**
     * 设备接入
     */
    public static void MQTTdeviceAdd(Channel channel) {
        if(!MQTTdeviceChannelGroup.contains(channel)) {
            MQTTdeviceChannelGroup.add(channel);
        }
    }
    /**
     * 设备移除和和订阅的topic
     */
    public static void MQTTdeviceRemove(Channel channel) {
        if(MQTTdeviceChannelGroup.contains(channel)) {
            MQTTdeviceChannelGroup.remove(channel);
            MQTTremoveDeviceChannelId(channel.id());
            //移除topic中的这个设备的chanelid
            for (Map.Entry<String, List<ChannelId>> listEntry : subscribeMap.entrySet()) {
                try {
                    if (listEntry.getValue().contains(channel.id())) {
                        listEntry.getValue().remove(channel.id());
                        log.info(channel.id() + "下线,topic:  " + listEntry.getKey() + "中移除此id");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }
    public static void MQTTremoveDeviceChannelId(ChannelId channelId) {
        MQTTdeviceMap.entrySet().removeIf(item -> item.getValue().equals(channelId));
    }
}

4、新建配置类

package com.example.springnettymqtt.MQTTServer.config;

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConfigurationProperties(prefix = MQTTServerProperties.MQTTPREFIX)
@Data
public class MQTTServerProperties {
    public static final String MQTTPREFIX = "netty.mqtt";

    /**
     * 服务器端口
     */
    private Integer port;

    /**
     * mqtt服务器用户名
     */
    private String username;

    /**
     * mqtt服务器密码
     */
    private String password;
}

5、新建MQTTMessageHandler类

package com.example.springnettymqtt.MQTTServer.handler;

import com.example.springnettymqtt.MQTTServer.callback.BootNettyMqttMsgBack;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import io.netty.channel.*;
import io.netty.handler.codec.mqtt.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static com.example.springnettymqtt.MQTTServer.channel.MQTTDeviceManerger.MQTTdeviceRemove;
import static com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer.*;

@Slf4j
@Component
@ChannelHandler.Sharable
@RequiredArgsConstructor
public class MQTTMessageHandler extends ChannelInboundHandlerAdapter {
    @Autowired
    private BootNettyMqttMsgBack bootNettyMqttMsgBack;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg) {
            MqttMessage mqttMessage = (MqttMessage) msg;
            log.info("info--"+mqttMessage.toString());
            MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
            Channel channel = ctx.channel();
            if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
                //用户鉴权(配置文件中配置账号和密码,如果没有默认)
                log.warn("正在尝试鉴权");
                boolean authentag=  bootNettyMqttMsgBack.authentication((MqttConnectPayload)mqttMessage.payload());
                if(!authentag){
                    return;
                }
                //	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
                if(MQTTdeviceChannelGroup.contains(channel)){
                    //移除次设备channel和topic
                    bootNettyMqttMsgBack.disconnack(channel,mqttMessage);
                }
                //	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
                bootNettyMqttMsgBack.connack(channel, mqttMessage);
            }
            //对于没有鉴权的设备,请求不处理
            if(!MQTTdeviceChannelGroup.contains(channel)){
                log.warn(channel.id()+"无鉴权操作");
                return;
            }
            switch (mqttFixedHeader.messageType()){
                case PUBLISH:		//	客户端发布消息
                    //	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
                    bootNettyMqttMsgBack.puback(channel, mqttMessage);
                    break;
                // PUBREL	Qos2级别消息,客户端返回
                case PUBREL:
                    //	PUBREL(客户端发给服务端)报文是对PUBREC(服务端发给客户端)报文的响应
                    //服务端收到pubrel之后,正式将消息投递给上层应用层。
                    MqttMessageIdVariableHeader VariableHeader=(MqttMessageIdVariableHeader)mqttMessage.variableHeader();
                    if(mqttMessageIdMap.containsKey(VariableHeader.messageId())) {
                        log.warn("移除消息缓存-->消息id"+VariableHeader.messageId());
                        bootNettyMqttMsgBack.subscribSend(mqttMessageIdMap.get(VariableHeader.messageId()));
                        bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                        mqttMessageIdMap.remove(VariableHeader.messageId());
                    }else {
                        //后续多次收到REL消息,制作comp响应
                        bootNettyMqttMsgBack.pubcomp(channel, mqttMessage);
                    }
                    break;
                case SUBSCRIBE:		//	客户端订阅主题
                    //	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
                    //	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
                    //	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                    // 	to do
                    bootNettyMqttMsgBack.suback(channel, mqttMessage);
                    MqttSubscribePayload SubscribePayload=(MqttSubscribePayload) mqttMessage.payload();;
                    for (int i = 0; i < SubscribePayload.topicSubscriptions().size(); i++) {
                        String topicname=SubscribePayload.topicSubscriptions().get(i).topicName();
                        boolean tag=subscribeMap.containsKey(topicname);
                        if(tag){
                            List<ChannelId> channelIds=subscribeMap.get(topicname);
                            if(!channelIds.contains(channel.id())) {
                                channelIds.add(channel.id());
                            }else {
                                log.warn(channel.id()+"重复订阅");
                            }
                            subscribeMap.put(topicname, channelIds);
                        }else {
                            List<ChannelId> channelIds=new ArrayList<>();
                            channelIds.add(channel.id());
                            subscribeMap.put(topicname,channelIds);
                        }
                        log.info(channel.id()+"订阅地址————》"+topicname);
                    }


                    break;
                case UNSUBSCRIBE:	//	客户端取消订阅
                    //	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
                    //	to do
                    bootNettyMqttMsgBack.unsuback(channel, mqttMessage);
                    Object Unsubscribe=mqttMessage.payload();
                    MqttUnsubscribePayload unsubscribePayload=(MqttUnsubscribePayload)Unsubscribe;
                    int len=unsubscribePayload.topics().size();
                    for (int i = 0; i < len; i++) {
                        String topicname=unsubscribePayload.topics().get(i);
                        boolean tag=subscribeMap.containsKey(topicname);
                        if(tag){
                            List<ChannelId> channelIds=subscribeMap.get(topicname);
                            channelIds.remove(channel.id());
                            subscribeMap.put(topicname,channelIds);
                        }else {
                            log.error("不存在订阅地址——>"+topicname);
                        }
                        log.info(channel.id()+"取消订阅地址————》"+topicname);
                    }

                    break;
                case PINGREQ:		//	客户端发起心跳
                    //	客户端发送PINGREQ报文给服务端的
                    //	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
                    //	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
                    bootNettyMqttMsgBack.pingresp(channel, mqttMessage);
                    break;
                case DISCONNECT:	//	客户端主动断开连接
                    log.debug("设备下线,channelId:{}", channel.id());
                    MQTTdeviceRemove(channel);
                    //	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
                    //	to do
                    break;
                default:
                    break;
            }
        }
        else {
            return;
        }
    }

    /**
     * 	从客户端收到新的数据、读取完成时调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
    }

    /**
     * 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
     */
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        super.channelRegistered(ctx);
    }

    /**
     * 	客户端与服务端 断连时执行 channelInactive方法之后执行
     */
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        log.warn(ctx.channel().id()+"连接断开");
        MQTTdeviceRemove(ctx.channel());
        super.channelUnregistered(ctx);

    }

    /**
     * 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel channel = ctx.channel();
        log.warn(channel.id()+"连接异常断开。。。。。。。");
        MQTTdeviceRemove(ctx.channel());
        super.exceptionCaught(ctx, cause);
        if(channel.isActive()){
            ctx.close();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.debug("\n");
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
    }
    /**
     * 	服务端 当读超时时 会调用这个方法
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        super.userEventTriggered(ctx, evt);
        ctx.close();
    }
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        super.channelWritabilityChanged(ctx);
    }
}

6、新建接口IMQTTServer及其实现类MQTTServer

package com.example.springnettymqtt.MQTTServer.server;

import javax.annotation.PreDestroy;

public interface IMQTTServer {
    /**
     * 主启动程序,初始化参数
     *
     * @throws Exception 初始化异常
     */
    void start() throws Exception;

    /**
     * 优雅的结束服务器
     *
     * @throws InterruptedException 提前中断异常
     */
    @PreDestroy
    void destroy() throws InterruptedException;
}

package com.example.springnettymqtt.MQTTServer.server.impl;

import com.example.springnettymqtt.MQTTServer.channel.MqttChannelInit;
import com.example.springnettymqtt.MQTTServer.config.MQTTServerProperties;
import com.example.springnettymqtt.MQTTServer.server.IMQTTServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PreDestroy;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
@Slf4j
@RequiredArgsConstructor
public class MQTTServer implements IMQTTServer {
    private final MqttChannelInit mqttChannelInit;

    private final MQTTServerProperties MQTTserverProperties;

    //保存接入的MQTT设备channel
    public static ChannelGroup MQTTdeviceChannelGroup;
    //保存订阅地址和chanelid,当推送数据时,会向此订阅地址的多个channel发送数据
    public static Map<String, List<ChannelId>> subscribeMap =new ConcurrentHashMap();
    //保存设备名称和通道编号,向设备发送消息可以通过名称找到通道
    public static ConcurrentHashMap<String, ChannelId> MQTTdeviceMap = new ConcurrentHashMap<>();
    //存放Qos2消息等级的消息ID,这里可使用redis之类的工具做缓存,为了简化配置,使用map暂存
    public static ConcurrentHashMap<Integer, MqttMessage> mqttMessageIdMap=new ConcurrentHashMap();
    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;

    @Override
    public void start() {
        log.info("初始化 Mqttserver ...");
        bossGroup = new NioEventLoopGroup();
        workerGroup =  new NioEventLoopGroup();
        MQTTdeviceChannelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
        this.MqttServer();
    }
    /**
     * 初始化
     */
    private void MqttServer() {
        try {
            new ServerBootstrap()
                    .group(bossGroup, workerGroup)
                    .channel( NioServerSocketChannel.class )
                    .localAddress(new InetSocketAddress(MQTTserverProperties.getPort()))
                    // 配置 编码器、解码器、业务处理
                    .childHandler(mqttChannelInit)
                    // tcp缓冲区
                    .option(ChannelOption.SO_BACKLOG, 128)
                    // 将网络数据积累到一定的数量后,服务器端才发送出去,会造成一定的延迟。希望服务是低延迟的,建议将TCP_NODELAY设置为true
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // 保持长连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    // 绑定端口,开始接收进来的连接
                    .bind().sync();
            log.info("MQTT服务启动成功!开始监听端口:{}", MQTTserverProperties.getPort());
        } catch (Exception e) {
            e.printStackTrace();
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    /**
     * 销毁
     */
    @PreDestroy
    @Override
    public void destroy() {
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

7、新建启动类

package com.example.springnettymqtt.startServer;

import com.example.springnettymqtt.MQTTServer.server.impl.MQTTServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
public class StartSrver {

    @Autowired
    private MQTTServer mqttServer;
    @PostConstruct
    public void startNetty(){

        new Thread(() -> {
            try {
                mqttServer.start();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/129551.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

C# Socket通信从入门到精通(7)——单个异步TCP服务器监听单个客户端C#代码实现

前言: 我们在开发TCP服务器程序的时候,有的时候需要一些异步的应用,比如我读取客户端发送的数据,但是服务器程序不能一直等待客户端数据发送过来,服务器要先做一些别的事情,这个时候C# Socket通信从入门到精通(5)——单个同步TCP服务器监听一个客户端C#代码实现这篇文…

在Docker中设置Redis的密码

目录 1&#xff0c;介绍2&#xff0c;实现“Docker Redis设置密码”的整体流程3&#xff0c;具体实现步骤4&#xff0c;结论 1&#xff0c;介绍 Docker是一个开源的应用容器引擎&#xff0c;可以自动化部署、扩展应用程序。它可以帮助开发人员将应用程序及其依赖项打包到一个可…

【入门Flink】- 08Flink时间语义和窗口概念

Flink-Windows 是将无限数据切割成有限的“数据块”进行处理&#xff0c;这就是所谓的“窗口”&#xff08;Window&#xff09;。 注意&#xff1a;Flink 中窗口并不是静态准备好的&#xff0c;而是动态创建——当有落在这个窗口区间范围的数据达到时&#xff0c;才创建对应的窗…

UE5、CesiumForUnreal实现加载GeoJson绘制单面(Polygon)功能(StaticMesh方式)

文章目录 1.实现目标2.实现过程2.1 实现原理2.1.1 数据读取2.1.2 三角剖分2.1.3 创建StaticMesh2.2 应用测试2.2.1 具体代码2.2.2 蓝图应用测试3.参考资料1.实现目标 通过读取本地GeoJson数据,在UE中以StaticMeshComponent的形式绘制出面数据,支持Editor和Runtime环境,GIF动…

JMeter---JSON提取器

&#x1f4e2;专注于分享软件测试干货内容&#xff0c;欢迎点赞 &#x1f44d; 收藏 ⭐留言 &#x1f4dd; 如有错误敬请指正&#xff01;&#x1f4e2;交流讨论&#xff1a;加入1000人软件测试技术学习交流群&#x1f4e2;资源分享&#xff1a;进了字节跳动之后&#xff0c;才…

传统企业数字化转型都要面临哪些挑战?_数据治理平台_光点科技

数字化转型已经成为传统企业发展的必经之路&#xff0c;但在这个过程中&#xff0c;企业往往会遭遇多方面的挑战。 1.文化和组织惯性 最大的挑战之一是企业文化和组织惯性的阻力。传统企业往往有着深厚的历史和根深蒂固的工作方式&#xff0c;员工和管理层可能对新的数字化工作…

中国电信终端产业联盟5G Inside行业子联盟正式成立!宏电股份作为副理事单位受邀加入

11月9日&#xff0c;中国电信于广州召开“2023中国电信终端生态合作暨中国电信终端产业联盟&#xff08;以下简称CTTA&#xff09;第十四次会员大会”&#xff0c;联盟成员齐聚现场。作为CTTA大会的一个重要环节&#xff0c;中国电信终端产业联盟5G Inside行业子联盟正式成立&a…

爱剪辑如何将视频旋转90度,详细操作流程

爱剪辑是一款电脑端常用的视频剪辑类软件&#xff0c;基本上囊括了视频剪辑所需的所有功能&#xff0c;此处主要介绍&#xff0c;爱剪辑是如何对视频进行旋转操作的&#xff0c;水平旋转或者垂直旋转爱剪辑都是可以操作的&#xff0c;整体操作的详细过程将在下方为大家讲解。 …

希亦ACE和石头m1这两款内衣洗衣机哪一款更好?高性价比内衣洗衣机测评

内衣洗衣机可以说是近两年很火爆的小家电了&#xff0c;给大家带了一种全新的时尚体验&#xff0c;越来越内衣裤也可以用手洗&#xff01;而且还比手洗得干净&#xff01;不过现在市面上关于内衣洗衣机的品牌越来越多&#xff0c;小伙伴们想要挑选一款性价比高的内衣洗衣机看得…

LabVIEW在OPC中使用基金会现场总线

LabVIEW在OPC中使用基金会现场总线 本文讨论了如何使用开放的OPC&#xff08;用于过程控制的OLE&#xff09;接口访问基金会现场总线网络和设备。 NI-FBUS通信管理器随附了一个OPC数据访问服务器。 &#xff08;NI-FBUS Configurator自动包含NI-FBUS通信管理器。&#xff09…

【面试经典150 | 位运算】二进制求和

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;模拟 其他语言c 写在最后 Tag 【二进制】【位运算】 题目来源 67. 二进制求和 题目解读 以二进制字符串的形式返回两个二进制字符串的和。 解题思路 看到这个题目首先想到的方法可能是先把二进制字符转化成 int 型数…

[LeetCode]-225. 用队列实现栈

目录 225. 用队列实现栈 题目 ​思路 代码 225. 用队列实现栈 225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/implement-stack-using-queues/description/ 题目 请你仅使用两个队列实现一个后入先出&#xff08;LIFO&#xff0…

基于SSM的网络音乐系统的设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;Vue 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#xff1a;是 目录…

1. 深度学习——激活函数

机器学习面试题汇总与解析——激活函数 本章讲解知识点 什么是激活函数&#xff1f; 为什么要使用激活函数&#xff1f; 详细讲解激活函数 本专栏适合于Python已经入门的学生或人士&#xff0c;有一定的编程基础。本专栏适合于算法工程师、机器学习、图像处理求职的学生或人…

统一消息分发中心设计

背景 我们核心业务中订单完成时&#xff0c;需要完成后续的连带业务&#xff0c;扣件库存库存、增加积分、通知商家等。 如下图的架构&#xff1a; 这样设计出来导致我们的核心业务和其他业务耦合&#xff0c;每次新增连带业务或者去掉连带业务都需要修改核心业务。 一方面&…

javascript用localStorage存储用户搜索词记录,并在搜索框下展显搜索词记录

//首先是storage的一封装 //storage.js文件 function storage(){//设置storage密钥this.ms"mystorage";}//以下为函数的原型方法//获得localStorage值storage.prototype.getLocalfunction(key){//先检查设置的localStorage的密钥var mydatalocalStorage.getItem(thi…

vue项目如何快速上手echarts

1.安装echarts npm i echarts 2.导入echarts 说明&#xff1a;任意组件页面都可以导入echarts。如果在main.js里面导入&#xff0c;那么只需要导入一次就可以应用于任意页面。 import * as echarts from "echarts" 3.创建容器 说明&#xff1a;它具有一个指定的id属…

Selenium+Python自动化测试环境搭建

selenium python 自动化测试 —— 环境搭建 关于 selenium Selenium 是一个用于Web应用程序测试的工具。Selenium测试直接运行在浏览器中&#xff0c;就像真正的用户在操作一样。支持的浏览器包括IE(7、8、9)、Mozilla Firefox、Mozilla Suite等。 Selenium 框架底层使用JavaS…

Leetcode—637.二叉树的层平均值【简单】

2023每日刷题&#xff08;二十五&#xff09; Leetcode—637.二叉树的层平均值 BFS实现代码 /*** Definition for a binary tree node.* struct TreeNode {* int val;* struct TreeNode *left;* struct TreeNode *right;* };*/ /*** Note: The returned array mu…

Docker+K8s基础(重要知识点总结)

目录 一、Docker的核心1&#xff0c;Docker引擎2&#xff0c;Docker基础命令3&#xff0c;单个容器运行多个服务进程4&#xff0c;多个容器运行多个服务进程5&#xff0c;备份在容器中运行的数据库6&#xff0c;在宿主机和容器之间共享数据7&#xff0c;在容器之间共享数据8&am…