如何在 Rust 中通过 Rumqttc 实现 MQTT 通信

Rust 简介

Rust 是一门系统级编程语言,以其卓越的性能、并发能力以及内存安全特性著称。Rust 由 Mozilla 推出,目标是在现代软件开发中提供一种安全高效的编程语言。其设计旨在提供安全、并发和高效的编程体验,同时保持开发效率和代码质量不受影响。

Rust 的核心特性包括:

  • 内存安全:Rust 通过所有权系统和借用检查器确保内存安全。所有权系统在编译时追踪每个值的所有权,并负责管理内存释放。借用检查器防止空指针引用和数据竞争等常见的内存问题。
  • 并发性:Rust 提供了一组轻量级的并发工具,让开发人员能够轻松编写安全的并发程序。通过 std::thread 模块,可以方便创建和管理线程,而 std::sync 模块则提供了如互斥锁、信号量和通道等同步原语,保证线程之间安全的数据共享和通信。
  • 高性能:Rust 强调零成本抽象和极低的运行时开销。它支持内联汇编、无锁编程和异步编程等高级功能,帮助开发者编写高性能的系统应用和网络服务。

总的来说,Rust 是一门功能强大、安全可靠、高性能的编程语言,适用于广泛的应用场景,从嵌入式开发到大规模分布式系统,甚至网络服务等领域。随着其生态系统的不断完善和活跃的社区支持,Rust 正逐渐成为开发人员的热门选择。

选择基于 Rust 的 MQTT 库

在 Rust 生态系统中,有几种常见的 MQTT 库,其中最受欢迎的是 rumqtt 和 paho-mqtt。

paho-mqtt

paho-mqtt 是 Eclipse Paho 项目的一部分,它是一个跨平台的 MQTT 客户端库,支持包括 Rust 在内的多种编程语言。paho-mqtt 支持 MQTT v3.1 和 v5.0 协议,以稳定和成熟著称。

特点:paho-mqtt 在众多项目中得到了广泛应用,并拥有活跃的社区支持。它提供了同步和异步 API,适用于多种应用场景。

rumqtt

rumqtt 是一个用 Rust 编写的开源库,旨在实现 MQTT 协议,具有简单、健壮和高性能的特点。该项目包括两个主要组件:rumqttc 和 rumqttd。

  • rumqttc

    rumqttc 是一个纯 Rust 实现的 MQTT 客户端,设计目标是稳健、高效且易于使用。它基于异步(使用 tokio)的事件循环,使开发者能够方便地发送和接收 MQTT 消息,与 MQTT Broker 进行通信。

  • rumqttd

    rumqttd 是一个高性能的 Rust 实现的 MQTT Broker,它的设计轻量且可嵌入,可以将其作为库集成到代码中,甚至扩展其功能。

特点:rumqtt 采用现代设计,提供符合 Rust 异步编程模型的异步 API。其轻量级和高性能的设计使其即使在资源有限的环境中也能表现出色。此外,rumqtt 的 API 设计简洁明了,遵循 Rust 的语言风格,易于使用和理解。

选择 rumqtt 的理由

  • 现代设计
  • 轻量级且高性能
  • 简洁的 API
  • 活跃的社区支持
  • 灵活的配置选项

在本文中,我们将使用 rumqttc 进行示例演示。

在 Rust 中使用 MQTT 的示例程序

以下示例将演示如何使用 rumqttc 库创建一个 MQTT 客户端,并实现消息的发布和订阅。通过这些示例,您将学习如何初始化客户端、设置选项、连接到 MQTT 服务器,以及发布/订阅消息。

准备工作

本示例使用 EMQX 提供的免费公共 MQTT 服务器进行测试,连接信息如下:

Broker:broker.emqx.io
TCP 端口:1883
Websocket 端口:8083
  1. 创建一个新的 Rust 项目:

    $ cargo new mqtt-rust-example
         Created binary (application) `mqtt-rust-example` package
    
  2. 修改 Cargo.toml 文件,添加所需的依赖项:

    [dependencies]
    rumqttc = "0.24.0"
    pretty_env_logger = "0.4"
    tokio = { version = "1", features = ["full"] }
    

