unity 中使用zeroMq和Mqtt 进行通讯

最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下

图片

1、整体设计

设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topicbytes 进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。

public class MsgQueue 
{
    // 用于存储接收到的二进制消息
    private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();
    private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();

    public static BlockingCollection<Msg> RecvMessageQueue
    {
        get { return recMessageQueue; }
    }

    public static BlockingCollection<Msg> SendMessageQueue
    {
        get { return sendMessageQueue; }
    }
}

2、zeroMq功能实现

这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client,一个server

2.1 zeroMq 介绍

第一次使用zeroMq ,稍微介绍下;ZeroMQ 是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ 的一些关键特性和概念:ZeroMQ 支持多种消息模式,包括:

  • 请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。

  • 发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。

  • 推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。

  • 管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个TCP通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/

2.2 插件介绍

zeromq的在C#上主要是通过netMq库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装

Install-Package NetMQ

不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dllplugins拷贝的时候注意依赖项,总共有三个,要不然会报错

图片

2.3 代码实现

using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;

public class ZeroStarter : MonoBehaviour
{
    private SubscriberSocket subscriber;
    private PublisherSocket publisherSocket;
    private bool isRunning = true;

    void Start()
    {
        AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作

        // 初始化发布者
        publisherSocket = new PublisherSocket();
        publisherSocket.Bind("tcp://*:5557");

        subscriber = new SubscriberSocket();
        subscriber.Connect("tcp://localhost:5556");

        // 启动发布和订阅任务
        Task.Run(() => StartPub());
        Task.Run(() => StartSubscriber());
    }

    private void StartPub()
    {
        while (isRunning)
        {
            try
            {
                // 使用 BlockingCollection 的 Take 方法获取消息
                Msg message = MsgQueue.SendMessageQueue.Take();
                publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);
            }
            catch (Exception e)
            {
                Debug.LogError("Error in StartPub: " + e.Message);
            }
        }
        publisherSocket?.Close(); // 确保发布套接字在退出时关闭
    }

    private void StartSubscriber()
    {

        subscriber.Subscribe("");

        while (isRunning)
        {
            try
            {
                // 通过超时和 TryReceiveFrameString 检查订阅消息
                if (subscriber.TryReceiveFrameString(out string topic))
                {
                    byte[] bytes = subscriber.ReceiveFrameBytes();
                    MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));
                    string str = Encoding.Default.GetString(bytes);
                    Debug.Log($"{topic} {str}");
                }
                Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环
            }
            catch (Exception e)
            {
                Debug.LogError("Error in StartSubscriber: " + e.Message);
            }
        }

        subscriber.Close(); // 手动关闭订阅者套接字
    }

    private void OnDestroy()
    {
        isRunning = false; // 设置标志位,通知任务退出

        // 确保发布套接字关闭
        publisherSocket?.Close();
        NetMQConfig.Cleanup(); // 清理 NetMQ
    }
}

这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。

3、Mqtt功能实现

3.1 emqx介绍

mqtt也是一个消息队列,主要是用在IOT,简单来说就是一个TCP服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx ,还行,挺好用。工具客户端使用的是mqttx,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh

3.2 unity插件

这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html

官方示例:https://github.com/emqx/MQTT-Client-Examples

我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity,所以打开了mono的库

图片

编译之后

图片

将生成的dll拷贝到plugins

3.3 代码实现

using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;

public class MqttStarter : MonoBehaviour
{
    // Start is called before the first frame update
    void Start()
    {
        string ipAddr = "192.168.3.8";
        int emqxPort = 1883;
        string clientId = "csharpclientid";
        //服务器默认密码是这个
        string username = "username";
        string password = "pwd";
        MqttClient client = new MqttClient(ipAddr, emqxPort,  false, null, null, MqttSslProtocols.None);

        // register to message received
        client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;

        client.Connect(clientId,username,password);

        // 
        client.Subscribe(
            new string[] { "idse/cloud2veh/#" }, 
            new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });

    }

    private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
    {
        MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));
        Debug.Log("mqtt--> " +e.Topic +"  ==== >" + Encoding.Default.GetString(e.Message));
    }
}

这里需要注意的是m2mqtt 会自动开启线程,不需要单独实现线程。其他的都是常规操作。

4、验证

先看下zeromq的收发

图片

验证下Mqtt的收发

图片

5、Java代码zeromq的使用

pom中加入

   <dependency>
      <groupId>org.zeromq</groupId>
      <artifactId>jeromq</artifactId>
      <version>0.5.2</version>
    </dependency>

zeromq 发布者

package org.example;

/**
 * Hello world!
 *
 */
import org.zeromq.ZMQ;
import org.zeromq.ZContext;

