使用HiveMQ实现Android MQTT

MQTT官网:https://mqtt.org/

百度Android MQTT,或者B站上搜索,发现大多使用https://github.com/eclipse/paho.mqtt.android,这是Eclipse的一个Android MQTT客户端实现库,但是我发现这个库在运行到高版本的手机上时报错了,这个库也是N年没有更新的了,而且这个库不支持MQTT5.0的,所以我找了新的库。

在查看MQTT官网的时候,发现关于MQTT的很多介绍是链接到了HiveMQ上面的,不知道它们是什么关系,我发现HiveMQ即有提供MQTT的服务器端,也有提供客户端,而且官方都给他跳转了,那我就用它的库来实现吧!使用了之后才发现,这个库是真的好用啊,封装的非常好,代码写起来特别简洁,响应式编程,支持异步,可以使用Java自带的,也可以使用RxJava或Reactor,HiveMQ的断线自动重连做的也比较好。库地址:https://github.com/hivemq/hivemq-mqtt-client,这个库好像没有限定Android,所以在普通的Java项目中也是可以使用的。android示例代码如下:

  1. 添加依赖

    implementation("com.hivemq:hivemq-mqtt-client:1.3.0")
    
  2. 权限声明

    <uses-permission android:name="android.permission.INTERNET"/>
    

    可以看到,相比paho.mqtt.android,HiveMQ的只需要声明一个互联网权限。

  3. 界面UI
    在这里插入图片描述

  4. MQTT实现代码:Mqtt.kt

    object Mqtt {
    
        private const val clientId = "9527"
        private const val host = "192.168.1.188"
        private const val port = 1883
        private const val topic = "message/topic"
    
        private var client: Mqtt3AsyncClient? = null
    
        private fun createMqttClient(): Mqtt3AsyncClient =
            Mqtt3Client.builder()
                .identifier(clientId)
                .serverHost(host)
                .serverPort(port)
    
                // 认证设置
                /*.simpleAuth()
                .username("admin")
                .password("password".toByteArray())
                .applySimpleAuth()*/
    
                // 重连设置
                .automaticReconnect()
                .initialDelay(1, TimeUnit.SECONDS) // 断线1秒后开始自动重连,如果重连还失败,则下次会等时间会按指数增长,比如2秒、4秒、8秒,双倍增长等待时间,但是不会超过最大值,由maxDelay函数来指定最大值。
                .maxDelay(32, TimeUnit.SECONDS)    // 断线后最多32秒就会自动重连,第5次连会来到32的位置,前面4次已用掉31秒的等待时间了。
                .applyAutomaticReconnect()
    
                // 连接状态监听器设置
                .addConnectedListener {
                    println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接成功")
                }
                .addDisconnectedListener {
                    // 客户端断开连接,或者连接失败都会回调这里
                    println("MQTT${it.clientConfig.serverHost}:${it.clientConfig.serverPort}连接断开:${it.cause.message},连接状态:${it.clientConfig.state.name}")
                    /*when (it.clientConfig.state) {
                        MqttClientState.CONNECTING -> println("手动连接失败")             // 即主动调用connect时没连接成功
                        MqttClientState.CONNECTING_RECONNECT -> println("自动重连失败")   // 即连接成功后异常断开自动重连时连接失败
                        MqttClientState.CONNECTED -> println("连接正常断开或异常断开")
                        else -> println("连接断开:${it.clientConfig.state.name}")
                    }*/
                }
                .buildAsync()
    
                // 消息监听器设置
                .also {
                    // 接收订阅的消息。publishes必须在subscribe之前调用以确保消息不会丢失,可以在connect之前调用它以便接收前一个会话的消息。
                    it.publishes(MqttGlobalPublishFilter.ALL) { publish: Mqtt3Publish ->
                        println("收到${publish.topic}的消息:${String(publish.payloadAsBytes)}")
                    }
                }
    
        fun connect() {
            disconnect()
            // 断开连接后的client没法再复用,复用的client重新再连接时会收不到离线时的消息。所以每次连接时创建一个新的client。
            val client = createMqttClient().also { this.client = it }
            client.connectWith()
                .cleanSession(false) // false为持久会话,这样离线再上线时还能收到离线时别人推送的消息。
                .keepAlive(60)  // 心跳时间间隔,单位为秒
                .send()
                .whenComplete { ack, e ->
                    if (ack != null) {
                        // 连接成功之后订阅主题
                        println("手动连接成功:$ack")
                        client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
                    } else if (e != null) {
                        println("手动连接失败: ${e.message}")
                    }
                }
        }
    
        fun disconnect() {
            client?.let {
                it.disconnect().thenAccept { println("手动断开了连接") }
                client = null
            }
        }
    
        fun subscribe(topic: String = this.topic, qos: MqttQos = MqttQos.EXACTLY_ONCE) {
            val client = this.client ?: return
            client.subscribeWith().topicFilter(topic).qos(MqttQos.EXACTLY_ONCE).send()
        }
    
        fun unsubscribe(topic: String = this.topic) {
            val client = this.client ?: return
            client.unsubscribeWith().topicFilter(topic).send()
        }
    
        fun publish(message: String, topic: String = this.topic,  qos: MqttQos = MqttQos.EXACTLY_ONCE, retain: Boolean = false) {
            val client = this.client ?: return
            client
                .publishWith()
                .topic(topic)
                .qos(qos)
                .retain(retain)
                .payload(message.toByteArray())
                .send()
        }
    
        fun clearRetainMessage(topic: String = this.topic) {
            val client = this.client ?: return
            // 发送一条空的retain消息即可清除retain消息
            client.publishWith().topic(topic).retain(true).send()
        }
    
    }
    
  5. Activity调用Mqtt相关功能:MainActivity.kt

    class MainActivity : AppCompatActivity() {
    
        private val testTopic = "topic/hello"
    
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
        }
    
        fun connect(view: View) = Mqtt.connect()
        fun disconnect(view: View) = Mqtt.disconnect()
        fun subscribe(view: View) = Mqtt.subscribe(testTopic)
        fun unsubscribe(view: View) = Mqtt.unsubscribe(testTopic)
        fun publish(view: View) = Mqtt.publish("How are you?", testTopic)
        fun publishRetain(view: View) = Mqtt.publish("I'm a retain message!", testTopic, retain = true)
        fun clearRetain(view: View) = Mqtt.clearRetainMessage(testTopic)
    
    }
    

    示例完整代码:https://gitee.com/daizhufei/HelloMQTT

    这里MQTT的服务器端我使用的是ActivityMQ,它目前的ActiveMQ Classic最新版本是6.0.1,只支持MQTT3.13.1.1ActiveMQ Artemis的最新版本是2.32.0,支持支持MQTT3.1MQ3.1.1MQTT5.0,官方说明如下:

    • https://activemq.apache.org/components/classic/documentation/mqtt
    • https://activemq.apache.org/components/artemis/documentation/latest/mqtt.html

    随着时间的推移,它支持的版本可能会发生变化。这里截图如下:
    在这里插入图片描述
    在这里插入图片描述