同步订阅和发布 MQTT 消息

下面的内容展示了如何实现同步订阅和发布 MQTT 消息。

  1. 修改 Cargo.toml:

    [[bin]]
    name = "syncpubsub"
    path = "src/syncpubsub.rs"
    
  2. 在项目的 src 目录下创建 syncpubsub.rs 文件,并添加以下代码:

    use rumqttc::{Client, LastWill, MqttOptions, QoS};
    use std::thread;
    use std::time::Duration;
    
    /*
     * 这是程序的主函数。
     * 在该函数中,将初始化 MQTT 客户端、设置连接选项和遗嘱消息。
     * 然后,创建客户端和连接、并在新线程中调用发布函数。
     * 最后,使用 connection.iter() 方法遍历并处理接中的每个通知。
     */
    fn main() {
        // 初始化日志记录器
        pretty_env_logger::init();
    
        // 设置 MQTT 连接选项和遗嘱消息
        let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);
        let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
        mqttoptions
            .set_keep_alive(Duration::from_secs(5))
            .set_last_will(will);
        // 创建 MQTT 客户端和连接,并启动新线程进行消息发布
        let (client, mut connection) = Client::new(mqttoptions, 10);
        thread::spawn(move || publish(client));
    
        // 遍历并处理连接中的每个通知
        for (i, notification) in connection.iter().enumerate() {
            match notification {
                Ok(notif) => {
                    println!("{i}. Notification = {notif:?}");
                }
                Err(error) => {
                    println!("{i}. Notification = {error:?}");
                    return;
                }
            }
        }   
    
        println!("Done with the stream!!");
    }
    
    /*
     * 这是一个用于发布 MQTT 消息的辅助函数。
     * 在该函数中,首先休眠一秒钟,然后订阅一个主题。
     * 接着,循环发送 10 条长度从 0 到 9 不等的消息,每条消息的 QoS 都设置为“至少一次”。
     */
    fn publish(client: Client) {
        // 订阅主题前等待一秒
        thread::sleep(Duration::from_secs(1));
        client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap();
    
    // 发送 10 条消息,长度从 0 到 9 不等,每条消息的 QoS 都设置为“至少一次”
        for i in 0..10_usize {
            let payload = vec![1; i]; 
            let topic = format!("hello/{i}/world");
            let qos = QoS::AtLeastOnce;
    
            client.publish(topic, qos, true, payload).unwrap();
        }
    
        thread::sleep(Duration::from_secs(1));
    }
    
  3. 编译:

    $ cargo build
    
  4. 运行 syncpubsub:

    $ ./target/debug/syncpubsub
    0. Notification = Incoming(ConnAck(ConnAck { session_present: false, code: Success }))
    1. Notification = Outgoing(Subscribe(1))
    2. Notification = Outgoing(Publish(2))
    3. Notification = Outgoing(Publish(3))
    4. Notification = Outgoing(Publish(4))
    5. Notification = Outgoing(Publish(5))
    6. Notification = Outgoing(Publish(6))
    7. Notification = Outgoing(Publish(7))
    8. Notification = Outgoing(Publish(8))
    9. Notification = Outgoing(Publish(9))
    10. Notification = Outgoing(Publish(10))
    11. Notification = Outgoing(Publish(11))
    12. Notification = Incoming(Publish(Topic = hello/9/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 9))
    13. Notification = Incoming(Publish(Topic = hello/8/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 8))
    14. Notification = Incoming(Publish(Topic = hello/7/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 7))
    15. Notification = Incoming(Publish(Topic = hello/6/world, Qos = AtMostOnce, Retain = true, Pkid = 0, Payload Size = 6))
    ...
    

异步订阅和发布 MQTT 消息