public class App {
    public static void main(String[] args) {
        int i = 0;
        // 创建一个ZeroMQ的上下文
        try (ZContext context = new ZContext()) {
            // 创建一个发布者socket
            ZMQ.Socket publisher = context.createSocket(ZMQ.PUB);

            // 绑定到指定的端口
            String address = "tcp://*:5556";
            publisher.bind(address);
            System.out.println("Publisher started at " + address);

            // 持续发送消息
            int messageNumber = 0;
            while (!Thread.currentThread().isInterrupted()) {
                String message = "Message " + messageNumber;
                publisher.sendMore("t:"+ (messageNumber%2));
                publisher.send("abc".getBytes());
                System.out.println("Sent: " + message);

                // 增加消息计数并休眠1秒
                messageNumber++;
                Thread.sleep(1000);
            }

            // 关闭发布者socket
            publisher.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

zeromq 订阅者

package org.example;

import org.zeromq.ZMQ;
import org.zeromq.ZContext;

import java.nio.ByteBuffer;

public class ZmqSubscriber {
    public static void main(String[] args) {
        // 创建一个ZeroMQ的上下文
        try (ZContext context = new ZContext()) {
            // 创建一个订阅者socket
            ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);

            // 连接到发布者的地址
            String address = "tcp://localhost:5557"; // 确保使用正确的地址
            subscriber.connect(address);
            System.out.println("Subscriber connected to " + address);

            // 订阅所有消息
            subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题

            // 持续接收消息
            while (!Thread.currentThread().isInterrupted()) {
                // 接收消息
                String topic = subscriber.recvStr(0);
                byte[] message = subscriber.recv(0);
                ByteBuffer wrap = ByteBuffer.wrap(message);

                if (message != null) {
                    System.out.println("Received: " + topic +"  " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());
                }
            }

            // 关闭订阅者socket
            subscriber.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

6、总结

第一次搞zeromq的消息队列,和平常用的kafkarocketmq 差的很多,甚至完全不在同一个讨论方向。

还有一些需要研究

  • unitynuget的学习

  • c# 不同平台的学习 

  • 启动后台线程之后无法关闭,导致unity死掉。

byte[] bytes = BitConverter.GetBytes(66666);
  • C#侧生成的bytes 是小端序列

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/909081.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Java | Leetcode Java题解之第526题优美的排列

题目&#xff1a; 题解&#xff1a; class Solution {public int countArrangement(int n) {int[] f new int[1 << n];f[0] 1;for (int mask 1; mask < (1 << n); mask) {int num Integer.bitCount(mask);for (int i 0; i < n; i) {if ((mask & (1…

命令行参数、环境变量、地址空间

命令行参数&#xff1a; int main(int argc, char *argv[ ])&#xff0c;main的参数可带可不带。argc参数通常代表后面的char *argv的元素个数有多少。 在linux中会把输入的字符串存到char *argv[ ]中&#xff0c;在数组的结尾为NULL。 命令行参数可以让同一个程序可以通过不同…

软件测试学习笔记丨SeleniumPO模式

本文转自测试人社区&#xff0c;原文链接&#xff1a;https://ceshiren.com/t/topic/22525 本文为霍格沃兹测试开发学社的学习经历分享&#xff0c;写出来分享给大家&#xff0c;希望有志同道合的小伙伴可以一起交流技术&#xff0c;一起进步~ 说明&#xff1a;本篇博客基于sel…

网络自动化03:简单解释send_config_set方法并举例

目录 拓扑图设备信息 netmiko涉及方法send_config_set()方法的简单示例代码输出结果代码解释导入模块配置信息config_device_interface_description 函数主程序块总结 send_config_set方法参数&#xff1a;1. enter_config_mode2. config_commands3. enter_config_mode4. error…

UI自动化测试 —— CSS元素定位实践!

前言 自动化测试元素定位是指在自动化测试过程中&#xff0c;通过特定的方法或策略来准确识别和定位页面上的元素&#xff0c;以便对这些元素进行进一步的操作或断言。这些元素可以是文本框、按钮、链接、图片等HTML页面上的任何可见或不可见的组件。 在自动化测试中&#xf…

zxing生成、解析二维码,条形码

1、maven依赖 <!--zxing依赖--><dependency><groupId>com.google.zxing</groupId><artifactId>core</artifactId><version>3.1.0</version></dependency><dependency><groupId>com.google.zxing</groupI…

有效增加网站流量的实用策略和技巧

内容概要 在数字化时代&#xff0c;网站流量的增加被视为在线业务成功的关键。网站流量不仅仅意味着访问者的数量&#xff0c;还影响着品牌知名度、用户参与度和销售转化率。针对这一需求&#xff0c;企业需要采取行之有效的策略&#xff0c;例如搜索引擎优化&#xff08;SEO&…

玄机-应急响应- Linux入侵排查

一、web目录存在木马&#xff0c;请找到木马的密码提交 到web目录进行搜索 find ./ type f -name "*.php" | xargs grep "eval(" 发现有三个可疑文件 1.php看到密码 1 flag{1} 二、服务器疑似存在不死马&#xff0c;请找到不死马的密码提交 被md5加密的…

从 vue 源码看问题 — vue 如何进行异步更新?

前言 在上一篇 如何理解 vue 响应式&#xff1f; 中&#xff0c;了解到响应式其实是通过 Observer 类中调用 defineReactive() 即 Object.defineProperty() 方法为每个目标对象的 key&#xff08;key 对应的 value 为非数组的&#xff09; 设置 getter 和 setter 实现拦截&…

本地部署bert-base-chinese模型交互式问答,gradio

首先下载bert-base-chinese&#xff0c;可以在 Huggingface, modelscope, github下载 pip install gradio torch transformers import gradio as gr import torch from transformers import BertTokenizer, BertForQuestionAnswering# 加载bert-base-chinese模型和分词器 mod…

SYN590RH是SYNOXO全新开发设计的一款宽电压范围,低功耗,高性能,无需外置AGC电容de单芯片ASK或00 K射频接收器

一般描述 SYN590RH是SYNOXO全新开发设计的一款宽电压范围&#xff0c;低功耗&#xff0c;高性能&#xff0c;无需外置AGC电容&#xff0c;灵敏度达到典型-110 dBm,400MHz~450MHz频率范围应用的单芯片ASK或00 K射频接收器。 SYN590RH是一款典型的即插即用型单片高…

Unreal5从入门到精通之如何在指定的显示器上运行UE程序

前言 我们有一个设备,是一个带双显示器的机柜,主显示器是一个小竖屏,可以触屏操作,大显示器是一个普通的横屏显示器。我们用这个机柜的原因就是可以摆脱鼠标和键盘,直接使用触屏操作,又可以在大屏观看,非常适合用于教学。 然后我们为这款机柜做了很多个VR项目,包括Uni…

C++中unordered_map和unordered_set的介绍以及用哈希表封装实现unordered_map和unordered_set

目录 1.unordered_map和unordered_set的使用 1.1unordered_set类的介绍 1.2unordered_set和set的使用差异 1.3unordered_map和map的使用差异 1.4unordered_multimap/unordered_multiset 2.用哈希表封装实现unordered_set和unordered_map 2.1实现出复用哈希表的框架并支持…

stm32学习4

学习目录 一.流水灯1.创建文件2.编写相关代码 一.流水灯 1.创建文件 将方法进行分类保存在不同的 .c 文件中&#xff0c;方便复用和寻找&#xff1b; 创建Hardware\LED文件&#xff0c;其中有led.c和led.h文件&#xff0c;用于存放有关LED灯操作的方法&#xff1b; 在User文…

JVM结构图

JVM&#xff08;Java虚拟机&#xff09;是Java编程语言的核心组件之一&#xff0c;负责将Java字节码翻译成机器码并执行。JVM由多个子系统组成&#xff0c;包括类加载子系统、运行时数据区、执行引擎、Java本地接口和本地方法库。 类加载子系统&#xff08;Class Loading Subsy…

mysql之命令行基础指令

一&#xff1a;安装好mysql后&#xff0c;注册好账号密码。 二&#xff1a;在命令行进行登录的指令如下 mysql -u用户名 -p 例如&#xff1a;mysql -uroot -p; 然后按下回车&#xff0c;进入输入密码。 三&#xff1a;基本指令&#xff1a; 1&#xff1a;查看当前账户的所有…

LabVIEW适合开发的软件

LabVIEW作为一种图形化编程环境&#xff0c;主要用于测试、测量和控制系统的开发。以下是LabVIEW在不同应用场景中的适用性和优势。 一、测试与测量系统 LabVIEW在测试与测量系统中的应用广泛&#xff0c;是工程测试领域的主流工具之一。利用其强大的数据采集与处理功能&…

MySQL表的增删改查(CRUD3约束)

这次我们开始先不复习嗷&#xff0c;等到把数据表的删除说完咱们统一&#xff0c;总结书写 1.数据表的删除&#xff1a; 语法&#xff1a; 1. 使用 DROP TABLE 语句删除单个表 基本语法&#xff1a;DROP TABLE [IF EXISTS] table_name; table_name是要删除的表的名称。IF EXIS…

国内首位聋人 Android 软件工程师体验通义灵码,“这真是太棒了”

Hi 大家好&#xff01; 我就是人见人爱、Bug 闪开的通义灵码&#xff01; 上个月&#xff0c;我上线了一项新能力&#xff1a; 体验通义灵码 workspace&#xff1a;轻松分析项目结构&#xff0c;结合代码仓库理解工程、查询问答等 补充说明&#xff1a;当你需要快速了解一个工…