Android 消息队列之MQTT的使用:物联网通讯,HTTP太重了,使用MQTT;断网重连、注册、订阅、发送数据和接受数据,实现双向通讯。

目录:

  1. 问题
  2. MQTT是什么以及为什么使用
  3. 如何使用:第一阶段、基础功能
  4. 如何使用:第二阶段、增加断网重连
  5. 如何使用:第三阶段、封装

在这里插入图片描述


一、问题

在开发的时候,我们一般都使用Http和后台进行通讯,比如我们是开发物联网的,设备会有很多数据需要频繁发给后台,使用Http来做这件事情,就感觉很重,比如会遇到如下这些问题:

  1. 开发成本:需要后台创建接口,前台去请求。
  2. 连接数过多:在HTTP协议中,每次请求都需要建立一个新的连接,这可能导致连接数过多,特别是在高并发场景下。对于自动售卖机来说,如果同时有大量的用户进行交互,可能会导致服务器资源紧张,影响性能。
  3. 开销较大:HTTP协议的消息头部相对复杂,包含了大量的元数据,这增加了网络传输的开销。
  4. 实时性较差:HTTP协议是基于请求-响应模式的,需要客户端主动发起请求才能获取数据。这导致在实时性要求较高的场景下,HTTP可能无法满足需求。也就是服务器不能主动发数据给客户端。

基于这样的背景,本来想使用Rabbit MQ,但是不能双向通讯,我们选择切换成MQTT。


二、MQTT是什么以及为什么使用

MQTT(Message Queuing Telemetry Transport)是一个轻量级的发布/订阅消息协议,它构建于TCP/IP协议之上,为小型设备提供了稳定的网络通讯。MQTT协议设计简单,易于实现,非常适合在物联网(IoT)和移动应用中使用。

你会发现传递的数据量是根据你的内容来决定。

能干吗:

1、实时通讯:MQTT支持异步通讯模式,客户端可以通过订阅主题来接收感兴趣的消息,而不需要主动请求。这使得MQTT非常适合实时通讯和事件驱动的应用场景。
2、低开销:MQTT协议的数据包开销非常小,消息头部仅需2字节,非常适合网络带宽受限或设备资源受限的环境。
3、高可靠性:MQTT支持三种不同的服务质量(QoS)级别,可以根据实际需求选择合适的级别来确保消息的可靠传输。同时,MQTT还具有自动重连机制,能够在网络断开时自动恢复连接。
4、减少连接数:与HTTP相比,MQTT协议只需要客户端与服务器(Broker)建立一次连接就可以进行多次消息发布和订阅,大大减少了网络连接次数和数据传输量。


三、如何使用:第一阶段、基础功能

  1. 如何连接:init方法
  2. 连接后如何订阅:subscribe方法
  3. 如何发送数据,如何接受数据:subscribe方法
/**
 * 测试环境的设备管理系统
 */
class ManageMqtt {

    private var TAG = "MQTT"
    private var client: MqttAndroidClient? = null //mqtt客户端
    private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置

    @Volatile
    var isMqConnected: Boolean = false


    //初始化,
    fun init(context: Context?) {
        try {
            log("1")

            if (client != null) {
                return
            }
            log("1")
            //MQTT的连接设置
            options = MqttConnectOptions()
            //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.isCleanSession = true
            //重连尝试
            options.isAutomaticReconnect = true
            // 设置超时时间 单位为秒
            options.connectionTimeout = 10
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.keepAliveInterval = 90

            client = MqttAndroidClient(context, "tcp://xxx:xxxx", "名称")//名称
            //设置连接的用户名
            options.userName = "xxx"
            //设置连接的密码
            options.password = "xxx".toCharArray()


            //设置回调
            client?.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {
                    log("已连接mq")
                    isMqConnected = true
                    //连接成功,我们要进行订阅
                    subscribe("xxxx")
                }

                override fun connectionLost(cause: Throwable) {
                    log("已断开mq")
                    isMqConnected = false
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {
                    //publish后会执行到这里  发布
                    try {
                        log("发送成功:" + token.message.toString())
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }

                override fun messageArrived(topicName: String, message: MqttMessage) {
                    //subscribe后得到的消息会执行到这里面  订阅
                    //topicName 为主题
                    try {
                        //todo 收到消息,要进行一些处理的。
                        log("收到消息:$topicName     $message")
                    } catch (e: Exception) {
                        log("异常:$e")
                    }
                }
            })
            connect()
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }

    //进行链接
    private fun connect() {
        Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)
    }

