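In the previous post, "Connected Vehicle Architecture Design (1): Building the Messaging Platform" (on CSDN), I described some of the features a connected-vehicle platform needs and showed how to build an MQTT messaging platform with EMQX and HAProxy. Applications on the platform need to consume the messages that vehicles publish, and they also send messages down to vehicles, for example to control them. Typically, once the MQTT platform receives a vehicle message, we buffer it so that upper-layer applications can use it. We could write the messages straight into a database, or we could introduce a message queue, which makes it easier to decouple the applications from the vehicles.
This post shows how to introduce a Kafka message queue and use it to buffer the messages exchanged between vehicles and upper-layer applications.
The enterprise edition of EMQX ships with a rich set of data bridges that can forward MQTT messages to external systems such as Kafka or a database. The open-source edition offers only a limited set of bridges, and Kafka is not among them. To work around this, we can develop a hook extension (ExHook) for EMQX and load our own code to bridge the data to Kafka.
According to the EMQX documentation, hook extensions are implemented over gRPC and can be written in several programming languages.
Here I use Python as the example language to define an extension.
Setting up Kafka
First, deploy a Kafka cluster on Kubernetes. I chose the Strimzi Kafka operator for this.
Start by creating a namespace: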
kubectl create namespace kafka
Install the operator, the CRDs, and the RBAC definitions:
kubectl create -f 'https://strimzi.io/install/latest?namespace=kafka' -n kafka
Create a Kafka cluster with a single node:
kubectl apply -f https://strimzi.io/examples/latest/kafka/kafka-persistent-single.yaml -n kafka
Open two terminals and run the following producer and consumer commands, one in each, to check that Kafka is working:
kubectl -n kafka run kafka-producer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-producer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic
kubectl -n kafka run kafka-consumer -ti --image=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0 --rm=true --restart=Never -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --topic my-topic --from-beginning
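Developing the ExHook
First, get the gRPC proto definition that matches your EMQX version. On the EMQX server, the directory /opt/emqx/lib/emqx_exhook-5.0.14/priv/protos/ contains a file named exhook.proto.
Run the following command to generate the Python files from this proto: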
python -m grpc_tools.protoc -I./ --python_out=. --pyi_out=. --grpc_python_out=. ./exhook.proto
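This generates three new files in the current directory: exhook_pb2_grpc.py, exhook_pb2.py, and exhook_pb2.pyi.
Create a new file exhook_server.py, subclass HookProviderServicer from exhook_pb2_grpc, and register handler methods for the events you care about, as in the following code: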
from concurrent import futures
import logging
import pickle

import grpc
from kafka import KafkaProducer

import exhook_pb2
import exhook_pb2_grpc


class HookProvider(exhook_pb2_grpc.HookProviderServicer):

    def __init__(self):
        # One shared producer, pointing at the Strimzi bootstrap service
        self.producer = KafkaProducer(bootstrap_servers='my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092')

    def OnProviderLoaded(self, request, context):
        print("OnProviderLoaded:", request)
        # Full list of available hooks, kept here for reference
        '''
        specs = [exhook_pb2.HookSpec(name="client.connect"),
                 exhook_pb2.HookSpec(name="client.connack"),
                 exhook_pb2.HookSpec(name="client.connected"),
                 exhook_pb2.HookSpec(name="client.disconnected"),
                 exhook_pb2.HookSpec(name="client.authenticate"),
                 exhook_pb2.HookSpec(name="client.authorize"),
                 exhook_pb2.HookSpec(name="client.subscribe"),
                 exhook_pb2.HookSpec(name="client.unsubscribe"),
                 exhook_pb2.HookSpec(name="session.created"),
                 exhook_pb2.HookSpec(name="session.subscribed"),
                 exhook_pb2.HookSpec(name="session.unsubscribed"),
                 exhook_pb2.HookSpec(name="session.resumed"),
                 exhook_pb2.HookSpec(name="session.discarded"),
                 exhook_pb2.HookSpec(name="session.takenover"),
                 exhook_pb2.HookSpec(name="session.terminated"),
                 exhook_pb2.HookSpec(name="message.publish"),
                 exhook_pb2.HookSpec(name="message.delivered"),
                 exhook_pb2.HookSpec(name="message.acked"),
                 exhook_pb2.HookSpec(name="message.dropped")
                ]
        '''
        # Only the message.publish hook is registered
        specs = [exhook_pb2.HookSpec(name="message.publish")]
        return exhook_pb2.LoadedResponse(hooks=specs)

    def OnProviderUnloaded(self, request, context):
        print("OnProviderUnloaded:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnect(self, request, context):
        print("OnClientConnect:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnack(self, request, context):
        print("OnClientConnack:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientConnected(self, request, context):
        print("OnClientConnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientDisconnected(self, request, context):
        print("OnClientDisconnected:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientAuthenticate(self, request, context):
        print("OnClientAuthenticate:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientAuthorize(self, request, context):
        print("OnClientAuthorize:", request)
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", bool_result=True)
        return reply

    def OnClientSubscribe(self, request, context):
        print("OnClientSubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnClientUnsubscribe(self, request, context):
        print("OnClientUnsubscribe:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionCreated(self, request, context):
        print("OnSessionCreated:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionSubscribed(self, request, context):
        print("OnSessionSubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionUnsubscribed(self, request, context):
        print("OnSessionUnsubscribed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionResumed(self, request, context):
        print("OnSessionResumed:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionDiscarded(self, request, context):
        print("OnSessionDiscarded:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTakenover(self, request, context):
        print("OnSessionTakenover:", request)
        return exhook_pb2.EmptySuccess()

    def OnSessionTerminated(self, request, context):
        print("OnSessionTerminated:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessagePublish(self, request, context):
        nmsg = request.message
        # Serialize the MQTT message with pickle and forward it to Kafka
        self.producer.send('testtopic', pickle.dumps(nmsg))
        # Return the message unchanged so EMQX publishes it as usual
        reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
        return reply

    ## case2: stop publish the 't/d' messages
    #def OnMessagePublish(self, request, context):
    #    nmsg = request.message
    #    if nmsg.topic == 't/d':
    #        nmsg.payload = b""
    #        nmsg.headers['allow_publish'] = b"false"
    #
    #    reply = exhook_pb2.ValuedResponse(type="STOP_AND_RETURN", message=nmsg)
    #    return reply

    def OnMessageDelivered(self, request, context):
        print("OnMessageDelivered:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageDropped(self, request, context):
        print("OnMessageDropped:", request)
        return exhook_pb2.EmptySuccess()

    def OnMessageAcked(self, request, context):
        print("OnMessageAcked:", request)
        return exhook_pb2.EmptySuccess()


def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    exhook_pb2_grpc.add_HookProviderServicer_to_server(HookProvider(), server)
    server.add_insecure_port('[::]:9000')
    server.start()
    print("Started gRPC server on [::]:9000")
    server.wait_for_termination()


if __name__ == '__main__':
    logging.basicConfig()
    serve()
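A quick walk-through of the code: OnProviderLoaded declares which event hooks to load, and here it registers only the message.publish event. OnMessagePublish is the handler for that event; it serializes the received MQTT message with pickle and sends it to the corresponding Kafka topic (testtopic in this example).
Deploying the ExHook
Write a Dockerfile to package the code as an image: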
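Pickle keeps the handler short, but it ties every consumer to Python and to the generated exhook_pb2 module. If other languages need to read the topic, one option is to send JSON instead. The following is only a sketch under assumptions: the helper name message_to_json and the payload_b64 key are made up for illustration, and the SimpleNamespace object is a stand-in for request.message that carries just the topic, qos, and payload fields.

```python
import base64
import json
from types import SimpleNamespace


def message_to_json(msg) -> bytes:
    """Encode selected message fields as UTF-8 JSON bytes (hypothetical helper)."""
    record = {
        "topic": msg.topic,
        "qos": msg.qos,
        # MQTT payloads are arbitrary bytes, so base64 keeps the JSON valid.
        "payload_b64": base64.b64encode(msg.payload).decode("ascii"),
    }
    return json.dumps(record, sort_keys=True).encode("utf-8")


# Stand-in for request.message; the real object comes from exhook_pb2.
sample = SimpleNamespace(topic="vehicle/123/status", qos=1, payload=b"\x01\x02hello")
encoded = message_to_json(sample)

# A consumer in any language can reverse the two steps: parse JSON, then base64-decode.
decoded = json.loads(encoded)
restored_payload = base64.b64decode(decoded["payload_b64"])
```

In the handler this would replace pickle.dumps(nmsg) with message_to_json(nmsg); the rest of OnMessagePublish stays the same.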
FROM python:3.7-slim
WORKDIR /app
COPY requirements.txt ./
RUN pip install -r requirements.txt
COPY . .
CMD ["python", "./exhook_server.py"]
The requirements.txt file contains:
grpcio==1.59.3
grpcio-tools==1.59.3
kafka-python==2.0.2
Run the following command to build the image:
docker build --network=host -t emqx_plugin_test:v1.0 .
Create a Deployment and a Service for this image, then deploy them to Kubernetes:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: emqx-hookserver-deployment
  labels:
    app: hookserver
  namespace: emqx
spec:
  replicas: 1
  selector:
    matchLabels:
      app: hookserver
  template:
    metadata:
      labels:
        app: hookserver
    spec:
      containers:
      - name: hookserver
        image: emqx_plugin_test:v1.0
        imagePullPolicy: Never
        resources:
          requests:
            memory: "250Mi"
            cpu: "100m"
          limits:
            memory: "250Mi"
            cpu: "100m"
        ports:
        - name: rpc
          containerPort: 9000
---
apiVersion: v1
kind: Service
metadata:
  name: hookserver-service
  namespace: emqx
spec:
  selector:
    app: hookserver
  ports:
  - name: rpc
    port: 9000
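Back in the EMQX Dashboard, add a new entry under ExHook, enter http://hookserver-service.emqx.svc.cluster.local:9000 as the URL, and enable it. The status should show as connected, with 1 hook registered.
When I deployed this on minikube, the status initially stayed at "connecting" and never succeeded, even after a long wait. After some digging, it turned out to be a CoreDNS problem; restarting CoreDNS with the following command fixed it: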
kubectl -n kube-system rollout restart deployment coredns
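To tell a DNS problem apart from a hook server that is simply not running, it can help to check name resolution from a pod in the cluster. The snippet below is a small diagnostic sketch using only the standard library; the function name can_resolve is made up for illustration.

```python
import socket


def can_resolve(host: str, port: int) -> bool:
    """Return True if the host name resolves to at least one address."""
    try:
        return len(socket.getaddrinfo(host, port)) > 0
    except socket.gaierror:
        # Raised when the resolver cannot find the name
        return False
```

From a Python shell inside the cluster, can_resolve("hookserver-service.emqx.svc.cluster.local", 9000) returning False would point at cluster DNS rather than at the hook server itself.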
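After that, start a consumer on the Kafka topic testtopic, connect to EMQX over MQTT, and publish a message. The consumer should show the message forwarded by EMQX arriving in Kafka.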
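Because OnMessagePublish writes pickled objects, the console consumer prints raw bytes. A Python consumer can turn them back into message objects. This is a sketch, assuming kafka-python is installed, the generated exhook_pb2.py is on the import path, and the script runs somewhere that can reach the bootstrap address used earlier; the names decode_record and consume_forever are illustrative.

```python
import pickle


def decode_record(value: bytes):
    """Undo the pickle.dumps() call made in OnMessagePublish."""
    # pickle can execute code while loading, so only read from a trusted topic.
    return pickle.loads(value)


def consume_forever():
    # Imported here so decode_record stays usable without these packages.
    from kafka import KafkaConsumer
    import exhook_pb2  # noqa: F401  (lets pickle rebuild the message class)

    consumer = KafkaConsumer(
        "testtopic",
        bootstrap_servers="my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092",
        auto_offset_reset="earliest",  # start from the oldest retained record
    )
    for record in consumer:
        msg = decode_record(record.value)
        print(msg.topic, msg.payload)
```

Calling consume_forever() from a pod in the cluster should print the MQTT topic and payload of each message published through EMQX.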