基于MQTT协议实现微服务架构事件总线

一、场景描述

昨天在博客《客户端订阅服务端事件的实现方法》中提出了利用websocket、服务端EventEmitter和客户端mitt实现客户端订阅服务端事件,大大简化了客户端对服务端数据实时响应的逻辑。上述方案适用于单服务节点的情形。

对于由服务集群支撑的微服务架构,websocket提供的点对点通信已无法满足前端订阅后端集群事件的需求,升级方案是使用基于消息总线的通信方式。

在这里插入图片描述

二、几种消息总线适用性比较

常用的消息总线包括Kafka、Redis和基于MQTT协议实现的EMQX。

Kafka和MQTT都是从发布/订阅系统演化而来,但发展侧重点不同。Kafka通过分布式架构提供了海量数据流的存储,并保证数据流顺序,它的设计目标是支持数据发布、订阅和存储。而MQTT用于网络中传输小型数据包,其设计目的是实现简单、可靠的设备间通信。

而Redis是从内存数据库系统演化而来,发布/订阅功能是把消息保存在内存中。与Kafka相比,其只能提供半持久化;与MQTT相比,其通信效率较低。

由于应用场景没有对消息持久化的需求,且考虑到产业大脑平台未来会接入工业互联网,使用MQTT协议来搭建事件总线更利于平台在工业互联网环境下的扩展。

三、MQTT简介

几年前,本人曾写过MQTT简介。本文摘抄其中重要概念。

(一)MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是基于“订阅/发布”模式的轻量级通信协议,该协议基于TCP/IP,能以极低的带宽为海量(百万级)跨域设备提供可靠的消息服务,因此在物联网、小型移动终端、边缘计算方面有广泛应用。
所谓可靠的消息传输,体现为可配置消息的服务质量(QoS),有三种服务质量可选:

  • 至多一次:
    消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。应用场景如环境传感器的数据采集,丢失一次记录无所谓,因为不久后还会有第二次发送。
  • 至少一次:
    确保消息送达订阅者,但消息可能重复,适用于幂等性操作。
  • 只有一次:
    最严格的消息服务质量,确保消息到达且仅到达一次订阅者。应用场景如计费系统等。

MQTT协议中存在三种身份:消息总线(Broker)、发布者(Publish)和订阅者(Subscribe),其中消息总线属于服务器,后两者都属于客户端。发布者和订阅者可以是各种物联网设备和小型终端,消息发布者可以同时也是消息订阅者,如下图所示。

MQTT.png

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

  • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);
  • payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

订阅消息时,可以在订阅表达式中使用通配符筛选器对主题进行筛选,可同时订阅所匹配的多个主题。
MQTT协议中主要有以下5个方法:

  • connect:客户端建立与服务器的连接
  • disconnect:等待客户端完成工作后,端口与总线的会话
  • subscribe:客户端向消息总线注册订阅主题
  • unsubscribe:客户端等待消息总线取消所注册的订阅
  • publish:客户端向消息总线发送某主题的消息

(二)开源消息总线EMQX

EMQX(Erlang/Enterprise/Elastic MQTT Broker),是基于Erlang语言开发的开源物联网MQTT消息总线。其是一款由前华为员工开发的开源软件,软件主页为https://www.emqx.io/。可根据操作系统类别选择不同版本下载安装,或通过docker部署。

软件安装后,通过 emqx start以后台方式启动。启动后将会开放两个端口:

  • 18083端口为控制台端口,可通过浏览器访问该端口,首次登录的用户名和密码为admin和public。控制台提供了总线监控、用户权限管理、在线客户端订阅/发布等功能。
  • 8083端口为通信端口,MQTT客户端可通过该端口与EMQX消息总线通信。

(三)MQTT.js客户端

MQTT.js是MQTT客户端Nodejs SDK,可在浏览器(ES模块)和Node.js环境(CommonJS模块)下使用,前者可通过MQTT over WebSocket使用,后者既可以通过MQTT over WebSocket使用,也可以直接使用MQTT。区别仅仅是连接参数的协议头不同。

1. 安装和帮助文件

$ pnpm i mqtt -S #安装
$ npx mqtt help  #帮助
MQTT.js command line interface, available commands are:

  * publish     publish a message to the broker
  * subscribe   subscribe for updates from the broker
  * version     the current MQTT.js version
  * help        help about commands

Launch 'mqtt help [command]' to know more about the commands.

2. 使用方法

// const mqtt = require('mqtt') //ES模块
import mqtt from 'mqtt'  //CommonJS模块

// 连接选项
const options = {
      clean: true, // true: 清除会话, false: 保留会话
      connectTimeout: 4000, // 超时时间
      // 认证信息
      clientId: 'user_id', // 要保证唯一性
      // 若在控制台配置了用户名和密码:
      // username: 'xxx',
      // password: 'xxx',
}

