vue3中使用mqtt数据传输(封装)

使用版本

"mqtt": "^5.8.0",

安装指令

npm install mqtt --save
------
yarn add mqtt

介绍mqtt

参考使用文档

配置

connection: {
  protocol: "ws",
  host: "broker.emqx.io",
  port: 8083,
  endpoint: "/mqtt",
  clean: true,
  connectTimeout: 30 * 1000, // ms
  reconnectPeriod: 4000, // ms
  clientId: "emqx_vue_" + Math.random().toString(16).substring(2, 8),
  // 随机数 每次不能重复
  username: "emqx_test",
  password: "emqx_test",
},

连接

import mqtt from "mqtt";
let client = {}
client = mqtt.connect(url, options)

client.on('connect', (e) => {
  // 订阅主题
  
})

订阅主题

client.subscribe(topic, { qos: 1 }, (err) => {
  if (!err) {
    console.log('订阅成功')
  } else {
    console.log('消息订阅失败!')
  }
})

消息发布

给后端发送格式,是和后端约定好的数据格式,一般为JSON传输。

client.publish(publishTopic, `{"messageType":1,"messageContent":""}`, { qos: 0 }, (err) => {
  if (!err) {
    console.log('发送成功')
    client.subscribe(topic, { qos: 1 }, (err) => {
      if (!err) {
        console.log('订阅成功')
      } else {
        console.log('消息订阅失败!')
      }
    })
  } else {
    console.log('消息发送失败!')
  }
})

取消订阅

client.unsubscribe(topicList, (error) => {
  console.log('主题为' + topicList + '取消订阅成功', error)
})

断开连接

export function unconnect() {
  client.end()
  client = null
  // Message.warning('服务器已断开连接!')
  console.log('服务器已断开连接!')
}

mqtt封装使用(ts版)

import type { IClientOptions, MqttClient } from 'mqtt';
import mqtt from 'mqtt';

interface ClientOptions extends IClientOptions {
  clientId: string;
}

interface SubscribeOptions {
  topic: string;
  callback: (topic: string, message: string) => void;
  subscribeOption?: mqtt.IClientSubscribeOptions;
}

interface PublishOptions {
  topic: string;
  message: string;
}

class Mqtt {
  private static instance: Mqtt;
  private client: MqttClient | undefined;
  private subscribeMembers: Record<string, ((topic: string, message: string) => void) | undefined> = {};
  private pendingSubscriptions: SubscribeOptions[] = [];
  private pendingPublications: PublishOptions[] = [];
  private isConnected: boolean = false;

  private constructor(url?: string) {
    if (url) {
      this.connect(url);
    }
  }

  public static getInstance(url?: string): Mqtt {
    if (!Mqtt.instance) {
      Mqtt.instance = new Mqtt(url);
    } else if (url && !Mqtt.instance.client) {
      Mqtt.instance.connect(url);
    }
    return Mqtt.instance;
  }

  private connect(url: string): void {
    console.log(url, clientOptions);
    if (!this.client) {
      this.client = mqtt.connect(url, clientOptions);
      this.client.on('connect', this.onConnect);
      this.client.on('reconnect', this.onReconnect);
      this.client.on('error', this.onError);
      this.client.on('message', this.onMessage);
    }
  }

  public disconnect(): void {
    if (this.client) {
      this.client.end();
      this.client = undefined;
      this.subscribeMembers = {};
      this.isConnected = false;
      console.log(`服务器已断开连接!`);
    }
  }

  public subscribe({ topic, callback }: SubscribeOptions): void {
    if (this.isConnected) {
      this.client?.subscribe(topic, { qos: 1 }, error => {
        if (error) {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}失败: `, error);
        } else {
          console.log(`客户端: ${clientOptions.clientId}, 订阅主题: ${topic}成功`);
        }
      });
      this.subscribeMembers[topic] = callback;
    } else {
      this.pendingSubscriptions.push({ topic, callback });
    }
  }

  public unsubscribe(topic: string): void {
    if (!this.client) {
      return;
    }
    this.client.unsubscribe(topic, error => {
      if (error) {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}失败: `, error);
      } else {
        console.log(`客户端: ${clientOptions.clientId}, 取消订阅主题: ${topic}成功`);
      }
    });
    this.subscribeMembers[topic] = undefined;
  }

  public publish({ topic, message }: PublishOptions): void {
    if (this.isConnected) {
      this.client?.publish(topic, message, { qos: 1 }, e => {
        if (e) {
          console.log(`客户端: ${clientOptions.clientId}, 发送主题为: ${topic} 的消息, 发送失败: `, e);
        }
      });
    } else {
      this.pendingPublications.push({ topic, message });
    }
  }

  private onConnect = (e: any): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接服务器成功:`, e);
    this.isConnected = true;
    this.processPendingSubscriptions();
    this.processPendingPublications();
  };

  private onReconnect = (): void => {
    console.log(`客户端: ${clientOptions.clientId}, 正在重连:`);
    this.isConnected = false;
  };

  private onError = (error: Error): void => {
    console.log(`客户端: ${clientOptions.clientId}, 连接失败:`, error);
    this.isConnected = false;
  };

  private onMessage = (topic: string, message: Buffer): void => {
    // console.log(
    //   `客户端: ${clientOptions.clientId}, 接收到来自主题: ${topic} 的消息: `,
    //   message.toString(),
    // );
    const callback = this.subscribeMembers?.[topic];
    callback?.(topic, message.toString());
  };

  private processPendingSubscriptions(): void {
    while (this.pendingSubscriptions.length > 0) {
      const { topic, callback, subscribeOption } = this.pendingSubscriptions.shift()!;
      this.subscribe({ topic, callback, subscribeOption });
    }
  }

  private processPendingPublications(): void {
    while (this.pendingPublications.length > 0) {
      const { topic, message } = this.pendingPublications.shift()!;
      this.publish({ topic, message });
    }
  }
}

const clientOptions: ClientOptions = {
  clean: true,
  connectTimeout: 500,
  protocolVersion: 5,
  rejectUnauthorized: false,
  username: 'admin',
  password: 'Anjian-emqx',
  clientId: `client-${Date.now()}`
};

// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance("ws://192.168.11.14:8083/mqtt");
// export default Mqtt.getInstance(JSON.parse(import.meta.env.VITE_OTHER_SERVICE_BASE_URL).mqtt);
const { protocol, host } = window.location;
export default Mqtt.getInstance(`${protocol.replace('http', 'ws')}//${host.replace('localhost', '127.0.0.1')}/mqtt/`);

