车联网架构设计(二)_消息缓存

在上一篇博客车联网架构设计(一)_消息平台的搭建-CSDN博客中,我介绍了车联网平台需要实现的一些功能,并介绍了如何用EMQX+HAPROXY来搭建一个MQTT消息平台。车联网平台的应用需要消费车辆发布的消息,同时也会下发消息给车辆,以实现车辆控制等功能。通常我们会在MQTT消息平台收到车辆消息后对消息进行缓存,以供上层应用使用。我们可以直接把消息保存到数据库,或者引入一个消息队列,这样可以方便对应用和车辆之间进行解耦合。

这里我将介绍一下如何引入一个Kafka消息队列,把车辆以及上层应用之间需要交互的消息缓存到这个消息队列之中。

在EMQX的企业版中,提供了丰富的数据桥接功能,可以支持把MQTT消息桥接到其他外部系统,例如Kafka或数据库中。但是在开源版,只提供了很有限的数据桥接,不支持Kafka。为此我们可以通过给EMQX开发Hook extension的方式,来加载我们的插件,实现把数据桥接到Kafak。

在EMQX官网的介绍中,Hook扩展是通过gRPC的方式来实现的,支持多种编程语言。如下图:

这里我以Python为例子,来定义一个扩展。

搭建Kafka

首先是在K8S上部署一个Kafka集群,这里我选择了Strimizi的Kafka operator来部署

先创建一个namespace

kubectl create namespace kafka

安装Operator, CRD以及定义RBAC等

kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka

创建一个只包含一个节点的Kafka

kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka 

打开两个终端,分别运行以下的订阅和发布的指令,测试Kafka是否正常工作

kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning

开发ExHook

首先是获取当前EMQX版本定义的gPRC proto。在EMQX服务器的/opt/emqx/lib/emqx_exhook-5.0.14/priv/protos/目录下面有一个exhook.proto文件。

运行以下命令来基于这个proto生成python文件

python -m grpc_tools.protoc -I./ --python_out=. --pyi_out=. --grpc_python
_out=. ./exhook.proto

运行之后,在当前目录下会新生成三个文件,exhook_pb2_grpc.py,exhook_pb2.py,exhook_pb2.pyi

新建一个exhook_server.py文件,继承exhook_pb2_grpc里面的HookProviderServicer,注册对应事件的处理方法,如以下代码:

from concurrent import futures
import logging
from multiprocessing.sharedctypes import Value

import grpc

import exhook_pb2
import exhook_pb2_grpc

import pickle
from kafka import KafkaProducer