下面的示例展示了如何使用 tokio 库有效管理异步任务,实现异步订阅和发布 MQTT 消息。

  1. 修改 Cargo.toml:

    [[bin]]
    name = "asyncpubsub"
    path = "src/asyncpubsub.rs"
    
  2. 在项目的 src 目录下创建 asyncpubsub.rs 文件,并添加以下代码:

    /*
     * 这行代码从 tokio 库导入了 task 和 time 模块,
     * 用于管理异步任务和处理与时间相关的操作。
     */
    use tokio::{task, time};
    
    use rumqttc::{AsyncClient, MqttOptions, QoS};
    use std::error::Error;
    use std::time::Duration;
    
    /*
     * 这个宏注解表明使用的是 tokio 运行时,
     * 其中 current_thread 表示异步代码将在单线程上下文中运行。
     */
    #[tokio::main(flavor = "current_thread")]
    /*
     * 这是程序的主函数,是一个异步函数。
     * 在这个函数中,首先初始化一个 MQTT 客户端并设置连接选项。
     * 然后,创建异步客户端和事件循环,并在任务中调用请求函数。
     * 最后,通过事件循环轮询并处理事件。
     */
    async fn main() -> Result<(), Box<dyn Error>> {
        // 初始化日志记录器
        pretty_env_logger::init();
        // color_backtrace::install();
    
        // 设置 MQTT 连接选项
        let mut mqttoptions = MqttOptions::new("test-1", "broker.emqx.io", 1883);
        mqttoptions.set_keep_alive(Duration::from_secs(5));
    
        // 创建异步 MQTT 客户端和事件循环
        let (client, mut eventloop) = AsyncClient::new(mqttoptions, 10);
      
        /*
         * 创建一个包含闭包的异步任务。
         * 在闭包内部,首先调用 requests(client).await;执行消息发布和订阅操作。
         * 然后,使用 time::sleep(Duration::from_secs(3)).await; 让任务休眠 3 秒。
         */
        task::spawn(async move {
            requests(client).await;
            time::sleep(Duration::from_secs(3)).await;
        }); 
    
        loop {
            // 在事件循环中等待并获取下一个事件。
            let event = eventloop.poll().await;
            // 对检索到的事件执行模式匹配,以确定其类型
            match &event {
                Ok(v) => {
                    println!("Event = {v:?}");
                }
                Err(e) => {
                    println!("Error = {e:?}");
                    return Ok(());
                }
            }
        }   
    }
    
    /*
     * 这是一个异步函数,用于发布和订阅消息。
     * 在此函数中,订阅一个主题,并循环发送长度从 1 到 10 的消息,每秒发送一条信息。
     * 最后,休眠 120 秒,再处理后续事件。
     */
    async fn requests(client: AsyncClient) {
       
        /*
         * 用于订阅 MQTT 服务器上的特定主题("hello/world")。
         * 指定服务质量(QoS)为 AtMostOnce,表示最多一次消息传递。
         */
        client
            .subscribe("hello/world", QoS::AtMostOnce)
            .await
            .unwrap();
        /*
         * 向“hello/world”主题发送 10 条消息,每条消息的长度从 1 到 10 递增,发送间隔为 1 秒。
         * 每条消息的服务质量(QoS)设置为 ExactlyOnce。
         */
        for i in 1..=10 {
            client
                .publish("hello/world", QoS::ExactlyOnce, false, vec![1; i]) 
                .await
                .unwrap();
    
            time::sleep(Duration::from_secs(1)).await;
        }
    
        time::sleep(Duration::from_secs(120)).await;
    }
    
  3. 编译:

    $ cargo build
    
  4. 运行 asyncpubsub:

    $ ./target/debug/asyncpubsub
    Event = Incoming(ConnAck(ConnAck { session_present: false, code: Success }))
    Event = Outgoing(Subscribe(1))
    Event = Outgoing(Publish(2))
    Event = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(ExactlyOnce)] }))
    Event = Outgoing(PubRel(2))
    Event = Incoming(PubRec(PubRec { pkid: 2 }))
    Event = Incoming(Publish(Topic = hello/world, Qos = AtMostOnce, Retain = false, Pkid = 0, Payload Size = 1))
    Event = Incoming(PubComp(PubComp { pkid: 2 }))
    Event = Outgoing(Publish(3))
    Event = Outgoing(PubRel(3))
    ...
    

