golang开源的可嵌入应用程序高性能的MQTT服务

golang开源的可嵌入应用程序高性能的MQTT服务

什么是MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的、开放的消息传输协议,设计用于在低带宽、高延迟或不可靠的网络环境中进行通信。MQTT最初由IBM开发,现已成为OASIS标准。
MQTT的设计目标是提供一种简单、轻量、可扩展的协议,适用于各种设备和网络条件。它通常用于物联网(IoT)和传感器网络,其中设备需要以有效的方式进行通信,并且资源(如带宽和电池寿命)可能受到限制。
MQTT的简单设计和适用性使其成为物联网中常用的通信协议之一。它被广泛用于传感器网络、嵌入式设备、移动应用程序和其他场景中,提供了一种可靠、高效的消息传输机制。

什么是Mochi-MQTT

源代码地址:https://github.com/mochi-mqtt/server

Mochi MQTT 是一个完全兼容 MQTT v5的可嵌入的中间件/服务器,完全使用 Go 语言编写,旨在用于遥测和物联网项目的开发。它可以作为独立的二进制文件使用,也可以嵌入到你自己的应用程序中库来使用,经过提出的设计以实现问题的轻量化和快速部署,同时也非常重视代码的质量和可维护性。

用途

物联网项目开发时,常常需要使用MQTT协议对设备接入,在很多场景中,私有化部署物联网系统时资源比较少,性能要求高,一些大型的MQTT服务不满足要求,而且代码不可控。
还有在边缘场景下,需要在边缘网关,边缘控制器设备上部署物联网系统,但是边缘网关的资源很少,内存大约只有4G,所以使用java开发的物联网系统就很难部署上去;使用C/C++开发效率又很低,所以Go语言是最合适的,
Mochi-MQTT刚好又完全是Go编写的开源的,可以嵌入到自己的程序启动。

Mochi MQTT独立部署

Golang的环境配置这里不做说明,请看我前面的博文说明

Mochi MQTT 可以作为独立的中间件使用。只需拉取此仓库代码,然后在 cmd 文件夹中运行 cmd/main.go ,默认将开启下面几个服务端口, tcp (:1883)、websocket (:1882) 和服务状态监控 (:8080) 。

cd cmd
go build -o mqtt && ./mqtt

docker部署

可以从 Docker Hub 仓库中拉取并运行Mochi MQTT官方镜像:

docker pull mochimqtt/server
或者
docker run mochimqtt/server

也提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听:

docker build -t mochi:latest .
docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest

嵌入自己项目运行和开发

下载Mochi MQTT包

go get github.com/mochi-mqtt/server/v2

将Mochi MQTT作为包导入使用, 示例代码如下