// 连接字符串, 通过协议指定使用的连接方式
// ws 未加密 WebSocket 连接
// wss 加密 WebSocket 连接
// mqtt 未加密 TCP 连接
// mqtts 加密 TCP 连接
// wxs 微信小程序连接
// alis 支付宝小程序连接
const connectUrl = 'ws://localhost:8084/mqtt'
const client = mqtt.connect(connectUrl, options)

client.on('reconnect', error => {
  console.error('正在重连:', error)
})

client.on('error', error => {
  console.error('连接失败:', error)
})

//收到消息
client.on('message', (topic, message) => {
  console.log('收到消息:', topic, message.toString()) //message是二进制流,需要转换成字符串
})

//订阅主题
const topic='/user_id/#'
const qos=0  //0:最多交付1次;1:至少交付1次;2:只交付1次
client.subscribe(topic, qos, error=>{  //订阅user_id主题下所有消息
  if(error){
    console.error('订阅主题失败:', error)
    return
  }
  console.log('订阅成功')
})

//发布消息
client.publish('user_id/a',JSON.stringify({a:123}),qos,error=>{
  if(error){
    console.error('发布消息失败:', error)
  }
})

//取消订阅
client.unsubscrib(topic, qos, error=>{
  if(error){
    console.error('取消订阅失败', error)
    return
  }
})

//断开连接
if(client.connected){
  try{
    client.end(false,()=>{
      console.log('成功断开连接')
    })catch(error){
      console.error('断开连接失败:', error)
    }
  }
}

(四)安全性

1. 排它订阅

排它订阅是 EMQX 支持的 MQTT 扩展功能。排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。

2. JWT认证

系统整体采用JWT认证方式,通过一台认证服务器颁发JWT Token。MQTT客户端访问EMQX总线时,携带由认证服务器颁发的JWT Token。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/414874.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

招聘系统架构的设计与实现

在当今竞争激烈的人才市场中,有效的招聘系统对企业吸引、筛选和管理人才至关重要。本文将探讨招聘系统的架构设计与实现,帮助企业构建一个高效、可靠的人才招聘平台。 ## 1. 系统架构设计 ### 1.1 微服务架构 招聘系统通常采用微服务架构,将…

Java JVM虚拟机面试题

Java JVM虚拟机面试题 前言1、ThreadLocal的底层原理和应用?2、Java中的锁池和等待池?3、wait(),yield(),join(),sleep()的区别?4、你们项⽬如何排查JVM问题?5、YGC和FGC发生时间?6、…

11.vue学习笔记(组件生命周期+生命周期应用+动态组件+组件保持存活)

文章目录 1.组件生命周期2.生命周期应用2.1通过ref获取元素DOM结构2.2.模拟网络请求渲染数据 3.动态组件3.1.A,B两个组件 4.组件保持存活(销毁期) 1.组件生命周期 每个Vue组件实例在创建时都需要经历一系列的初始化步骤,比如设置…

探索AI视频模型的无限可能:OpenAI的Sora引领创新浪潮

文章目录 📑前言一、技术解析二、应用场景三、未来展望四、伦理与创意五、用户体验与互动🌤️总结 📑前言 随着人工智能技术的蓬勃发展,AI视频模型正逐渐成为科技领域的新宠。在这个变革的浪潮中,OpenAI推出的首个AI视…

Spring中 Unsupported class file major version 61 报错

初学Spring时遇到的一个错误&#xff1a;Unsupported class file major version 61 &#xff0c;如图所示&#xff1a; 网上查了一下大概是JDK的版本与Spring的版本不一致导致的错误&#xff1b;刚开始我用的Spring版本是&#xff1a; <dependencies><dependency>…