class HookProvider(exhook_pb2_grpc.HookProviderServicer):
    def __init__(self):
        self.producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092')

    def OnProviderLoaded(self, request, context):
        print("OnProviderLoaded:", request)
        '''
        specs = [exhook_pb2.HookSpec(name="client.connect"),
                 exhook_pb2.HookSpec(name="client.connack"),
                 exhook_pb2.HookSpec(name="client.connected"),
                 exhook_pb2.HookSpec(name="client.disconnected"),
                 exhook_pb2.HookSpec(name="client.authenticate"),
                 exhook_pb2.HookSpec(name="client.authorize"),
                 exhook_pb2.HookSpec(name="client.subscribe"),
                 exhook_pb2.HookSpec(name="client.unsubscribe"),

                 exhook_pb2.HookSpec(name="session.created"),
                 exhook_pb2.HookSpec(name="session.subscribed"),
                 exhook_pb2.HookSpec(name="session.unsubscribed"),
                 exhook_pb2.HookSpec(name="session.resumed"),
                 exhook_pb2.HookSpec(name="session.discarded"),
                 exhook_pb2.HookSpec(name="session.takenover"),
                 exhook_pb2.HookSpec(name="session.terminated"),

                 exhook_pb2.HookSpec(name="message.publish"),
                 exhook_pb2.HookSpec(name="message.delivered"),
                 exhook_pb2.HookSpec(name="message.acked"),
                 exhook_pb2.HookSpec(name="message.dropped")
                ]
        '''
        specs = [exhook_pb2.HookSpec(name="message.publish")]
        return exhook_pb2.LoadedResponse(hooks=specs)

    def OnProviderUnloaded(self, request, context):
        print("OnProviderUnloaded:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnect(self, request, context):
        print("OnClientConnect:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnack(self, request, context):
        print("OnClientConnack:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnected(self, request, context):
        print("OnClientConnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientDisconnected(self, request, context):
        print("OnClientDisconnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientAuthenticate(self, request, context):
        print("OnClientAuthenticate:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientAuthorize(self, request, context):
        print("OnClientAuthorize:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientSubscribe(self, request, context):
        print("OnClientSubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientUnsubscribe(self, request, context):
        print("OnClientUnsubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionCreated(self, request, context):
        print("OnSessionCreated:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionSubscribed(self, request, context):
        print("OnSessionSubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionUnsubscribed(self, request, context):
        print("OnSessionUnsubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionResumed(self, request, context):
        print("OnSessionResumed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionDiscarded(self, request, context):
        print("OnSessionDiscarded:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTakenover(self, request, context):
        print("OnSessionTakenover:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTerminated(self, request, context):
        print("OnSessionTerminated:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessagePublish(self, request, context):
        self.producer.send('testtopic', pickle.dumps(nmsg))
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
        return reply

    ## case2: stop publish the 't/d' messages
    #def OnMessagePublish(self, request, context):
    #    nmsg = request.message
    #    if nmsg.topic == 't/d':
    #        nmsg.payload = b""
    #        nmsg.headers['allow_publish'] = b"false"
    #
    #    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
    #    return reply

    def OnMessageDelivered(self, request, context):
        print("OnMessageDelivered:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageDropped(self, request, context):
        print("OnMessageDropped:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageAcked(self, request, context):
        print("OnMessageAcked:", request)
        return exhook_pb2.EmptySuccess()

def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
    server.add_insecure_port('[::]:9000')
    server.start()

    print("Started gRPC server on [::]:9000")

    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()

 解释一下代码,在OnProvidedLoader里面是加载各种事件的钩子,这里只加载message.publish事件。在OnMessagePublish是对应事件的处理函数,这里把收到的MQTT消息通过Pickle进行序列化,发送到Kafka的对应topic

部署ExHook

写一个Dockerfile,把代码打包为一个镜像

FROM python:3.7-slim
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "./exhook_server.py"]

 requirements.txt文件内容为

grpcio==1.59.3
grpcio-tools==1.59.3
kafka-python==2.0.2

运行以下命令来构建镜像

docker build --network=host -t emqx_plugin_test:v1.0 .

创建一个部署这个镜像的deployment和service,然后部署到K8S

apiVersion: apps/v1
kind: Deployment
metadata:
  name: emqx-hookserver-deployment
  labels:
    app: hookserver
  namespace: emqx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hookserver
  template:
    metadata:
      labels:
        app: hookserver
    spec:
      containers:
      - name: hookserver
        image: emqx_plugin_test:v1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "250Mi"
            cpu: "100m"
          limits:
            memory: "250Mi"
            cpu: "100m"
        ports:
        - name: rpc
          containerPort: 9000
---
apiVersion: v1
kind: Service
metadata:
  name: hookserver-service
  namespace: emqx
spec:
  selector:
    app: hookserver
  ports:
    - name: rpc
      port: 9000

回到EMQX的控制面板Dashboard,在ExHook里面添加,url填入http://hookserver-service.emqx.svc.cluster.local:9000,然后选择启用即可,可以看到状态为连接成功,并且显示注册了1个钩子。

在minikube上部署,一开始是显示连接中,等了很久仍然无法连接成功,最后查了资料,原来是coredns的问题,运行以下命令重启即可:

kubectl -n kube-system rollout restart deployment coredns

之后打开订阅Kafka的testtopic,然后通过MQTT连接到EMQX发送消息,可以看到Kafka能成功收到EMQX转发的消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/222815.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

ModStartCMS v7.7.0 集成内容区块,文件选择顺序

ModStart 是一个基于 Laravel 模块化极速开发框架。模块市场拥有丰富的功能应用,支持后台一键快速安装,让开发者能快的实现业务功能开发。 系统完全开源,基于 Apache 2.0 开源协议,免费且不限制商业使用。 功能特性 丰富的模块市…

羊大师发现,广州可能真的要下雪了!

羊大师发现,广州可能真的要下雪了! 关于这次广州可能要下雪的消息,来源于气象部门的初步预测。据气象部门表示,近期广州将受到较强的冷空气影响,降温幅度可达5-7摄氏度,且湿度较大,这都是下雪的…

动静态IP代理是怎么实现的?如何搭建稳定独享住宅IP?

首先,让我们来了解一下什么是动静态IP代理。动静态IP代理是一种网络代理服务,它可以通过设置IP代理服务器来隐藏用户的真实IP地址,从而保护用户的隐私和安全。 根据是否需要手动切换IP地址,可以将动静态IP代理分为动态代理和静态代…

C-11练习题

一、单项选择题(本大题共20小题,每小题2分,共40分。在每小题给出的四个备选项中选出一个正确的答案,并将所选项前的字母填写在答题纸的相应位置上。) 1,在C语言中,合法的长整型常数是() A. OxOL B. 4962710M C. 324562& D. 216D 2,设有定义: int a[10],*pa6,*q…

Git配置

个人主页:Lei宝啊 愿所有美好如期而遇 前言 前面我们新建了远程仓库并且在Linux上克隆了远程仓库,但是在新建仓库时我们提到会配置gitignore文件,这次我们将会配置他,并给命令起别名。 目录 前言 忽略特殊文件 给命令起别名…

matplotlib 默认属性和绘图风格

matplotlib 默认属性 一、绘图风格1. 绘制叠加折线图2. Solarize_Light23. _classic_test_patch4. _mpl-gallery5. _mpl-gallery-nogrid6. bmh7. classic8. fivethirtyeight9. ggplot10. grayscale11. seaborn12. seaborn-bright13. seaborn-colorblind14. seaborn-dark15. sea…

东芝CT高压电源维修VP-33452 ULTIMAX80 DREX-ULT80

东芝高压电源多用于东芝CT机XVISION/EX、AUKLET系列、ASTEION系列、以及多排系列。 电源内部电路不得随意更改。电源维修的几点注意事项,希望大家能够在以后遇到类似的问题能帮帮助到大家。spellmαnl电源维修一首先在维修开关电源时,维修人员在修理时注…

Linux环境下安装Nginx

Nginx(发音:engine-x)是一个高性能的HTTP和反向代理服务器,也可以作为邮件代理服务器使用。它是由俄罗斯程序员Igor Sysoev开发的,并在2004年公开发布。Nginx是一个开源项目,可以在Linux、Unix、BSD和Windo…

UVM验证平台中加入sequencer

sequence机制用于产生激励,它是UVM中最重要的机制之一。在 一个规范化的UVM验证平台中,driver只负责驱动transaction,而不负责产生transaction。sequence机制有两大组成部分,一是 sequence,二是sequencer。如何在验证平…

集合01 - Java

集合 1、数组的不足2、集合3、集合的框架体系(背)CollectionMap 1、数组的不足 前面我们保存多个数据使用的是数组,那么数组有不足的地方,我们分析一下。 数组: 长度开始时必须指定,而且一旦指定,不能更改…

【从删库到跑路 | MySQL数据库总结篇】JDBC编程

个人主页:兜里有颗棉花糖 欢迎 点赞👍 收藏✨ 留言✉ 加关注💓本文由 兜里有颗棉花糖 原创 收录于专栏【MySQL学习专栏】🎈 本专栏旨在分享学习MySQL的一点学习心得,欢迎大家在评论区讨论💌 目录 一、前言…

Java多线程万字详解(基础概念、多线程实现方式、锁、消费者机制、线程池)

1 、基础概念解释 1.1线程与进程 线程:是操作系统能够进行运算调度的最小单位。它被包含在进程当中,是进程中的实际运作单位。 进程:是程序的基本执行实体。一个进程中至少有一个线程。一个进程中是可以有多个线程的。如QQ,微信那…

同旺科技 USB TO RS-485 定制款适配器--- 拆解(二)

内附链接 1、USB TO RS-485 定制款适配器 ● 支持USB 2.0/3.0接口,并兼容USB 1.1接口; ● 支持USB总线供电; ● 支持Windows系统驱动,包含WIN10 / WIN11系统32 / 64位; ● 支持Windows RT、Linux、Mac OS X、Windo…

android studio安装说明

一、安装文件下载: Android studio、SDK、NDK下载: https://developer.android.google.cn/ndk/downloads?hlzh-cn 二、双击android studio 安装文件,开始安装: 三、进入安装界面,点击“next”。 四、点击“next”&…

二手物品交易系统源码小程序H5闲置物品转让APP成品

这是一个二手物品交易系统的基本功能介绍,以下是对每个功能的详细解释: 商品发布:卖家可以通过系统发布二手商品信息,包括商品详情、价格、图片等。商品展示:系统会将所有发布的二手商品进行展示,买家可以…

微信小程序收款手续费怎么搞成0.2

今天,我将分享如何有效地降低日常中的收款手续费率。我们都知道,不管是微信支付还是支付宝,平台都会从中扣除一定的手续费。但你是否知道,其实手续费率是可以降低的呢?今天介绍如何申请最低手续费率为0.2%的方法&#…

虹科干货 | 关于JSON数据库

来源:艾特保IT 虹科干货 | 关于JSON数据库 原文链接:https://mp.weixin.qq.com/s/NutCGWa32rOcEHrk3UDGcQ 欢迎关注虹科,为您提供最新资讯! 如何理解JSON数据库?作为NoSQL数据库的一种类型,JSON数据库有哪…

低压无功补偿在分布式光伏现场中的应用

摘要:分布式光伏电站由于建设时间短、技术成熟、收益明显而发展迅速,但光伏并网引起用户功率因数异常的问题也逐渐凸显。针对分布式光伏电站接入配电网后功率因数降低的问题,本文分析了低压无功补偿装置补偿失效的原因,并提出了一…

掌握终端,尽在ZOC for Mac – 最强大的终端仿真器!

在数字时代,终端仿真器是专业人士和开发者必备的工具之一。而ZOC for Mac将为您提供无与伦比的终端体验,助力您更轻松地管理远程连接、维护服务器和进行编程任务。 ZOC for Mac的卓越功能: 多协议支持:ZOC支持Telnet、SSH、SSH2、…

基于Java SSM框架实现超市进销存购物商城管理系统项目【项目源码+论文说明】计算机毕业设计

基于java的SSM框架实现超市进销存购物商城管理系统演示 摘要 随着科学技术的飞速发展,社会的方方面面、各行各业都在努力与现代的先进技术接轨,通过科技手段来提高自身的优势,社区生活超市管理系统当然也不能排除在外。社区生活超市管理系统…