注意:

  1. 环境配置
    .env.test
VITE_OTHER_SERVICE_BASE_URL= `{
  "mqtt": "ws://192.168.11.14:8083/mqtt"
}`
  1. qos设置 前后端统一为1
    在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/910608.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

全面解析谷歌浏览器的功能与使用技巧

谷歌浏览器&#xff08;Google Chrome&#xff09;作为全球最受欢迎的网页浏览器之一&#xff0c;以其简洁的界面、快速的加载速度和强大的功能赢得了广大用户的青睐。本文将全面解析谷歌浏览器的功能和使用技巧&#xff0c;帮助您更好地利用这一工具提升上网体验。&#xff08…

《探索Zynq MPSoC》学习笔记(二)

引言&#xff1a;本文开始学习第二章内容&#xff0c;本文重点介绍FPGA、Zynq和Zynq MPSoC器件技术演进以及Zynq和Zynq MPSoC器件的基本结构和特点。 第二章 FPGA、Zynq和Zynq MPSoC &#xff08;1&#xff09; Zynq MPSoC是Xilinx发布的第一款SoC Zynq-7000片上系统&#xf…

mac 本地docker-mysql主从复制部署

mac 本地docker-mysql主从复制部署,服务器同理 1.本地docker启动两个mysql服务.端口号不一样 没有选择挂载到宿主机.只做测试用. 只是端口号不一样容器删掉.就没有数据了. 生产测试,需要挂在 master docker run -d --name mysql-slave -p 3308:3306 \ -e MYSQL_ROOT_PASSWORD…

七.numpy模块

NumPy(Numerical Python) 是 Python 语言的一个扩展程序库&#xff0c;支持大量的维度数组与矩阵运算&#xff0c;此外也针对数组运算提供大量的数学函数库。 NumPy 的前身 Numeric 最早是由 Jim Hugunin 与其它协作者共同开发&#xff0c;2005 年&#xff0c;Travis Oliphant…

测试用例小锦囊——基于思维导图的测试用例生成和维护

敲黑板&#xff0c;测试用例真的很重要&#xff01; 测试用例是测试工作的基础&#xff0c;通过提供结构化和系统化的方法&#xff0c;来帮助验证软件产品的功能是否按预期正确实现&#xff0c;从而确保软件质量&#xff0c;提升用户满意度。 测试用例的关键要素包括用例编号、…

Linux网络命令:用于查看和修改路由表的重要工具ip route 详解

目录 一、概述 二、用法 1、基本语法 2、参数说明 3、常用选项 4、获取帮助 三、基本用法示例 1、 查看路由表 2、 添加路由 3、 删除路由 4、 修改路由 5、 添加默认路由 6、 删除默认路由 四、路由表管理 1、查看所有路由表 2、指定路由表 五、其他选项 1、…

银行信贷风控专题:Python、R 语言机器学习数据挖掘应用实例合集:xgboost、决策树、随机森林、贝叶斯等

银行信贷风控专题&#xff1a;Python、R 语言机器学习数据挖掘应用实例合集&#xff1a;xgboost、决策树、随机森林、贝叶斯等 原创 拓端研究室 全文链接&#xff1a;https://tecdat.cn/?p38026 在当今金融领域&#xff0c;风险管控至关重要。无论是汽车贷款违约预测、银行挖掘…

容器内pip安装Apache Airflow的经历:如何重置初始密码

背景 Apache Airflow™https://github.com/apache/airflow 是一个开源平台&#xff0c;用于开发、调度和监控面向批处理的工作流程。Airflow 可扩展的 Python 框架使您能够构建几乎可以连接任何技术的工作流程。Web 界面有助于管理工作流程的状态。Airflow 可以通过多种方式部…