结语

通过以上基于 rumqttc 的示例,我们演示了如何实现简单的异步订阅和发布功能。除了基本的 MQTT 功能,rumqttc 还支持 MQTT v5 的新特性,如用户属性等。了解更多信息,请参考 rumqtt 示例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/883222.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【多线程】面试高频考点!JUC常见类的详细总结,建议收藏!

&#x1f490;个人主页&#xff1a;初晴~ &#x1f4da;相关专栏&#xff1a;多线程 / javaEE初阶 JUC是“Java Util Concurrency”的缩写&#xff0c;指的是Java并发工具包&#xff0c;它位于java.util.concurrent包及其子包中。JUC包提供了大量用于构建并发应用程序的工具和…

ARM基础

一、ARM ARM公司&#xff08;正式名称为ARM Holdings Ltd.&#xff09;是一家总部位于英国剑桥的半导体和软件设计公司&#xff0c;专注于开发和授权基于ARM架构的处理器技术。 ARM也是一种广泛使用的计算机架构&#xff0c;特别适合于低功耗和高性能的应用。ARM最初由英国的Ac…

【Redis】分布式锁之 Redission

一、基于setnx实现的分布式锁问题 重入问题&#xff1a;获得锁的线程应能再次进入相同锁的代码块&#xff0c;可重入锁能防止死锁。例如在HashTable中&#xff0c;方法用synchronized修饰&#xff0c;若在一个方法内调用另一个方法&#xff0c;不可重入会导致死锁。而synchroni…

WebGL入门(一)绘制一个点

源码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><title>Document</title><scr…

2-103 基于matlab的光电信号下血氧饱和度计算

基于matlab的光电信号下血氧饱和度计算&#xff0c;光转换成电信号时&#xff0c;由于动脉对光的吸收有变化而其他组织对光的吸收基本不变&#xff0c;得到的信号就可以分为直流DC信号和交流AC信号。提取AC信号&#xff0c;就能反应出血液流动的特点。这种技术叫做光电容积脉搏…

STM32基础学习笔记-Timer定时器面试基础题5

第五章、TIMER 常见问题 1、基本概念&#xff1a;什么是定时器 &#xff1f;作用 &#xff1f;分类 &#xff1f; 2、时基单元 &#xff1f;组成 &#xff1f;计数模式 &#xff1f;溢出条件 &#xff1f; 溢出时间计算 &#xff1f; 3、systick原理 &#xff1f;代码讲解 &…

MODIS/Landsat/Sentinel下载教程详解【常用网站及方法枚举】

⛄前言 在当今快速发展的地球观测时代&#xff0c;遥感技术作为获取地球表面及其环境信息的重要手段&#xff0c;正以前所未有的广度和深度改变着我们对自然界的认知与管理方式。MODIS&#xff08;Moderate-resolution Imaging Spectroradiometer&#xff0c;中分辨率成像光谱…

网络通信——OSI七层模型和TCP/IP模型

OSI模型 一.OSI七层模型 OSI&#xff08;Open System Interconnect&#xff09;七层模型是一种将计算机网络通信协议划分为七个不同层次的标准化框架。每一层都负责不同的功能&#xff0c;从物理连接到应用程序的处理。这种模型有助于不同的系统之间进行通信时&#xff0c;更…

分享课程:VUE数据可视化教程

在当今这个数据驱动的世界中&#xff0c;数据可视化已经成为了一种至关重要的工具&#xff0c;它帮助我们理解复杂的数据集&#xff0c;发现模式、趋势和异常。数据可视化不仅仅是将数字转换成图表&#xff0c;它是一种将数据转化为洞察力的艺术。 1.什么是数据可视化&#xf…

DNS协议解析

DNS协议解析 什么是DNS协议 IP地址&#xff1a;一长串唯一标识网络上的计算机的数字 域名&#xff1a;一串由点分割的字符串名字 网址包含了域名 DNS&#xff1a;域名解析协议 IP>域名 --反向解析 域名>IP --正向解析 域名 由ICANN管理&#xff0c;有级别&#xf…

