Flutter之使用mqtt进行连接和信息传输的使用案例

目录

引言

什么是MQTT?

在Flutter中使用MQTT

安装

iOS 

安卓

创建一个全局的客户端对象

 配置客户端对象

 连接(异步)

监听接受的消息

发送消息 

监听连接状态和订阅的回调


引言

 随着移动应用开发技术的发展,实时通信成为了许多应用程序不可或缺的一部分。无论是社交应用中的即时消息传递,还是物联网(IoT)设备之间的数据交换,都需要一个高效稳定的通信机制。MQTT(Message Queuing Telemetry Transport)作为一种轻量级的消息协议,非常适合于这种场景。本文将介绍如何在Flutter项目中集成MQTT,并通过一个简单的示例来演示其基本用法。

什么是MQTT?

MQTT是一种基于发布/订阅模式的轻量级消息协议,设计初衷是为了提供低开销、低带宽的网络连接。它特别适合于远程位置的通信,如传感器与中央服务器之间的数据传输。MQTT的主要特点包括:

  • 轻量级:非常小的代码占用空间和带宽使用。
  • 发布/订阅模型:允许一对多的消息分发,即一个消息可以发送给多个客户端。
  • 服务质量(QoS):提供了三种不同的服务质量级别,以满足不同场景下的需求。
  • 安全性:支持TLS/SSL加密,确保数据传输的安全性。

 

在Flutter中使用MQTT

首先需要安装mqtt_client这个依赖,执行下面命令

flutter pub add mqtt_client 

安装

如果您在 Android 或 iOS 设备上的 Flutter 环境中使用客户端,则需要进行以下设备权限设置。

iOS 

将以下键添加到位于ios/Runner/Info.plist的Info.plist文件中:

<key>NSLocalNetworkUsageDescription</key>
<string>Looking for local tcp Bonjour service</string>
<key>NSBonjourServices</key>
<array>
  <string>mqtt.tcp</string>
</array>

安卓

将以下 Android 权限添加到位于android/app/src/main/AndroidManifest.xml的AndroidManifest.xml文件中:

<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />

在页面中导入:

import 'package:mqtt_client/mqtt_client.dart';

使用案例, 这里我们使用的是wss协议:

import 'dart:convert';
import 'package:flutter_diancan/utils/logger_helper.dart';
import 'package:mqtt_client/mqtt_client.dart';
import 'package:mqtt_client/mqtt_server_client.dart';
import 'package:shared_preferences/shared_preferences.dart';

class MqttServe {
  final client = MqttServerClient('请求地址', '');

  Future<MqttClient> connect() async {
    try {
      client.setProtocolV311();
      client.logging(on: true);
      client.port = 443; // 端口号
      client.keepAlivePeriod = 60;

      client.websocketProtocols = ['mqtt'];
      client.useWebSocket = true;  // 因为我们这里使用的是wss协议所以加这个,这个根据自己的需求来定是否需要
      client.onConnected = onConnected;
      client.onDisconnected = onDisconnected;
      client.onUnsubscribed = onUnsubscribed;
      client.onSubscribed = onSubscribed;
      client.onSubscribeFail = onSubscribeFail;
      client.pongCallback = pong;
      client.connectTimeoutPeriod = 60;

      final connMess = MqttConnectMessage()
          .authenticateAs("用户名", "密码")
          .withClientIdentifier('Mqtt_MyClientUniqueId')
          .withWillTopic('willtopic')
          .withWillMessage('My Will message')
          .startClean()
          .withWillQos(MqttQos.atLeastOnce);
      client.connectionMessage = connMess;
      try {
        print('Connecting');
        await client.connect();
      } catch (e) {
        print('Exception: $e');
        client.disconnect();
      }

      client.updates!
          .listen((List<MqttReceivedMessage<MqttMessage?>>? c) async {
        final recMessage = c![0].payload as MqttPublishMessage;
        final payload = MqttPublishPayload.bytesToStringAsString(
            recMessage.payload.message);

        print('Received message:$payload from topic: ${c[0].topic}');
      });
    } catch (e, s) {
      LoggerHelper.fatal(e, s);
    }

    return client;
  }