import (
  mqttServer "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

var Server *mqttServer.Server

func ServerMqttInit() {
	// 创建新的 MQTT 服务器。
	Server = mqttServer.New(&mqttServer.Options{
		InlineClient: true, // 启动内联客户端
	})
	
	// 初始化数据库实例
	edge := &edgeHook{deviceDao: deviceDao.NewDeviceRepository(),
		productDao:     productDao.NewProductRepository(),
	}
	// 添加自定义权限方法
	err := Server.AddHook(edge, nil)
	if err != nil {
		log.Fatal(err)
	}

	// 在1883端口上创建一个 TCP 服务端。
	tcp := listeners.NewTCP("t1", ":1883", nil)
	err = Server.AddListener(tcp)
	if err != nil {
		log.Fatal(err)
	}

	// 在1882端口上创建一个 Websocket 服务端。
	ws := listeners.NewWebsocket("ws1", ":1882", nil)
	err = server.AddListener(ws)
	if err != nil {
		log.Fatal(err)
	}

	go func() {
		err := Server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()
}

type edgeHook struct {
	mqttServer.HookBase
	deviceDao      deviceDao.DeviceRepository
	productDao     productDao.ProductRepository
}

func (h *edgeHook) ID() string {
	return "mqtt-auth"
}

func (h *edgeHook) Provides(b byte) bool {
	// 实现钩子函数
	return bytes.Contains([]byte{
		//MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
		mqttServer.OnConnectAuthenticate,
		//MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
		mqttServer.OnACLCheck,
		//在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
		mqttServer.OnSessionEstablish,
		//当客户端因任何原因断开连接时调用。
		mqttServer.OnDisconnect,
		//当客户端向订阅者发布消息后调用。
		mqttServer.OnPublished,
	}, []byte{b})
}

// OnConnectAuthenticate MQTT连接时认证. 当用户尝试与服务器进行身份验证时调用。
func (h *edgeHook) OnConnectAuthenticate(cl *mqttServer.Client, pk packets.Packet) bool {
	username := string(pk.Connect.Username)
	password := string(pk.Connect.Password)
	if username == "" || len(username) == 0 {
		return false
	}
	if password == "" || len(password) == 0 {
		return false
	}
	return true
}

// OnACLCheck MQTT topic权限控制. 当用户尝试发布或订阅主题时调用,用来检测ACL规则。
func (h *edgeHook) OnACLCheck(cl *mqttServer.Client, topic string, write bool) bool {
	username := string(cl.Properties.Username)
	if username == "" || len(username) == 0 {
		return false
	}
	if topic == "" || len(topic) == 0 {
		return false
	}
	return true
}

// OnSessionEstablish 在新客户端连接并进行身份验证后,会立即调用此方法,并在会话建立和发送CONNACK之前立即调用。
func (h *edgeHook) OnSessionEstablish(cl *mqttServer.Client, pk packets.Packet) {
	username := string(cl.Properties.Username)
	if username == "" || len(username) == 0 {
		return
	}
	//设备连接MQTT成功后保存设备在线状态
}

// OnDisconnect 当客户端因任何原因断开连接时调用。
func (h *edgeHook) OnDisconnect(cl *mqttServer.Client, err error, expire bool) {
	username := string(cl.Properties.Username)
	if username == "" || len(username) == 0 {
		return
	}
	//设备断开MQTT成功后保存设备离线状态
}

// OnPublished 当客户端向订阅者发布消息后调用。
func (h *edgeHook) OnPublished(cl *mqttServer.Client, pk packets.Packet) {
	Log.Infof("mqtt server OnPublished info topic=%s, msg=%s", pk.TopicName, string(pk.Payload))
	//收到客户端消息后做业务逻辑处理
}

// 使用内联客户端方式,向MQTT发送消息
func PublishMsg(topic string, msg []byte) bool {
	err := Server.Publish(topic, msg, false, 0)
	if err != nil {
		Log.Errorf("mqtt EdgePublish error=%v, topic=%s, msg=%s", err, topic, msg)
		return false
	}
	return true
}

// 使用内联客户端方式,订阅边缘MQTT消息topic
func SubscribeTopic(topic string, subscriptionId int, callback func(topic string, msg []byte)) {
	callbackFn := func(cl *mqttServer.Client, sub packets.Subscription, pk packets.Packet) {
		Log.Info("mqtt EdgeSubscribe received message", "client", cl.ID, "subscriptionId", sub.Identifier,
			"topic", pk.TopicName, "payload", string(pk.Payload))
		callback(pk.TopicName, pk.Payload)
	}
	_ = Server.Subscribe(topic, subscriptionId, callbackFn)
}

// 使用内联客户端方式,取消订阅边缘MQTT消息topic
func UnsubscribeTopic(topic string, subscriptionId int) {
	_ = Server.Unsubscribe(topic, subscriptionId)
}

func main() {
	// 创建信号用于等待服务端关闭信号
  sigs := make(chan os.Signal, 1)
  done := make(chan bool, 1)
  signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
  go func() {
    <-sigs
    done <- true
  }()
  
  <-done
	Log.Error("caught signal, stopping...")
	Server.Close()
	Log.Error("main.go finished")
}

监控MQTT指标信息

mqttRouters := r.Group("/mqtt", func(context *gin.Context) {})
	{
		mqttRouters.GET("stats", func(c *gin.Context) {
			util.R(c, nil, mqtt.Server.Info)
		})
	}

在这里插入图片描述

详情使用指南请看:https://github.com/mochi-mqtt/server

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/361233.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

python webdriver 测试框架数据驱动json文件驱动的方式

简介&#xff1a; 数据驱动excel驱动方式,就是数据配置在excel里面&#xff0c;主程序调用的时候每次用从excel里取出的数据作为参数&#xff0c;进行操作&#xff0c; 需要掌握的地方是对excel的操作&#xff0c;要灵活的找到目标数据 测试数据.xlsx: 路径-D:\test\0627 E…

产品原型图设计规范大全

目前&#xff0c;市场上许多产品经理或设计师都在使用一些优秀的原型设计规范&#xff0c;这些规范几乎涵盖了原型设计的许多方面。一套好的、完整的原型设计规范可以统一产品设计风格&#xff0c;检验产品的可用性&#xff0c;有效提高产品经理绘制原型图的效率&#xff0c;更…

力扣238. 除自身以外数组的乘积(前后缀和)

Problem: 238. 除自身以外数组的乘积 文章目录 题目描述思路复杂度Code 题目描述 思路 思路1&#xff1a; 1.先求取数组的包括当前下标值得前后缀乘积&#xff08;利用两个数组记录下来分别为leftProduct和rightProduct&#xff09; 2.当求取一个下标为i的数组中的元素&#x…

构建基于Flask的跑腿外卖小程序

跑腿外卖小程序作为现代生活中的重要组成部分&#xff0c;其技术实现涉及诸多方面&#xff0c;其中Web开发框架是至关重要的一环。在这篇文章中&#xff0c;我们将使用Python的Flask框架构建一个简单的跑腿外卖小程序的原型&#xff0c;展示其基本功能和实现原理。 首先&…

linux --中断管理 -- irq的自动探测机制

irq自动探测机制 如果一个设备的驱动程序无法确定它说管理的设备的软件中断号irq&#xff0c;此时设备驱动程序可以使用irq的自动探测机制来获取其正在使用的irq。 使用自动探测机制的条件 内核与驱动&#xff0c;必须共同努力才能完成只限于非共享中断的情况 探测前&#…

如何查看某一页面在在谷歌有哪些关键词

随着跨境贸易的不断发展&#xff0c;谷歌SEO也被越来越多的人群所了解&#xff0c;所接受。我们在日常操作SEO的时候&#xff0c;往往都会远见这样的事情&#xff0c;那就是自己网站的某一个页面原本只是简单的承载着某一个关键词&#xff0c;但是随着时间的推移&#xff0c;这…

Shell脚本之 -------------免交互操作

一、Here Document 1.Here Document概述 Here Document 使用I/O重定向的方式将命令列表提供给交互式程序 Here Document 是标准输 入的一种替代品&#xff0c;可以帮助脚本开发人员不必使用临时文件来构建输入信息&#xff0c;而是直接就地 生产出一个文件并用作命令的标准…

Linux——动静态库

在进行开发过程中&#xff0c;我们不可避免地会使用到人家的库&#xff0c;那么库到底是什 么&#xff1f;而库又分为动态库和静态库&#xff0c;那么这两个又是什么&#xff1f;这篇博客由我来 简单介绍动静态库。文章目录 1. 库2. 静态库a. 静态库的制作b. 使用静态库 3. 动态…

打击者H5小游戏

欢迎来到程序小院 打击者 玩法&#xff1a;点击飞机上下左右移动躲过子弹射击&#xff0c;打掉上方敌人飞机&#xff0c; 遇到药包会增加能量&#xff0c;弹药包会升级武器&#xff0c;快去射击吧^^。开始游戏https://www.ormcc.com/play/gameStart/262 html <div id"…

基于矢量控制的交流电机驱动simulink建模与仿真

目录 1.课题概述 2.系统仿真结果 3.核心程序与模型 4.系统原理简介 4.1 交流电机基础 4.2 矢量控制原理 4.3 矢量控制的实现 5.完整工程文件 1.课题概述 基于矢量控制的交流电机驱动simulink建模与仿真。系统仿真输出电压&#xff0c;电流&#xff0c;电机转速以及扭矩…

语言革命:NLP与GPT-3.5如何改变我们的世界

文章目录 &#x1f4d1;前言一、技术进步与应用场景1.1 技术进步1.2 应用场景 二、挑战与前景三、伦理和社会影响四、实践经验五、总结与展望 &#x1f4d1;前言 自然语言处理&#xff08;Natural Language Processing&#xff0c;NLP&#xff09;是人工智能领域的一个重要分支…

快速入门存内计算—助力人工智能加速深度学习模型的训练和推理

存内计算&#xff1a;提高计算性能和能效的新技术 传统的计算机架构是将数据存储在存储器中&#xff0c;然后将数据传输到计算单元进行处理。这种架构存在一个性能瓶颈&#xff0c;即数据传输延迟。存内计算通过将计算单元集成到存储器中&#xff0c;消除了数据传输延迟&#…

中国的茶文化:现代生活中的茶文化

中国的茶文化&#xff1a;现代生活中的茶文化 引言 在现代社会的快节奏生活中&#xff0c;茶文化并未随时间流逝而褪色&#xff0c;反而以其独特的方式融入了全球各地人们的日常生活。它超越了饮品本身的范畴&#xff0c;成为一种连接历史、人文与现代生活方式的艺术形式。本文…

Git 介绍 与 配置

Git 介绍 Git是一个分布式版本控制系统&#xff0c;用于跟踪文件的更改和协作开发。它可以管理项目的版本历史记录&#xff0c;并允许多个开发者在同一时间进行并行开发。 解决上图产生的问题就出现了git 分布式版本控制系统 看下图 Git 配置 Git的基本配置包括用户名和电子邮…

Linux split命令 切割文件

目录 一. 主要配置项二. 按照行数切割文件三. 按照指定大小切割文件 一. 主要配置项 ⏹将文件按照行数或者大小切割为若干份小文件&#xff0c;主要作用就是用来切割文件 -l&#xff1a;表示将文件按照行分割-d&#xff1a;表示使用数字作为分割后的文件名后缀, 而不是默认的…

BUUCTF-Real-[ThinkPHP]5-Rce

1、ThinkPHP检测工具 https://github.com/anx0ing/thinkphp_scan 漏洞检测 通过漏洞检测&#xff0c;我们发现存在rce漏洞&#xff01; 2、漏洞利用 ---- [!] Name: Thinkphp5 5.0.22/5.1.29 Remote Code Execution VulnerabilityScript: thinkphp5022_5129.pyUrl: http://n…

跟着cherno手搓游戏引擎【16】Camera和Uniform变量的封装

相机封装&#xff1a; OrthographicCamera.h: #pragma once #include <glm/glm.hpp> namespace YOTO {class OrthographicCamera{public:OrthographicCamera(float left,float right , float bottom,float top);const glm::vec3& GetPosition()const { return m_Pos…

阿赵UE学习笔记——13、贴花

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。   继续学习虚幻引擎的使用。这次介绍一种特殊的材质类型&#xff0c;贴花。 一、获取贴花资源 在没有分析贴花的原理之前&#xff0c;可以先去获得一些免费的贴花资源来使用&#xff0c;比如在Quixel上面就有专门的一个资源…

rp-bf:一款Windows下辅助进行ROP gadgets搜索的Rust库

关于rp-bf rp-bf是一款Windows下辅助进行ROP gadgets搜索的Rust库&#xff0c;该工具可以通过模拟Windows用户模式下的崩溃转储来爆破枚举ROP gadgets。 在很多系统安全测试场景中&#xff0c;研究人员成功劫持控制流后&#xff0c;通常需要将堆栈数据转移到他们所能够控制的…

iZotope RX 10.4.2 mac激活版 音频修复和增强工具

iZotope RX 10 for Mac是一款专业的音频修复软件&#xff0c;旨在提供强大、精确的工具&#xff0c;让用户能够清晰、纯净地处理音频。以下是其主要功能和特点&#xff1a; 软件下载&#xff1a;iZotope RX 10.4.2 mac激活版下载 强大的降噪功能&#xff1a;iZotope RX 10采用了…