(全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF

研究生英语读写教程基础级教师用书PDF 研究生英语读写教程提高级教师用书PDF pdf下载&#xff08;完整版下载&#xff09; &#xff08;1&#xff09;研究生英语读写教程基础级教师用书PDF &#xff08;2&#xff09;研究生英语读写教程基提高级教师用书PDF

【Pytorch深度学习开发实践学习】Pytorch实现LeNet神经网络(1)

1.model.py import torch.nn as nn import torch.nn.functional as F引入pytorch的两个模块 关于这两个模块的作用&#xff0c;可以参考下面 Pytorch官方文档 torch.nn包含了构成计算图的基本模块 torch,nn.function包括了计算图中的各种主要函数&#xff0c;包括&#…

linux gdb 调试工具

1.写程序 首先&#xff0c;我们先写出一个 .c 或者.cpp程序 如 然后 gcc -g hello.c -o hello 或者 g -g hello.cpp -o hello &#xff08;-g&#xff09;要加 2. gdb调试 用 gdb &#xff08;可执行程序&#xff0c;如hello&#xff09; 进入之后&#xff0c;有…

Window系统安装USB Redirector结合cpolar实现远程访问本地USB设备

文章目录 前言1. 安装下载软件1.1 内网安装使用USB Redirector1.2 下载安装cpolar内网穿透 2. 完成USB Redirector服务端和客户端映射连接3. 设置固定的公网地址 前言 USB Redirector是一款方便易用的USB设备共享服务应用程序&#xff0c;它提供了共享和访问本地或互联网上的U…

物业智能水电抄表管理系统

物业智能水电抄表管理系统是物业管理行业的关键技术之一&#xff0c;其结合了智能化、远程监控和数据分析等功能&#xff0c;为物业管理公司和业主提供了高效、精准的水电抄表管理解决方案。该系统具有多项优势&#xff0c;能够提升物业管理效率&#xff0c;降低成本&#xff0…

[计算机网络]--MAC/ARP/DNS协议

前言 作者&#xff1a;小蜗牛向前冲 名言&#xff1a;我可以接受失败&#xff0c;但我不能接受放弃 如果觉的博主的文章还不错的话&#xff0c;还请点赞&#xff0c;收藏&#xff0c;关注&#x1f440;支持博主。如果发现有问题的地方欢迎❀大家在评论区指正 目录 一、认识以…

UnityWebGL UGUI中文不显示问题

这是Unity编辑中效果 打包成webgl后的效果&#xff08;中文没有显示出来&#xff09; 解决方法 将Unity默认使用的Arial替换成中文字体。 1.找到电脑字体库&#xff08;win电脑字体库路径&#xff1a;C:\Windows\Fonts &#xff1b;Mac电脑搜索“字体册”&#xff09;。 2.将…

TC3xx SMU、PMIC和Tranceiver的功能安全闭环

目录 1.TLF35584安全状态输出响应对象 1.1 响应ERR 收集到的错误信号 1.2 响应监控功能引发的ROT 1.3 响应看门狗引发的错误 1.4 环境过温引发的错误状态 1.5 为什么设计SSx&#xff1f; 2. 安全状态输出给谁 3.小结 在之前文章里&#xff0c;我们简述了TC3xx SMU如何…

js 面试题--事件循环event loop--宏任务和微任务

1 事件循环event loop概念&#xff1a; js 是非阻塞单线程语言&#xff0c;js在执行过程中会产生执行环境&#xff0c;执行环境会按顺序添加到执行栈中&#xff0c;先执行同步栈中的任务&#xff0c;当遇到异步任务时会添加到task队列中&#xff0c;同步栈执行完后&#xff0c…

Python学习系列 -初探标准库之logging库

系列文章目录 第一章 初始 Python 第二章 认识 Python 变量、类型、运算符 第三章 认识 条件分支、循环结构 第四章 认识 Python的五种数据结构 第五章 认识 Python 函数、模块 第六章 认识面向对象三大特性 第七章 初探标准库之os库 第八章 初探标准库之pathlib库 第九章 初探…

【数据结构(C语言)】排序详解

目录 文章目录 前言 一、排序的概念 1.1 排序的概念 1.2 常见的排序算法 二、插入排序 2.1 直接插入排序 2.1.1 基本思想 2.1.2 特性总结 2.1.3 代码实现 2.2 希尔排序 2.2.1 基本思想 2.2.2 特性总结 2.2.3 代码实现 三、选择排序 3.1 直接选择排序 3.1.1…

【数据结构和算法初阶(C语言)】空间复杂度(例题剖析一起探究空间如何评价算法)

目录 1.衔接前言-时间复杂度的回顾 2.关于算法复杂度 3.本文主角-空间复杂度 3.1大O的渐进表示方法 4.空间复杂度例题----实际感受空间复杂度 4.1冒泡排序的空间复杂度 4.2计算递归函数的空间复杂度 4.3动态开辟内存版本求斐波那契数列的空间复杂度 4.4&#xff08;…

蓝桥杯_定时器的基本原理与应用

一 什么是定时器 定时器/计数器是一种能够对内部时钟信号或外部输入信号进行计数&#xff0c;当计数值达到设定要求时&#xff0c;向cpu提出中断处理请求&#xff0c;从而实现&#xff0c;定时或者计数功能的外设。 二 51单片机的定时/计数器 单片机外部晶振12MHZ&#xff0c;…

如何实现双向循环链表

博主主页&#xff1a;17_Kevin-CSDN博客 收录专栏&#xff1a;《数据结构》 引言 双向带头循环链表是一种常见的数据结构&#xff0c;它具有双向遍历的特性&#xff0c;并且在表头和表尾之间形成一个循环。本文将深入探讨双向带头循环链表的结构、操作和应用场景&#xff0c;帮…

el-table通过这样封装可以实现校验-表格校验的原理

我们一般在后台系统中&#xff0c;很常见的操作时表格里面嵌套表单&#xff0c;之前我的网上找到了一些封装的用法&#xff1a; <el-form :model"formData" :rules"ruleData" ref"formDom"><el-table :data"formData.tableData&q…