  Future<void> sendMessage() async {
    if (client.connectionStatus?.state == MqttConnectionState.connected) {
      final builder = MqttClientPayloadBuilder();
      var payloadObject = {'MsgData': "发送成功啦"};
      print("发送的信息:${json.encode(payloadObject)} ");
      builder.addUTF8String(json.encode(payloadObject));

      client.publishMessage('发送消息的订阅地址', MqttQos.atLeastOnce, builder.payload!);
    }
  }

// Connected callback
  void onConnected() {
    print("已连接");
    try {
      // 连接后订阅
      client.subscribe('订阅地址', MqttQos.atLeastOnce);
    } catch (e, s) {
      LoggerHelper.fatal(e, s);
    }
  }

// Disconnected callback
  void onDisconnected() async {
    print('已断开');
    final SharedPreferences prefs = await SharedPreferences.getInstance();
    String? token = prefs.getString('token');
    if (token != null) {
      reconnect();
    }
  }

  Future<void> reconnect() async {
    print("重连中");
    int retryCount = 0;
    const maxRetries = 10;
    const baseRetryInterval = 2; // 初始重连间隔时间(秒)
    while (retryCount < maxRetries) {
      try {
        print('Reconnecting attempt ${retryCount + 1}...');
        await client.connect();
        if (client.connectionStatus?.state == MqttConnectionState.connected) {
          print('Reconnected successfully.');
          break;
        }
      } catch (e) {
        print('Reconnect failed: $e');
      }
      // 计算下一次重连间隔时间(指数退避)
      int retryInterval = baseRetryInterval * (2 << retryCount);
      await Future.delayed(Duration(seconds: retryInterval));
      retryCount++;
    }
  }

  // 关闭
  Future<void> close() async {
    try {
      // 重新订阅
      client.unsubscribe('订阅地址');
    } catch (e, s) {
      LoggerHelper.fatal(e, s);
    }

    client.disconnect();
  }

// Subscribed callback
  void onSubscribed(String topic) {
    print('订阅成功,主题为: $topic');
  }

// Subscribed failed callback
  void onSubscribeFail(String topic) {
    print('订阅失败,主题为: $topic');
  }

// Unsubscribed callback
  void onUnsubscribed(String? topic) {
    print('Unsubscribed topic: $topic');
  }

// Ping callback
  void pong() {
    print('调用Ping响应客户端回调');
  }
}

  • 创建一个全局的客户端对象

final client = MqttServerClient('请求地址', '');
  •  配置客户端对象

      client.setProtocolV311();
      client.logging(on: true);
      client.port = 443; // 端口号
      client.keepAlivePeriod = 60;
      client.websocketProtocols = ['mqtt'];
      client.useWebSocket = true;  // 因为我们这里使用的是wss协议所以加这个,这个根据自己的需求来定是否需要
      client.onConnected = onConnected;
      client.onDisconnected = onDisconnected;
      client.onUnsubscribed = onUnsubscribed;
      client.onSubscribed = onSubscribed;
      client.onSubscribeFail = onSubscribeFail;
      client.pongCallback = pong;
      client.connectTimeoutPeriod = 60;
  • 设置连接消息
 final connMess = MqttConnectMessage()
          .authenticateAs("用户名", "密码")
          .withClientIdentifier('Mqtt_MyClientUniqueId')
          .withWillTopic('willtopic')
          .withWillMessage('My Will message')
          .startClean()
          .withWillQos(MqttQos.atLeastOnce);
 client.connectionMessage = connMess;
  •  连接(异步)

 try {
        print('Connecting');
        await client.connect();
      } catch (e) {
        print('Exception: $e');
        client.disconnect();
      }
  • 监听接受的消息

 client.updates!
          .listen((List<MqttReceivedMessage<MqttMessage?>>? c) async {
        final recMessage = c![0].payload as MqttPublishMessage;
        final payload = MqttPublishPayload.bytesToStringAsString(
            recMessage.payload.message);

        print('Received message:$payload from topic: ${c[0].topic}');
      });
  • 发送消息 

Future<void> sendMessage() async {
    if (client.connectionStatus?.state == MqttConnectionState.connected) {
      final builder = MqttClientPayloadBuilder();
      var payloadObject = {'MsgData': "发送成功啦"};
      print("发送的信息:${json.encode(payloadObject)} ");
      builder.addUTF8String(json.encode(payloadObject));

      client.publishMessage('发送消息的订阅地址', MqttQos.atLeastOnce, builder.payload!);
    }
  }
  • 监听连接状态和订阅的回调

// Connected callback
  void onConnected() {
    print("已连接");
    try {
      // 连接后订阅
      client.subscribe('订阅地址', MqttQos.atLeastOnce);
    } catch (e, s) {
      LoggerHelper.fatal(e, s);
    }
  }