MQTT其他知识总结:

  • 关于MQTT的基础知识,可查看:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
  • clientId , 连接的时候用到这个参数,(也叫identifier)应该唯一,可以使用账号做为clientId,比如我们公司的需求是先使用账号密码登录一个Web接口,然后再连接MQTT,但是这个账号是可以在多台设备上同时登录的,所以使用直接使用账号的话会出现重复,可以为每台设备生成一个唯一标识然后持久化存储,然后clientId就可以使用账号和唯一标识组合一起使用。比如:username + uuid,而uuid是持久化保存的。
  • cleanSession , 连接的时候用到这个参数,持久会话设置,如果设置为false,则是持久的,true为非持久,持久的意思是当设备离线后,如果有消息推过来,上线时还能收到。如果是非持久的则收不到离线时的消息。
  • keepAlive,连接的时候用到这个参数,用于设置发送心跳的时间间隔,在客户端和服务器没有交流时,在指定的keepAlive时间到达后会发送心跳给服务器,以证明客户端还活着(也就是说确保连接是正常的)。MQTT是使用TCP的,虽然理论上TCP/IP会在套接字中断时通知您,但在实践中,特别是在移动和卫星链路等情况下,它们经常在空中“伪造”TCP并在每一端放回报头,TCP会话很可能会“黑洞”,即它看起来仍然打开,但只是将您写入的任何内容倾倒到地板上,内容来自:https://www.hivemq.com/blog/mqtt-essentials-part-10-alive-client-take-over/
  • qos 订阅主题和推送消息的时候用到这个参数,有0、1、2,2是最好的质量,现在的手机网络已经比以前好太多了,所以我就选最好的质量了,质量越好需要的流量越多,但是现在的流量已经不像从前那样贵了,所以无所谓。
  • retain 推送消息的时候用到这个参数,如果设置为true,则推送的这条消息会在客户端每次连接时都接收到,比如连接了收到了,然后断开连接,然后再连接上,还是能收到这条消息。如果需要清除这样的消息,则往这个主题再发送一条空的retain为true的消息即可,这样客户端在连接成功时就不会再收到这条消息了。
  • lastWill 简称遗嘱,遗嘱是一条普通的消息,设置遗嘱后,当客户端异常断开时,服务器会给遗嘱指定的主题推送这条消息。一开始我在想,客户端都断开了,怎么推送消息的啊,这是在客户端上线的时候把遗嘱先传给服务器了,所以客户端断线,则服务器来推送这条消息。如果是正常的断开连接,则服务器不会推送遗嘱消息。
  • MQTT测试客户端:MQTTX,目前感觉这个测试的客户端是非常好用的,而且这个网站上也有很多关于MQTT的教程。需要注意的是,新建连接是默认选的MQTT5.0协议,需要选择服务器支持的协议。
  • MQTT入门:https://www.hivemq.com/mqtt/