CVE-2024-46101

前言 自己挖的第一个CVE~ 喜提critical 这里简单说一下。 漏洞简介 GDidees CMS < 3.9.1 的版本&#xff0c;存在一个任意文件上传漏洞。允许登录后的攻击者上传webshell获得网站的权限。 影响版本&#xff1a; GDidees CMS < 3.9.1 &#xff08;其它的我没测。。&am…

二叉树之堆树

堆树是一种完全二叉树&#xff0c;完全二叉树特点&#xff1a;除了最后一层所有层都填满&#xff0c;最后一层节点从左到右排列。堆树分为两种类型&#xff1a;大顶堆和小顶堆。 大顶堆&#xff1a;每个节点的值都大于或等于其子节点的值&#xff0c;根节点是最大值。 小顶堆…

降准降息一揽子措施点燃 A 股激情,4% 大涨之后趋势深度剖析

文章目录 牛回速归原因分析引爆点情绪和信心一根大阳线&#xff0c;千军万马来相见阴霾是否一扫而空还未可知 流动性和增量 潜在隐患等待经济复苏配套政策期待中美关系进展 短期内趋势分析空军短期内仍有余力如何看待第2日的回撤外围 趋势分析结论短期内可能仍有波折中长期会是…

Flink Task 日志文件隔离

Flink Task 日志文件隔离 任务在启动时会先通过 MdcUtils 启动一个 slf4j 的 MDC 环境&#xff0c;然后将 jobId 添加到 slf4j 的 MDC 容器中&#xff0c;随后任务输出的日志都将附带 joid。 MDC 介绍如下&#xff1a; MDC ( Mapped Diagnostic Contexts )&#xff0c;它是一个…

C/C++逆向:循环语句逆向分析

在逆向分析中&#xff0c;循环语句通常会以特定的汇编模式或结构体现出来。常见的循环语句包括 for 循环、while 循环和 do-while 循环。由于不同的编译器会根据代码优化的级别生成不同的汇编代码&#xff0c;分析循环的模式也可能会有所不同。以下是三种常见循环语句的汇编分析…

【C++ Primer Plus习题】17.7

问题: 解答: #include <iostream> #include <vector> #include <string> #include <fstream> #include <algorithm>using namespace std;const int LIMIT 50;void ShowStr(const string& str); void GetStrs(ifstream& fin, vector<…

ShardingSphere 分库分表

中间件 常用中间件 MyCat 是基于 Proxy&#xff0c;它复写了 MySQL 协议&#xff0c;将 Mycat Server 伪装成⼀个 MySQL 数据库客户端所有的jdbc请求都必须要先交给MyCat&#xff0c;再有 MyCat转发到具体的真实服务器缺点是效率偏低&#xff0c;中间包装了⼀层代码⽆侵⼊性…

【刷题3】找到字符串中所有字母异位词、串联所有单词的子串

目录 一、找到字符串中所有字母异位词二、串联所有单词的子串 一、找到字符串中所有字母异位词 题目&#xff1a; 思路&#xff1a; 用一个变量count来统计有效字符的个数。哈希表2统计字符串p的每个字符出现的个数&#xff0c;然后遍历字符串s&#xff0c;先进窗口&#xf…

Unity-物理系统-碰撞检测-物理材质

物理材质的作用&#xff1a;改变碰撞效果 因为碰撞的过程是相互的&#xff0c;所以在碰撞双方都要加相同的物理材质才能实现效果 物理材质创建 参数

微软宣布弃用WSUS,企业用户尽早准备替换方案

微软最近宣布将逐步弃用Windows Server Update Services (WSUS)&#xff0c;不再为其开发新功能&#xff0c;但会继续支持现有的更新和功能。这一决定对企业客户来说影响深远&#xff0c;尤其是那些依赖WSUS来管理大规模Windows设备更新的组织。 对企业客户的影响 安全性与合规…