// Disconnected callback
  void onDisconnected() async {
    print('已断开');
    final SharedPreferences prefs = await SharedPreferences.getInstance();
    String? token = prefs.getString('token');
    if (token != null) {
      reconnect();
    }
  }

  Future<void> reconnect() async {
    print("重连中");
    int retryCount = 0;
    const maxRetries = 10;
    const baseRetryInterval = 2; // 初始重连间隔时间(秒)
    while (retryCount < maxRetries) {
      try {
        print('Reconnecting attempt ${retryCount + 1}...');
        await client.connect();
        if (client.connectionStatus?.state == MqttConnectionState.connected) {
          print('Reconnected successfully.');
          break;
        }
      } catch (e) {
        print('Reconnect failed: $e');
      }
      // 计算下一次重连间隔时间(指数退避)
      int retryInterval = baseRetryInterval * (2 << retryCount);
      await Future.delayed(Duration(seconds: retryInterval));
      retryCount++;
    }
  }

  // 关闭
  Future<void> close() async {
    try {
      // 重新订阅
      client.unsubscribe('订阅地址');
    } catch (e, s) {
      LoggerHelper.fatal(e, s);
    }

    client.disconnect();
  }

// Subscribed callback
  void onSubscribed(String topic) {
    print('订阅成功,主题为: $topic');
  }

// Subscribed failed callback
  void onSubscribeFail(String topic) {
    print('订阅失败,主题为: $topic');
  }

// Unsubscribed callback
  void onUnsubscribed(String? topic) {
    print('Unsubscribed topic: $topic');
  }

