EdgeX 规则引擎eKuiper
一、架构设计
LF Edge eKuiper 是物联网数据分析和流式计算引擎。它是一个通用的边缘计算服务或中间件,为资源有限的边缘网关或设备而设计。
eKuiper 采用 Go 语言编写,其架构如下图所示:
eKuiper 是 Golang 实现的轻量级物联网边缘分析、流式处理开源软件,可以运行在各类资源受限的边缘设备上。eKuiper 基于源 (Source),SQL (业务逻辑处理), 目标 (Sink) 的方式来支持流式数据处理。
- 源(Source):流式数据的数据源,例如来自于 MQTT 服务器的数据。在 EdgeX 的场景下,数据源就是 EdgeX 消息总线(EdgeX message bus),可以是来自于 ZeroMQ 或者 MQTT 服务器;
- SQL:SQL 是你流式数据处理指定业务逻辑的地方,eKuiper 提供了 SQL 语句可以对数据进行抽取、过滤和转换;
- 目标(Sink):目标用于将分析结果发送到特定的目标。例如,将分析结果发送到另外的 MQTT 服务器,或者一个 HTTP Rest 地址;
使用 eKuiper,一般需要完成以下三个步骤。
- 创建流,就是你定义数据源的地方
- 写规则为数据分析写 SQL
- 指定一个保存分析结果的目标
- 部署,并且运行规则
二、EdgeX集成eKuiper
在不同的微服务之间,EdgeX 使用消息总线进行数据交换。它包含了一个抽象的消息总线接口,并分别实现了 ZeroMQ 与 MQTT,在不同的微服务之间信息交互的支持。eKuiper 和 EdgeX 的集成工作包含了以下三部分,
- 扩展了一个 EdgeX 消息总线源,支持从 EdgeX 消息总线中接收数据
- 为了可以分析数据,eKuiper 需知道传入的数据流的格式。一般来说,用户最好在创建流的时候指定被分析的流数据的格式。
如下所示,一个 demo 流包含了一个名为 temperature 的字段。这与在关系型数据库中创建表格定义的时候非常像。在创建了流定义以后,eKuiper 可以在编译或者运行时对进入的数据进行类型检查,相应错误也会报告给用户。
CREATE STREAM demo (temperature bigint) WITH (FORMAT="JSON"...)
然而在 EdgeX 中,数据类型定义在 EdgeX event/reading 中已经指定,为了提升使用体验,用户可以在创建流的时候不指定数据类型。当接收到来自于消息总线的数据的时候,会根规则转换为相应的数据类型。
- 扩展支持 EdgeX 消息总线目标(sink),用于将处理结果写回至 EdgeX 消息总线。用户也可以选择将分析结果发送到 eKuiper 之前已经支持的 RestAPI 接口等。
三、使用eKuiper规则引擎控制设备
该章节描述了如何在 EdgeX 中使用 eKuiper 规则引擎,根据分析结果来实现对设备的控制。为了便于理解,该文章使用 device-virtual 示例,它对 device-virtual 服务发送的数据进行分析,然后根据由 eKuiper 规则引擎生成的分析结果来控制设备。
在本文中,将创建并运行以下两条规则。
- 监视 Random-UnsignedInteger-Device 设备的规则,如果 uint8 值大于 20,则向 Random-Boolean-Device 设备发送命令,并开启布尔值的随机生成。
- 监视 Random-Integer-Device 设备的规则,如果每20秒 int8 的平均值大于0,则向 Random-Boolean-Device