关于ActiveMQ服务器:

  • 服务器默认是启用了MQTT的,也就是说安装好ActiveMQ之后不需要配置就可以使用客户端来进行MQTT的连接了
  • 默认没有启用MQTT认证,也就是MQTT客户端连接时不需要提供用户名和密码就能连接
  • 默认的MQTT端口是1883
  • 目前的最新版本 ActiveMQ Classic 6.0.1支持 MQTT v3.1v3.1.1
  • 目前的最新版本 ActiveMQ Artemis 2.32.0支持 MQTT v3.1v3.1.1v5.0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/405683.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Oracle迁移到mysql-表结构的坑

1.mysql中id自增字段必须是整数类型 id BIGINT AUTO_INCREMENT not null, 2.VARCHAR2改为VARCHAR 3.NUMBER(16)改为decimal(16,0) 4.date改为datetime 5.mysql范围分区必须int格式&#xff0c;不能list类型 ERROR 1697 (HY000): VALUES value for partition …

【leetcode热题】填充每个节点的下一个右侧节点指针

给定一个 完美二叉树 &#xff0c;其所有叶子节点都在同一层&#xff0c;每个父节点都有两个子节点。二叉树定义如下&#xff1a; struct Node {int val;Node *left;Node *right;Node *next; } 填充它的每个 next 指针&#xff0c;让这个指针指向其下一个右侧节点。如果找不到…

Linux线程同步(2)死锁与互斥锁

死锁&#xff08;Deadlock&#xff09;是指两个或两个以上的进程&#xff08;或线程&#xff09;在执行过程中&#xff0c;由于竞争资源或者由于彼此通信而造成的一种阻塞的现象&#xff0c;若无外力作用&#xff0c;它们都将无法推进下去。此时称系统处于死锁状态或系统产生了…

【Linux进阶之路】Socket —— “UDP“ “TCP“

文章目录 一、再识网络1. 端口号2. 网络字节序列3.TCP 与 UDP 二、套接字1.sockaddr结构2.UDP1.server端1.1 构造函数1.2 Init1.3 Run 2.客户端1.Linux2.Windows 3.TCP1. 基本接口2. 客户端3. 服务端1.版本12.版本23.版本34.版本4 三、守护进程尾序 一、再识网络 1. 端口号 在…

RT-Thread 时钟 timer delay 相关

前言 此处,介绍对delay 时钟 timer 这几部分之间的关联和相关的知识点;本来只是想介绍一下 delay的,但是发现说到delay 不先 提到 先验知识 晶振\时钟\时钟节拍\定时器 好像没法解释透彻,所以就变成了 晶振\时钟\时钟节拍\定时器\delay 的很简单的概括一遍;并附带上能直接运行的…

【数据结构】链式队列

链式队列实现&#xff1a; 1.创建一个空队列 2.尾插法入队 3.头删法出队 4.遍历队列 一、main函数 #include <stdio.h> #include "./3.linkqueue.h" int main(int…

备考2025年AMC8数学竞赛:2000-2024年AMC8真题练一练

我们今天来随机看五道AMC8的真题和解析&#xff0c;对于想了解或者加AMC8美国数学竞赛的孩子来说&#xff0c;吃透AMC8历年真题是备考最科学、最有效的方法之一。 为帮助孩子们更高效地备考&#xff0c;我整理了2000-2004年的全部AMC8真题&#xff0c;并且独家制作了多种在线练…

Rust通用代码生成器莲花发布红莲尝鲜版二十一发布介绍视频,前端代码生成物大翻新

Rust通用代码生成器莲花发布红莲尝鲜版二十一发布介绍视频&#xff0c;前端代码生成物大翻新 Rust通用代码生成器发布了红莲尝鲜版二十一的最新介绍视频&#xff0c;前端代码生成物大翻新。视频请见&#xff1a; Rust通用代码生成器&#xff1a;莲花&#xff0c;红莲尝鲜版二…