// Ping callback
  void pong() {
    print('调用Ping响应客户端回调');
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/921100.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

知识中台:提升企业知识管理的智能化水平

在数字化转型的浪潮中&#xff0c;企业知识管理的智能化水平成为提升竞争力的关键。HelpLook知识中台通过集成先进的AI技术&#xff0c;为企业提供了一个智能化的知识管理平台。 一、知识管理智能化的重要性 智能化的知识管理不仅能够提高信息检索的效率&#xff0c;还能通过…

Unreal5从入门到精通之EnhancedInput增强输入系统详解

前言 从Unreal5开始,老版的输入系统,正式替换为EnhancedInput增强型输入系统,他们之间有什么区别呢? 如果有使用过Unity的同学,大概也知道,Unity也在2020版本之后逐渐把输入系统也升级成了新版输入系统,为什么Unreal和Unity都热衷于升级输入系统呢?这之间又有什么联系…

C语言数据结构与算法--简单实现队列的入队和出队

&#xff08;一&#xff09;队列的基本概念 和栈相反&#xff0c;队列(Queue)是一种先进先出&#xff08;First In First Out&#xff09;的线性表。只 允许在表的一端进行插入&#xff0c;而在另一端删除元素&#xff0c;如日常生活中的排队现象。队列中 允许插入的一端叫队尾…

docker搭建私有仓库,实现镜像的推送和拉取

1.拉取docker仓库镜像 docker pull registry 2.启动registry容器 docker run -d registry 3.查看当前仓库中存在的镜像&#xff08;一&#xff09; curl -XGET http://192.168.111.162: 5000/v2/_catalog 192.168.111.162 部署docker仓库宿主机的ip 5000 部署docker仓库映射到宿…

算法学习笔记(九):网格图DFS、图论算法DFS、动态规划DP、贪心

一.网格图DFS 适用于需要计算连通块个数、大小的题目 1.岛屿数量 给你一个由 1(陆地) 和 0&#xff08;水&#xff09;组成的二维网格&#xff0c;请你计算网格中岛屿的数量 岛屿总是被水包围&#xff0c;并且每座岛屿只能由水平方向和\或竖直方向上相邻的陆地连接形成 此外&…

Cmakelist.txt之Linux-redis配置

1.cmakelist.txt cmake_minimum_required(VERSION 3.16) ​ project(redis_linux_test LANGUAGES C) ​ ​ ​ add_executable(redis_linux_test main.c) ​ # 设置hiredis库的头文件路径和库文件路径 set(Hiredis_INCLUDE_DIR /usr/local/include/hiredis) set(Hiredis_LIBRA…

【Node.js】Node.js 和浏览器之间的差异

Node.js 是一个强大的运行时环境&#xff0c;它在现代 JavaScript 开发中扮演着重要角色。然而&#xff0c;许多开发者在使用 Node.js 时常常会感到困惑&#xff0c;尤其是与浏览器环境的对比。本文将深入探讨 Node.js 和浏览器之间的差异&#xff0c;帮助你全面理解两者的设计…

【物联网原理与应用】实验二:红外传感实验

目录 一、实验目的 二、实验原理 三、实验内容及步骤 四、实验结果 五、核心代码 一、实验目的 学习试验模块上线路的连接操作理解掌握红外传感器的工作原理实现对红外传感器数据的接收和处理 二、实验原理 1、将红外辐射能转换成电能的光敏元件称为红外传感器&#…

PAL(Program-Aided Language Model)

PAL&#xff08;Program-Aided Language Model&#xff09;是一种结合生成式语言模型&#xff08;如 GPT&#xff09;和程序执行能力的技术框架。它的核心思想是通过让语言模型生成代码或程序来解决复杂任务&#xff0c;程序执行的结果反过来增强语言模型的输出准确性和逻辑性。…

java基础概念36:正则表达式1

一、正则表达式的作用 作用一&#xff1a;校验字符串是否满足规则&#xff1b;作用二&#xff1a;在一段文本中查找满足要求的内容。——爬虫 二、正则表达式 2-1、字符类 示例&#xff1a; public static void main(String[] args) {System.out.println("a".matc…

VsCode 插件推荐(个人常用)

VsCode 插件推荐&#xff08;个人常用&#xff09;

工业储能柜的大小该如何选择,工商储能系统设备哪家好?

在能源转型和可持续发展的大潮中&#xff0c;工商业储能系统因其提升清洁能源利用率、降低电能损耗、实现“双碳”目标等优势而备受青睐。它们不仅增强了电力系统的可靠性和灵活性&#xff0c;还帮助企业降低成本、提高经济效益。储能系统通过负荷管理适应电价波动&#xff0c;…

人工智能之数学基础:线性代数在人工智能中的地位

本文重点 从本文开始&#xff0c;我们将开启线性代数的学习&#xff0c;在线性代数中有向量、矩阵&#xff0c;以及各种性质&#xff0c;那么这些数学知识究竟和人工智能有什么关系呢&#xff1f; 重要性 机器学习和深度学习的本质就是训练模型&#xff0c;要想训练模型需要使…

数字IC后端实现时钟树综合系列教程 | Clock Tree,Clock Skew Group之间的区别和联系

Q: Clock&#xff0c;Clock Tree和Skew Group有何区别&#xff1f;Innovus CCOPT引擎是如何使用这些的&#xff1f; Clock是时序约束SDC中的时钟定义点。 create_clock -name clk_osc -period $period_24m [get_ports xin_osc0_func] 时钟树综合(Clock Tree Synthesis)之前应…

飞桨大模型PaddleOCR

一、新建项目PaddleOCRProject 二、查看开源 pip install paddlepaddle pip install paddleocr指定镜像源下载才快&#xff1a; pip install paddlepaddle -i https://pypi.tuna.tsinghua.edu.cn/simple pip install paddleocr -i https://pypi.tuna.tsinghua.edu.cn/simple 三…

31、js中日期操作

<!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>日期</title> </head> <body></body> <script>// js中日期操作 var datenew Date();document.write("日期时间&am…

【大数据学习 | Spark】Spark中的join原理

join是两个结果集之间的链接&#xff0c;需要进行数据的匹配。 演示一下join是否存在shuffle。 1. 如果两个rdd没有分区器&#xff0c;分区个数一致 &#xff0c;会发生shuffle。但分区数量不变。 scala> val arr Array(("zhangsan",300),("lisi",…

NLP论文速读(CVPR 2024)|使用DPO进行diffusion模型对齐

论文速读|Diffusion Model Alignment Using Direct Preference Optimization 论文信息&#xff1a; 简介&#xff1a; 本文探讨的背景是大型语言模型&#xff08;LLMs&#xff09;通过人类比较数据和从人类反馈中学习&#xff08;RLHF&#xff09;的方法进行微调&#xff0c;以…

小车AI视觉识别--9.目标检测

一、目标检测概述 本节主要解决的问题是如何使用OpenCV中的dnn模块&#xff0c;用来导入一个实现训练好的目标检测网络。但是对opencv的版本是有要求的。目前用深度学习进行目标检测&#xff0c;主要有三种方法&#xff1a; Faster R-CNNsYou Only Look Once(YOLO)Single Shot…

2023年9月GESPC++一级真题解析

一、单选题&#xff08;每题2分&#xff0c;共30分&#xff09; 题号 123456789101112131415 答案 CDBCDBACACBBDDA 1. 我们通常说的 “ 内存 ” 属于计算机中的&#xff08;&#xff09;。 A. 输出设备 B. 输 ⼊ 设备 C. 存储设备 D. 打印设备 【答案】 C 【考纲知识点】…