最近我在做一个车上的HMI项目,也就是车机应用,需要与云端和域控进行通信。HMI的功能已经外包了,但消息的统一层留给我自己来做。因为项目组其他人都没有经验,所以这个任务就落到了我头上,尽管我自己也没有太多经验,但也没办法,只能直接上手了。大概架构如下
1、整体设计
设计原则很简单,通过阻塞队列进行交互,因为要和第三方进行对接,这里通过topic
和 bytes
进行交互,让他们自己转化 看下消息队列定义,也没啥,就是两个阻塞队列,通过队列和第三方交互。
public class MsgQueue
{
// 用于存储接收到的二进制消息
private static BlockingCollection<Msg> recMessageQueue = new BlockingCollection<Msg>();
private static BlockingCollection<Msg> sendMessageQueue = new BlockingCollection<Msg>();
public static BlockingCollection<Msg> RecvMessageQueue
{
get { return recMessageQueue; }
}
public static BlockingCollection<Msg> SendMessageQueue
{
get { return sendMessageQueue; }
}
}
2、zeroMq功能实现
这次的需求是和域控进行通信,主要使用发布订阅模式,也就是我本地需要一个client
,一个server
。
2.1 zeroMq 介绍
第一次使用zeroMq
,稍微介绍下;ZeroMQ
是一个高性能的异步消息库,旨在简化分布式或多线程应用程序中的消息传递。它提供了一种灵活且高效的方式来进行数据交换,支持多种消息模式,能够在不同的进程、机器和网络之间进行通信。以下是 ZeroMQ
的一些关键特性和概念:ZeroMQ
支持多种消息模式,包括:
-
请求-响应(Req-Rep):客户端发送请求,服务器处理并回复。
-
发布-订阅(Pub-Sub):发布者发布消息,订阅者接收感兴趣的消息。
-
推送-拉取(Push-Pull):用于分布式任务处理,推送端将任务发送到拉取端。
-
管道(Pipeline):将多个组件连接起来形成数据处理管道。简单说就是一个
TCP
通信的框架,可以在本地作为客户端和服务器 官方网站:https://zeromq.org/languages/csharp/
2.2 插件介绍
zeromq
的在C#
上主要是通过netMq
库,这玩意好多年不更新了 具体地址:https://github.com/zeromq/netmq 这玩意折腾了好久,第一次上手,主要要注意版本,通过Nuget 安装
Install-Package NetMQ
不知道为什么我安装不成功,unity里还是无法使用,我直接拷贝了dll
到plugins
拷贝的时候注意依赖项,总共有三个,要不然会报错
2.3 代码实现
using NetMQ;
using NetMQ.Sockets;
using System;
using System.Text;
using System.Threading.Tasks;
using UnityEngine;
public class ZeroStarter : MonoBehaviour
{
private SubscriberSocket subscriber;
private PublisherSocket publisherSocket;
private bool isRunning = true;
void Start()
{
AsyncIO.ForceDotNet.Force(); // 确保 NetMQ 在 Unity 中正确工作
// 初始化发布者
publisherSocket = new PublisherSocket();
publisherSocket.Bind("tcp://*:5557");
subscriber = new SubscriberSocket();
subscriber.Connect("tcp://localhost:5556");
// 启动发布和订阅任务
Task.Run(() => StartPub());
Task.Run(() => StartSubscriber());
}
private void StartPub()
{
while (isRunning)
{
try
{
// 使用 BlockingCollection 的 Take 方法获取消息
Msg message = MsgQueue.SendMessageQueue.Take();
publisherSocket.SendMoreFrame(message.Topic).SendFrame(message.Data);
}
catch (Exception e)
{
Debug.LogError("Error in StartPub: " + e.Message);
}
}
publisherSocket?.Close(); // 确保发布套接字在退出时关闭
}
private void StartSubscriber()
{
subscriber.Subscribe("");
while (isRunning)
{
try
{
// 通过超时和 TryReceiveFrameString 检查订阅消息
if (subscriber.TryReceiveFrameString(out string topic))
{
byte[] bytes = subscriber.ReceiveFrameBytes();
MsgQueue.RecvMessageQueue.Add(new Msg(topic, bytes));
string str = Encoding.Default.GetString(bytes);
Debug.Log($"{topic} {str}");
}
Task.Delay(10).Wait(); // 使用短暂的延迟来避免过度循环
}
catch (Exception e)
{
Debug.LogError("Error in StartSubscriber: " + e.Message);
}
}
subscriber.Close(); // 手动关闭订阅者套接字
}
private void OnDestroy()
{
isRunning = false; // 设置标志位,通知任务退出
// 确保发布套接字关闭
publisherSocket?.Close();
NetMQConfig.Cleanup(); // 清理 NetMQ
}
}
这里有一个注意点就是zeroMq 不支持topic, 但是支持 SendMoreFrame(message.Topic).SendFrame(message.Data) 这个有点风险,也很奇怪,但是貌似是官方推荐的做法,不纠结,就这样吧。
3、Mqtt功能实现
3.1 emqx介绍
mqtt也是一个消息队列,主要是用在IOT
,简单来说就是一个TCP
服务器,消息头会小一些。适合不稳定的网络。我们用的是Emqx
,还行,挺好用。工具客户端使用的是mqttx
,下载地址:https://mqttx.app/zh 官方网站:https://www.emqx.com/zh
3.2 unity插件
这里使用的是官方推荐的插件库 sdk 介绍地址:https://docs.emqx.com/zh/emqx/latest/connect-emqx/introduction.html
官方示例:https://github.com/emqx/MQTT-Client-Examples
我这里使用的是:https://github.com/eclipse/paho.mqtt.m2mqtt 我也使用nuget安装了库,也不起作用,不知道为毛,盲猜是版本的问题,打开上面库,下载之后自己编译 因为用unity
,所以打开了mono
的库
编译之后
将生成的dll
拷贝到plugins
下
3.3 代码实现
using System.Text;
using UnityEngine;
using uPLibrary.Networking.M2Mqtt;
using uPLibrary.Networking.M2Mqtt.Messages;
public class MqttStarter : MonoBehaviour
{
// Start is called before the first frame update
void Start()
{
string ipAddr = "192.168.3.8";
int emqxPort = 1883;
string clientId = "csharpclientid";
//服务器默认密码是这个
string username = "username";
string password = "pwd";
MqttClient client = new MqttClient(ipAddr, emqxPort, false, null, null, MqttSslProtocols.None);
// register to message received
client.MqttMsgPublishReceived += client_MqttMsgPublishReceived;
client.Connect(clientId,username,password);
//
client.Subscribe(
new string[] { "idse/cloud2veh/#" },
new byte[] { MqttMsgBase.QOS_LEVEL_AT_MOST_ONCE });
}
private void client_MqttMsgPublishReceived(object sender, MqttMsgPublishEventArgs e)
{
MsgQueue.RecvMessageQueue.Add(new Msg(e.Topic, e.Message));
Debug.Log("mqtt--> " +e.Topic +" ==== >" + Encoding.Default.GetString(e.Message));
}
}
这里需要注意的是m2mqtt
会自动开启线程,不需要单独实现线程。其他的都是常规操作。
4、验证
先看下zeromq
的收发
验证下Mqtt的收发
5、Java代码zeromq的使用
pom中加入
<dependency>
<groupId>org.zeromq</groupId>
<artifactId>jeromq</artifactId>
<version>0.5.2</version>
</dependency>
zeromq 发布者
package org.example;
/**
* Hello world!
*
*/
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
public class App {
public static void main(String[] args) {
int i = 0;
// 创建一个ZeroMQ的上下文
try (ZContext context = new ZContext()) {
// 创建一个发布者socket
ZMQ.Socket publisher = context.createSocket(ZMQ.PUB);
// 绑定到指定的端口
String address = "tcp://*:5556";
publisher.bind(address);
System.out.println("Publisher started at " + address);
// 持续发送消息
int messageNumber = 0;
while (!Thread.currentThread().isInterrupted()) {
String message = "Message " + messageNumber;
publisher.sendMore("t:"+ (messageNumber%2));
publisher.send("abc".getBytes());
System.out.println("Sent: " + message);
// 增加消息计数并休眠1秒
messageNumber++;
Thread.sleep(1000);
}
// 关闭发布者socket
publisher.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
zeromq 订阅者
package org.example;
import org.zeromq.ZMQ;
import org.zeromq.ZContext;
import java.nio.ByteBuffer;
public class ZmqSubscriber {
public static void main(String[] args) {
// 创建一个ZeroMQ的上下文
try (ZContext context = new ZContext()) {
// 创建一个订阅者socket
ZMQ.Socket subscriber = context.createSocket(ZMQ.SUB);
// 连接到发布者的地址
String address = "tcp://localhost:5557"; // 确保使用正确的地址
subscriber.connect(address);
System.out.println("Subscriber connected to " + address);
// 订阅所有消息
subscriber.subscribe("".getBytes()); // 订阅所有消息,可以根据需要指定主题
// 持续接收消息
while (!Thread.currentThread().isInterrupted()) {
// 接收消息
String topic = subscriber.recvStr(0);
byte[] message = subscriber.recv(0);
ByteBuffer wrap = ByteBuffer.wrap(message);
if (message != null) {
System.out.println("Received: " + topic +" " + wrap.order(java.nio.ByteOrder.LITTLE_ENDIAN).getInt());
}
}
// 关闭订阅者socket
subscriber.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
6、总结
第一次搞zeromq
的消息队列,和平常用的kafka
和rocketmq
差的很多,甚至完全不在同一个讨论方向。
还有一些需要研究
-
在
unity
中nuget
的学习 -
c#
不同平台的学习 -
启动后台线程之后无法关闭,导致
unity
死掉。
byte[] bytes = BitConverter.GetBytes(66666);
-
C#侧生成的bytes 是小端序列