RHCE作业四

一要求&#xff1a; 1.搭建dns服务器能够对自定义的正向或者反向域完成数据解析查询。 2.配置从DNS服务器&#xff0c;对主dns服务器进行数据备份。 二操作&#xff1a; 主服务器 1.安装 2主配置真反向 3正反设置 区域 1安装 2添加allow-transfer 3增量 4重启 Systemctl …

算法练习:1658. 将 x 减到 0 的最小操作数

题目链接&#xff1a;1658. 将 x 减到 0 的最小操作数 这道题目的意思就是&#xff0c;给定一个整数数组&#xff0c;和一个x&#xff0c;只能从数组最左边或者最右边进行删除&#xff0c;使得x恰好等于0&#xff0c;并且要操作次数最少的情况&#xff0c;否则返回-1. 这道题直…

职场如雷场,稍有不慎就会被炸翻?十大生存法则送给你

大多数人的一生都要经历过&#xff1a;求学&#xff0c;入职&#xff0c;退休三个阶段。其中职场生涯一般都在30至40年左右&#xff0c;占据了人生的大部分时间&#xff0c;而这段时间&#xff0c;是每个人最年富力强&#xff0c;精力充沛的时光。 那么&#xff0c;如何把这人…

这款神器,运维绝杀 !!!

项目简介 CrowdSec 是一款开源的、基于社区协作的网络安全防护工具&#xff0c;它通过分析和共享IP信誉数据来对抗恶意行为。该软件不仅支持IPv6&#xff0c;而且相较于传统的Python实现&#xff0c;其采用Go语言编写&#xff0c;运行速度提升了60倍。CrowdSec 利用Grok模式解析…

[C++] cpphttplib使用https而不是http

前言 首先我们假设是直接使用 httplib.h 的源文件。 支持 https 根据readme来看&#xff0c;需要开启一个宏&#xff0c;链接libssl和libcrypto就可以了。 下载openssl 保姆级OpenSSL下载及安装教程 选择非light的版本&#xff0c;这样才会有头文件和lib库引入文件。 编写C…

gitee 使用 webhoot 触发 Jenkins 自动构建

一、插件下载和配置 Manage Jenkins>Plugin Manager 搜索 gitee 进行安装 插件配置 1、前往Jenkins -> Manage Jenkins -> System -> Gitee Configuration -> Gitee connections 2、在 Connection name 中输入 Gitee 或者你想要的名字 3、Gitee host URL 中…

MDC(重要)

1.简介 MDC 介绍​ MDC&#xff08;Mapped Diagnostic Context&#xff0c;映射调试上下文&#xff09;是 log4j 和 logback 提供的一种方便在多线程条件下记录日志的功能。MDC 可以看成是一个与当前线程绑定的Map&#xff0c;可以往其中添加键值对。MDC 中包含的内容可以被同一…

Linux—进程学习-01

目录 Linux—进程学习—11.冯诺依曼体系结构2.操作系统2.1操作系统的概念2.2操作系统的目的2.3如何理解管理2.4计算机软硬件体系的理解2.5系统调用和库函数的概念 3.进程3.1进程是什么3.2管理进程3.2.1描述进程-PCB3.2.2组织进程3.2.3总结 3.3查看进程 4.与进程有关的系统调用 …

初始JavaEE篇——多线程(5):生产者-消费者模型、阻塞队列

找往期文章包括但不限于本期文章中不懂的知识点&#xff1a; 个人主页&#xff1a;我要学编程程(ಥ_ಥ)-CSDN博客 所属专栏&#xff1a;JavaEE 文章目录 阻塞队列生产者—消费者模型生产者—消费者模型的优势&#xff1a;生产者—消费者模型的劣势&#xff1a; Java标准库中的阻…

Redis常见面试题(二)

Redis性能优化 Redis性能测试 阿里Redis性能优化 使用批量操作减少网络传输 Redis命令执行步骤&#xff1a;1、发送命令&#xff1b;2、命令排队&#xff1b;3、命令执行&#xff1b;4、返回结果。其中 1 与 4 消耗时间 --> Round Trip Time&#xff08;RTT&#xff0c;…

Scala学习记录,List

List是一个不可变&#xff08;immutable&#xff09;的序列。特点&#xff1a;数据是有序的 前面学习的Set&#xff0c;Map数据是无序的&#xff1b;Array是有序的&#xff0c;Array数组物理空间上是连续的 List可变不可变&#xff1a; list中不可变的列表是不能修改的 list…

【从零开始的LeetCode-算法】1456. 定长子串中元音的最大数目

给你字符串 s 和整数 k 。 请返回字符串 s 中长度为 k 的单个子字符串中可能包含的最大元音字母数。 英文中的 元音字母 为&#xff08;a, e, i, o, u&#xff09;。 示例 1&#xff1a; 输入&#xff1a;s "abciiidef", k 3 输出&#xff1a;3 解释&#xff1a…