构建生物医学知识图谱from zero to hero (3):生物医学命名实体识别和链接

生物医学实体链接 🤓现在是激动人心的部分。对于NLP和命名实体识别和链接的新手,让我们从一些基础知识开始。命名实体识别技术用于检测文本中的相关实体或概念。例如,在生物医学领域,我们希望在文本中识别各种基因、药物、疾病和其他概念。 生物医学概念提取 在这个例子中…

爬虫知识--03

数据存mysql import requests from bs4 import BeautifulSoup import pymysql# 链接数据库pymysql conn pymysql.connect(userroot,password"JIAJIA",host127.0.0.1,databasecnblogs,port3306, ) cursor conn.cursor() cursor conn.cursor()# 爬数据 res request…

Linux之ACL访问控制列表

一、ACL权限的介绍 1.1 什么是ACL 访问控制列表&#xff08;ACL&#xff09;是一种网络安全技术&#xff0c;它通过在网络设备&#xff08;如路由器、交换机和防火墙&#xff09;上定义一系列规则&#xff0c;对进出接口的数据包进行控制。这些规则可以包含“允许”&…

计算机网络面经_体系结构一文说清

编辑&#xff1a;平平无奇的羊 目录 基础 1. 计算机网络结构体系 三种模型之间的区别&#xff1a; 如何背诵&#xff1a; 进阶 OSI七层模型&#xff1a; TCP/IP四层模型&#xff1a; TCP/IP五层模型 总结 字节实习生为大家带来的是计算机网络面经系列博文&#xff0c;由浅…

线性代数:向量、张量、矩阵和标量

线性代数&#xff1a;向量、张量、矩阵和标量 背景 在线性代数中&#xff0c;向量、张量、矩阵和标量都属于基础概念&#xff0c;特别是最近AI的爆火&#xff0c;向量和张量的概念也越来越普及&#xff0c;本文将介绍下这些基本概念。 1. 标量&#xff08;Scalar&#xff0…

【Java网络编程06】HTTPS原理

1. HTTPS基本概念 HTTPS&#xff1a;HTTPS也是一个应用层协议&#xff0c;它在HTTP协议的基础上引入了一个加密层——SSL协议&#xff0c;区别就在于HTTP协议是基于明文传输的&#xff08;不安全&#xff09;&#xff0c;使用HTTPS加密就能在一定程度上防止数据在传输过程中被…

c# 类的介绍及延伸

类介绍 类的定义是以关键字 class 开始&#xff0c;后跟类的名称。 类属于引用类型&#xff0c;只能通过new方式创建。 如果类定义中没有指定基类&#xff0c;那其基类为system.object // <访问修饰符> class class类名 <access specifier> class class_name { //…

华为配置WDS手拉手业务示例

配置WDS手拉手业务示例 组网图形 图1 配置WDS手拉手业务示例组网图 业务需求组网需求数据规划配置思路配置注意事项操作步骤配置文件 业务需求 企业用户通过WLAN接入网络&#xff0c;以满足移动办公的最基本需求。但企业考虑到AP通过有线部署的成本较高&#xff0c;所以通过建立…

golang 监听ip数据包(golang纯享版)

golang 监听ip数据包(golang纯享版) 【注】本机编译运行平台为linux&#xff0c;如需测试代码请移至linux平台进行代码测试 本文以ip4 作为案例进行包抓取示范&#xff0c;ip6抓取与ip4方式异曲同工&#xff0c;可自行举一反三得出 第一步&#xff0c;通过wireshark抓包拿到…

第四十二回 假李逵翦径劫单身 黑旋风沂岭杀四虎-python读写csv和json数据

李逵答应了宋江三件事&#xff1a;不可吃酒&#xff0c;独自前行&#xff0c;不带板斧。李逵痛快答应了&#xff0c;挎一口腰刀&#xff0c;提着朴刀&#xff0c;带了一锭大银子&#xff0c;三五个小银子就下山去了。 宋江放心不下&#xff0c;于是请同乡朱贵也回家一趟&#…

spring boot3登录开发-3(账密登录逻辑实现)

⛰️个人主页: 蒾酒 &#x1f525;系列专栏&#xff1a;《spring boot实战》 &#x1f30a;山高路远&#xff0c;行路漫漫&#xff0c;终有归途。 目录 前置条件 内容简介 用户登录逻辑实现 创建交互对象 1.创建用户登录DTO 2.创建用户登录VO 创建自定义登录业务异…

Vue模板引用之ref特殊属性

1. 使用实例 <template><input ref"input" name"我是input的name" /><br /><ul><li v-for"arr in array" :key"arr" id"111" ref"itemRefs">{{arr}}</li></ul> </…