    private val connect = Runnable {
        if (client != null && client!!.isConnected) {
            return@Runnable
        }
        try {
            log("连接Mq............")
            client?.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    log("Connection success")
                    //todo 是否连接成功?要重连的。
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    log("Connection failure")
                    //todo 是否连接成功?要重连的。

                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }


    //订阅信息
    fun subscribe(topic: String, qos: Int = 1) {
        try {
            client?.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //发送消息
    fun publish(topic: String, msg: String, qos: Int = 1, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained
            client?.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //释放资源
    fun closeMqtt() {
        try {
            if (client != null) {
                client!!.disconnect()
                client = null
            }
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
    }

    //打印log
    private fun log(msg: String) {
        Log.d(TAG, msg)
    }


}


四、如何使用:第二阶段、断网重连

  1. 即使短暂断网,后面自己也还是可以重连恢复。
  2. 如果第一次没有连接上,增加第一次的断网重连
/**
 * 测试环境的设备管理系统
 */
class ManageMqtt {

    private var context: Context? = null
    private var TAG = "MQTT"
    private var client: MqttAndroidClient? = null //mqtt客户端
    private lateinit var options: MqttConnectOptions  //mqtt 的链接信息设置

    @Volatile
    var isMqConnected: Boolean = false


    //初始化,
    fun init(context: Context?) {
        this.context = context
        try {
            log("1")

            if (client != null) {
                return
            }
            log("1")
            //MQTT的连接设置
            options = MqttConnectOptions()
            //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
            options.isCleanSession = true
            //重连尝试
            options.isAutomaticReconnect = true
            // 设置超时时间 单位为秒
            options.connectionTimeout = 10
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options.keepAliveInterval = 90

            client = MqttAndroidClient(context, "tcp://xxxx:xxxx", "")//名称
            //设置连接的用户名
            options.userName = "xxx"
            //设置连接的密码
            options.password = "xxx".toCharArray()


            //设置回调
            client?.setCallback(object : MqttCallbackExtended {
                override fun connectComplete(reconnect: Boolean, serverURI: String) {
                    log("已连接mq")
                    isMqConnected = true
                    //连接成功,我们要进行订阅
                    subscribe("xxxx")
                }

                override fun connectionLost(cause: Throwable) {
                    log("已断开mq")
                    isMqConnected = false
                }

                override fun deliveryComplete(token: IMqttDeliveryToken) {
                    //publish后会执行到这里  发布
                    try {
                        log("发送成功:" + token.message.toString())
                    } catch (e: Exception) {
                        e.printStackTrace()
                    }
                }

                override fun messageArrived(topicName: String, message: MqttMessage) {
                    //subscribe后得到的消息会执行到这里面  订阅
                    //topicName 为主题
                    try {
                        //todo 收到消息,要进行一些处理的。 Eventbus
                        log("收到消息:$topicName     $message")
                    } catch (e: Exception) {
                        log("异常:$e")
                    }
                }
            })
            connect()
        } catch (e: Exception) {
            e.printStackTrace()
        }
        val intentFilter = IntentFilter()
        intentFilter.addAction(ConnectivityManager.CONNECTIVITY_ACTION)
        intentFilter.addAction(WifiManager.NETWORK_STATE_CHANGED_ACTION)
        intentFilter.addAction(WifiManager.WIFI_STATE_CHANGED_ACTION)
        intentFilter.addAction(WifiManager.RSSI_CHANGED_ACTION)
        context?.registerReceiver(netWorkBroadCastReciver,intentFilter)
    }

    //进行链接
    private fun connect() {
        Thread(connect).start()
//        Schedulers.io().scheduleDirect(connect)
    }

    private val connect = Runnable {
        if (client != null && client!!.isConnected) {
            return@Runnable
        }
        try {
            log("连接Mq............")
            client?.connect(options, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken) {
                    log("Connection success")
                    //todo 是否连接成功?要重连的。
                }

                override fun onFailure(asyncActionToken: IMqttToken, exception: Throwable) {
                    log("Connection failure")
                    //todo 是否连接成功?要重连的。

                }
            })
        } catch (e: Exception) {
            e.printStackTrace()
        }
    }


    //订阅信息
    fun subscribe(topic: String, qos: Int = 1) {
        try {
            client?.subscribe(topic, qos, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "Subscribed to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to subscribe $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //发送消息
    /**
     * @param topic 主题 给这个主题发送消息
     *  @param qos 0最多一次不管是否收到,1最少一次可能会收到多次,2保证收到,且仅一次
     *  @param retained 发布后是否保留,即重新链接时会存在
     *  @param msg 消息
     */
    fun publish(topic: String, msg: String, qos: Int = 0, retained: Boolean = false) {
        try {
            val message = MqttMessage()
            message.payload = msg.toByteArray()
            message.qos = qos
            message.isRetained = retained //发布后是否保留,即重新链接时会存在
            client?.publish(topic, message, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.d(TAG, "$msg published to $topic")
                }

                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.d(TAG, "Failed to publish $msg to $topic")
                }
            })
        } catch (e: MqttException) {
            e.printStackTrace()
        }
    }

    //释放资源
    fun closeMqtt() {
        try {
            if (client != null) {
                client!!.disconnect()
                client = null
            }
        } catch (e: java.lang.Exception) {
            e.printStackTrace()
        }
        context?.unregisterReceiver(netWorkBroadCastReciver)
    }

    //打印log
    private fun log(msg: String) {
        Log.d(TAG, msg)
    }


    private var networkState = 100
    //断网重连查询
    fun isNetConnected(context: Context): Boolean {
        Log.d(TAG, "isNetConnected: ")
        val connectivity =
            context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
        if (connectivity != null) {
            val info = connectivity.activeNetworkInfo
            if (info != null) {
                if (info.type == networkState) {
                    return false
                }
                networkState = info.type
                if (info.type == (ConnectivityManager.TYPE_WIFI)) {
                    if (!isMqConnected) {

                        connect()
                    }
                    return true
                } else if (info.type == (ConnectivityManager.TYPE_MOBILE)) {
                    if (!isMqConnected) {
                        connect()
                    }
                    return true
                }
            }
        }

        return false
    }
    
    var netWorkBroadCastReciver = object :BroadcastReceiver() {
        override fun onReceive(context: Context, intent: Intent?) {
            isNetConnected(context)
            log( "NetWorkBroadCastReciver: ")
        }
    }


}

五、如何使用:第三阶段、封装

  1. 尝试封装,其实就是提供比如,注册,取消注册,订阅,发送数据,或者读取数据的方法。后面如何更换MQTT为其他协议,也很方便
/**
 * 对Mqtt操作的进一步封装
 */
@Singleton
class MqttHelper @Inject constructor() {

    @Inject
    lateinit var mqtt: ManageMqtt


    /**
     * 注册
     */
    fun register(context: Context?){
        mqtt.init(context)
    }

    /**
     * 发送数据
     */
    fun sendData(data :String){
        Heartbeat.deviceId?.let { mqtt.publish(it,Gson().toJson(data)) }
    }

    /**
     * 接收数据
     */
    fun data(kind:String,data:String){
        //待定,一般都是通过eventbus来解决。
    }
}

好了,这篇文章就介绍到这里~,我是前期后期,如果你也有相关的问题,也可以在评论区讨论哦,我们下一篇文章再见。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/929069.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

node.js基础学习-express框架-静态资源中间件express.static(十一)

前言 在 Node.js 应用中,静态资源是指那些不需要服务器动态处理,直接发送给客户端的文件。常见的静态资源包括 HTML 文件、CSS 样式表、JavaScript 脚本、图片(如 JPEG、PNG 等)、字体文件和音频、视频文件等。这些文件在服务器端…

全面解析 Transformer:改变深度学习格局的神经网络架构

目录 一、什么是 Transformer? 二、Transformer 的结构解析 1. 编码器(Encoder) 2. 解码器(Decoder) 3. Transformer 模型结构图 三、核心技术:注意力机制与多头注意力 1. 注意力机制 2. 多头注意力&…

LobeChat-46.6k星!顶级AI工具集,一键部署,界面美观易用,ApiSmart 是你肉身体验学习LLM 最好IDEA 工具

LobeChat LobeChat的开源,把AI功能集合到一起,真的太爽了。 我第一次发现LobeChat的时候,就是看到那炫酷的页面,这么强的前端真的是在秀肌肉啊! 看下它的官网,整个网站的动效简直闪瞎我! GitH…

AC+AP漫游实验

实验拓扑 实验要求 1.AP1服务vlan10,AP2服务vlan20,实现三层漫游 2.AP1与AP2为不同AP组,直接转发 实验步骤 1.配置VLAN放行相关流量 交换机与AP接口为trunk口并修改PVID为30 2.配置相关业务使得ap上线 3.配置vap上线,AP可用…

浅谈CI持续集成

1.什么是持续集成 持续集成(Continuous Integration)(CI)是一种软件开发实践,团队成员频繁地将他们的工作成果集成到一起(通常每人每天至少提交一次,这样每天就会有多次集成),并且在每次提交后…

JUnit介绍:单元测试

1、什么是单元测试 单元测试是针对最小的功能单元编写测试代码(Java 程序最小的功能单元是方法)单元测试就是针对单个Java方法的测试。 2、为什么要使用单元测试 确保单个方法运行正常; 如果修改了代码,只需要确保其对应的单元…

Active RIS-Aided ISAC Systems: Beamforming Design and Performance Analysis

文章目录 II. SYSTEM MODELC. Active RIS Model III. PROBLEM FORMULATIONA. Radar Performance MetricC. Optimize Φ V. PERFORMANCE ANALYSIS OF THE RADAR SINR IN ACTIVE RIS-AIDED SENSING SYSTEMSA. Simplified System SettingB. Power Scaling Law AnalysisC. Active R…

python之Django连接数据库

文章目录 连接Mysql数据库安装Mysql驱动配置数据库信息明确连接驱动定义模型在模型下的models.py中定义表对象在settings.py 中找到INSTALLED_APPS添加创建的模型 测试testdb.py中写增删改查操作urls.py添加请求路径启动项目进行测试 连接Mysql数据库 安装Mysql驱动 pip inst…

网页端五子棋对战(四)---玩家匹配实现上线下线处理

文章目录 1.游戏大厅用户匹配1.1请求和响应1.2设计匹配页面1.3获取玩家信息1.4玩家信息的样式设置1.5初始化我们的websocket1.6点击按钮和客户端交互1.7点击按钮和服务器端交互 2.服务器端实现匹配功能框架2.1方法重写2.2借用session 3.处理上线下线3.1什么是上线下线3.2实现用…

matlab finv()函数解释 F分布 和 逆累积分布函数 卡方分布

1.Earths flattening 翻译并解释含义 "Earths flattening" 翻译为中文是“地球的扁率”。 含义解释: 地球的扁率是指地球形状偏离完美球形的程度。地球并非一个完美的球体,而是一个扁球体,即在两极略微扁平,赤道略微…

qt QSettings详解

1、概述 QSettings是Qt框架中用于应用程序配置和持久化数据的一个类。它提供了一种便捷的方式来存储和读取应用程序的设置,如窗口大小、位置、用户偏好等。QSettings支持多种存储格式,包括INI文件、Windows注册表(仅限Windows平台&#xff0…

Web 毕设篇-适合小白、初级入门练手的 Spring Boot Web 毕业设计项目:智行无忧停车场管理系统(前后端源码 + 数据库 sql 脚本)

🔥博客主页: 【小扳_-CSDN博客】 ❤感谢大家点赞👍收藏⭐评论✍ 文章目录 1.0 项目介绍 1.1 项目功能 2.0 用户登录功能 3.0 首页界面 4.0 车辆信息管理功能 5.0 停车位管理功能 6.0 入场登记管理功能 7.0 预约管理功能 8.0 收费规则功能 9.0…

openssl使用哈希算法生成随机密钥

文章目录 一、openssl中随机数函数**OpenSSL 随机数函数概览**1. **核心随机数函数** **常用函数详解**1. RAND_bytes2. RAND_priv_bytes3. RAND_seed 和 RAND_add4. RAND_status **随机数生成器的熵池****常见用例****注意事项** 二、使用哈希算法生成随机的密钥 一、openssl中…

【PlantUML系列】序列图(二)

目录 一、参与者 二、消息交互顺序 三、其他技巧 3.1 改变参与者的顺序 3.2 使用 as 重命名参与者 3.3 注释 3.4 页眉和页脚 一、参与者 使用 participant、actor、boundary、control、entity 和 database 等关键字来定义不同类型的参与者。例如: Actor&…

从excel数据导入到sqlsever遇到的问题

1、格式问题时间格式,excel中将日期列改为日期未生效,改完后,必须手动单击这个单元格才能生效,那不可能一个一个去双击。解决方案如下 2、导入之后表字段格式问题,数据类型的用navicat导入之后默认是nvarchar类型的&a…

On-Chip-Network之router微架构的物理实现

Low-Power Microarchitecture 自20世纪90年代以来,功耗一直是嵌入式芯片和高性能芯片面临的一个挑战。自2000年代中期以来,它已经成为大多数设计的主要约束。多核解决了功耗问题,由此产生的communication substrate,namely the on…

路径规划之启发式算法之四:蚁群算法(Ant Colony Optimization,ACO)

蚁群算法(Ant Colony Optimization,ACO)是一种模拟蚂蚁觅食行为的启发式搜索算法,由Marco Dorigo于1992年在他的博士论文中提出。该算法适用于解决组合优化问题,如旅行商问题(TSP)、车辆路径问题…

LabVIEW密码保护与反编译的安全性分析

在LabVIEW中,密码保护是一种常见的源代码保护手段,但其安全性并不高,尤其是在面对专业反编译工具时。理论上,所有软件的反编译都是可能的,尽管反编译不一定恢复完全的源代码,但足以提取程序的核心功能和算法…

RabbitMQ消息可靠性保证机制6--可靠性分析

在使用消息中间件的过程中,难免会出现消息错误或者消息丢失等异常情况。这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。 在RabbitMQ中可以使用Firehose实现消息的跟踪,Firehose可…

工业—使用Flink处理Kafka中的数据_ProduceRecord1

1 、 使用 Flink 消费 Kafka 中 ProduceRecord 主题的数据,统计在已经检验的产品中,各设备每 5 分钟 生产产品总数,将结果存入Redis 中, key 值为