为了更好的阅读体验,建议移步至笔者的博客阅读:JetLinks设备接入的认识与理解
1、认识 JetLinks
1.1、官网文档
官网:https://www.jetlinks.cn/
JetLinks 有两个产品:JetLinks-lot和JetLinks-view
官方文档:
-
JetLinks 物联网基础平台
-
JetLinks 物联网平台开发手册
1.2、JetLinks
JetLinks 是可支持多种方式接入设备的物联网设备管理平台
https://hanta.yuque.com/px7kg1/yfac2l/fwqriw24lp3cy2lw
JetLinks IOT 是一个开源的、企业级的物联网平台,它集成了设备管理、数据安全通信、消息订阅、规则引擎等一系列物联网核心能力,支持以平台适配设备的方式连接海量设备,采集设备数据上云,提供云端API,通过调用云端API实现远程控制。JetLinks物联网平台还支持多种设备接入协议,并提供了丰富的协议库。
支持:多协议(MQTT、HTTP、CoAP、UDP、TCP、WebSocket)自定义编解码插件接入;
支持:云平台对接接入;
支持: ModBus/TCP、OPC UA通道接入;
支持:基于GB/T 28181国标协议视频接入;
支持:自研边缘计算网关接入。
1.3、产品架构的理解
https://hanta.yuque.com/px7kg1/yfac2l/tvlxz93cht8zyl94
1.3.1、理解1
通过不同层级功能职责的封装、组合,以支持多设备、多协议接入平台
-
设备连接层:支持MQTT、TCP、UDP、CoAP、HTTP、WebSocket协议,提供统一设备接入的能力。
-
设备管理层:提供设备注册、配置、维护和监控的功能,支持设备属性、状态实时展示和历史属性、设备日志记录查询等。
-
业务逻辑层:提供规则引擎、数据转发和数据解析等功能,支持多种业务场景下的数据处理和交互操作。
-
应用开发层:提供RESTAPI和WebSocket接口,支持前端对接和自定义应用开发。同时还提供了可视化的数据展示和操作页面,方便用户快速搭建物联网应用系统。
1.3.2、理解2
设备接入JetLinks物联网平台后,可实现:设备通讯、数据的采集、认证、流转、存储、分析和实时监控
13.3、理解3
开发者需要自行实现编解码器逻辑,才可以让平台对设备数据进行全面管理
1.4、基本概念
https://hanta.yuque.com/px7kg1/yfac2l/dagxgfzc3vnul0sn
1.4.1、产品
产品是指一组具有相同功能和规格的设备集合,通常由同一家生产厂家制造。
设备可能是传感器、执行器、控制器等各种不同类型的物联网设备,它们可以通过网络连接到物联网平台。通过将这些设备组合到一个产品中,企业可以对这些设备进行统一管理和监控,以便更有效地控制其行为和状态。
1.4.2、设备
设备是指物理存在的、可通过网络连接的单个物联网设备。
设备可以是各种类型的物品,例如传感器、执行器、控制器等。这些设备通过物联网连接到平台,以便与其他设备或应用程序进行通信、交换数据和接收命令。
1.4.3、物模型
物模型说明:http://doc.jetlinks.cn/function-description/metadata_description.html
物模型是物理空间中的实体在云端的数字化表示,有 4 个纬度:属性、功能、事件、标签。
-
属性:用于描述设备运行时具体信息和状态。例如温湿度传感器包含“温度”、“湿度”两个属性。
-
功能:设备可被外部调用的能力或方法,可设置输入参数和输出参数。相比于属性,服务可通过一条指令实现更复杂的业务逻辑
-
事件:用于描述设备上报云端的多个参数,多用于复杂报文结构或设备本身在某个阈值触发的报文。
-
标签:统一为设备添加拓展字段,添加后将在设备信息页显示。
2、开发手册
社区版后端工程:
- github 仓库:https://github.com/jetlinks/jetlinks-community
- gitee 仓库:https://gitee.com/jetlinks/jetlinks-community
2.1、模块说明
社区版系统模块说明:https://hanta.yuque.com/px7kg1/nn1gdr/gfqb3xmxg8fsvyxf#lR7Pd
- jetlinks-components # 组件库
- common-component # 通用组件、工具类等
- configure-component # 统一配置模块
- dashboard-component # 仪表盘模块
- elasticsearch-component # ElasticSearch集成
- gateway-component # 网关模块,统一定义网关接口等信息
- io-component # IO模块,文件管理等
- logging-component # 日志模块
- network-component # 网络组件模块,统一定义网络组件规范以及默认实现
- http-component # http模块
- mqtt-component # mqtt模块
- network-core # 网络组件核心模块
- tcp-component # tcp模块
- notify-component # 通知模块,统一定义通知规范以及默认实现
- notify-core # 通知模块核心
- notify-dingtalk # 钉钉通知模块
- notify-email # 邮件通知模块
- notify-sms # 短信通知模块
- notify-voice # 语音通知模块
- notify-webhook # webhook通知模块
- notify-wechat # 微信通知模块
- protocol-component # 协议模块
- relation-component # 关系模块,用于描述物与物之间的关系
- rule-engine-component # 规则引擎模块,集成规则引擎通用功能
- script-component # 脚本模块,封装脚本引擎
- tdengine-component # 对tdengine的支持
- things-component # 物管理模块
- timeseries-component # 时序数据组件
- jetlinks-manager # 管理功能
- authentication-manager # 用户,权限管理模块
- device-manager # 设备管理模块
- logging-manager # 日志管理模块
- network-manager # 网络组件管理模块
- notify-manager # 通知管理模块
- rule-engine-manager # 规则引擎管理模块
- jetlinks-standalone #单例模块,启动JetLinks平台
2.2、技术选型
技术栈 | 描述 |
---|---|
Java8 | 编程语言 |
hsweb Framework | 业务基础框架 |
Spring Boot 2.7.x | 响应式web支持 |
vert.x,netty | 高性能网络框架 |
R2DBC | 关系型数据库响应式驱动 |
Postgresql | 关系型数据库,可更换为mysql、sqlserver |
ElasticSearch | 设备数据与日志存储,可更换为其他中间件 |
Redis | 用户信息与权限缓存、设备注册中心缓存 |
scalecube | 基于JVM的分布式服务框架,支持响应式 |
micrometer | 监控指标框架 |
2.3、必要的开发知识
响应式编程:http://doc.jetlinks.cn/dev-guide/reactor.html
事件驱动:http://doc.jetlinks.cn/dev-guide/event-driver.html
添加自定义模块:https://hanta.yuque.com/px7kg1/dev/wdymp6flcfa1vwh5
3、设备接入流程
设备接入流程:http://doc.jetlinks.cn/function-description/device_message_description.html#%E8%AE%BE%E5%A4%87%E6%8E%A5%E5%85%A5%E6%B5%81%E7%A8%8B
HTTP协议设备接入:https://hanta.yuque.com/px7kg1/yfac2l/qlr6nz5btr5rwrgk
3.1、流程图
3.2、开发:协议包
开发者自行实现自定义协议,官方教程:http://doc.jetlinks.cn/dev-guide/custom-message-protocol.html
官方提供了协议开发示例工程:https://github.com/jetlinks/jetlinks-official-protocol
JetLinks 官方协议 jar 包:https://github.com/jetlinks/jetlinks-official-protocol/blob/v3/package/jetlinks-official-protocol-3.0.0.jar
-
编写
自定义编解码器
:创建
org.jetlinks.core.message.codec.DeviceMessageCodec
接口实现类,重写encode()
、decode()
、getSupportTransport()
方法 -
编写
协议的元信息
创建
org.jetlinks.core.metadata.DefaultConfigMetadata
对象并设置对应属性 -
编写
自定义设备协议支持提供商
:创建
org.jetlinks.core.spi.ProtocolSupportProvider
接口实现类,并重写create()
方法,在
create()
方法中将:将自定义编解码器
注册到协议中 -
配置路由配置:
在
org.jetlinks.core.spi.ProtocolSupportProvider
接口实现类的create()
方法中创建org.jetlinks.core.defaults.CompositeProtocolSupport
对象,在其中配置路由配置、身份认证(可选)
3.3、添加:协议包
将协议包上传到协议管理中
3.4、添加:网络组件
- 配置:本地和公网的接口地址、端口号
- 配置:接口处理的服务类型
3.5、添加:设备接入网关
将上述的协议包和网络组件进行绑定
3.6、添加:产品
-
配置:产品信息
-
绑定:上述的自定义网络组件(官方定义:设备接入)
-
配置:认证信息
-
配置存储策略
-
配置:物模型
- 属性定义
- 功能定义
- 事件定义
- 标签定义
-
启用:产品
行式存储
ElasticSearch-行式存储是系统默认情况下使用的存储方案。每一个属性值都保存为一条索引记录。
典型应用场景:设备每次只会上报一部分属性, 以及支持读取部分属性数据的时候。
列式存储
一个属性作为一列,一条属性消息作为一条索引记录进行存储。
典型应用场景:适合设备每次都上报所有的属性值的场景。
3.7、添加:设备
- 配置:设备ID、名称
- 绑定:上述配置好的产品(只能配置状态是正常的产品,即已启用的产品)
- 启动:设备
- (可选)默认继承了所属产品的物模型。可以配置专属当前设备的物模型
4、理解协议包
关于协议包:https://hanta.yuque.com/px7kg1/nn1gdr/kcqv8dn8y6778t2a
协议包主要包含 4 个部分
-
数据传输协议:协议包约定了常见的网络通信协议,例如MQTT、HTTP、TCP、CoAP等,来实现物联网设备与JetLinks平台之间的数据传输。开发者可根据设备实际情况选择对应的通信协议。
-
数据解析标准:协议包定义了一套设备数据解析标准,使得各种类型的物联网设备通过网络协议传输至JetLinks后,根据协议包内的数据解析标准将不同类型的报文转换成平台统一的消息。
-
设备管理功能:协议包内可以获取平台内定义的设备数据,包括设备信息、设备配置、设备状态等,方便开发者在接入设备时获取设备相关数据进行自定义的业务逻辑处理。
-
身份认证:协议包支持物联网设备的身份认证,用户可以在协议包内编写身份认证逻辑来验证连接的客户端身份,以保护设备和数据的安全。
5、理解自定义编解码器开发流程
5.1、步骤1
自定义 DeviceMessageCodec 接口实现类,重写 encode()、decode() 方法
5.1.1、消息编码
重写 DeviceMessageCodec 接口中的encode()
方法
5.1.2、消息解码
重写 DeviceMessageCodec 接口中的decode()
方法
5.2、步骤2
自定义 ProtocolSupportProvider 接口实现类,配置元数据信息
5.3、步骤3
配置路由与 DeviceMessage 的绑定关系
5.4、步骤4
自定义 Authenticator 接口实现并配置
6、理解编解码涉及的核心类关系
协议加载设计:https://hanta.yuque.com/px7kg1/nn1gdr/gascdx49ia6u4lsf
平台统一设备消息定义:http://doc.jetlinks.cn/function-description/device_message_description.html
7、协议包上传逻辑分析
7.1、步骤1:上传协议 jar
后端接口
POST
/api/file/upload
请求报文
表单请求,接收参数名为:file 的文件数据对象
Content-Disposition: form-data; name="file"; filename="jetlinks-official-protocol-3.0.0.jar"
Content-Type: application/octet-stream
接口类:org.jetlinks.community.io.file.web.FileManagerController#upload
处理逻辑
-
获取文件信息,并将文件数据保存到本地指定目录
-
默认文件目录为:./data/files/yyyyMMdd/
-
重命名 jar 文件名,生成规则:md5(uuid())
-
计算当前文件的 md5 和 sha256 值
-
-
将文件相关信息保存到数据库中,数据对象:
org.jetlinks.community.io.file.FileEntity
-
保存成功的文件数据记录主键和文件信息一起通过接口返回
响应报文
返回文件数据相关记录信息,核心信息:
{ "message": "success", "result": { "id": "9c9ce661a1fadb8019ca50145b33a074", "name": "jetlinks-official-protocol-3.0.0.jar", "extension": "jar", "length": 102512, "md5": "24504ceb0d6570b84b86e6180d9fca9f", "sha256": "fb0c6144ad056326e26eb829c13759b5080da095c7bb02386c7f064ac059f24e", "createTime": 1699859432789, "creatorId": "1199596756811550720", "options": [], "others": { "accessKey": "c24b19b0c91119c6673fa1a06a4d2ae0" } }, "status": 200, "timestamp": 1699859454032 }
7.2、步骤2:确定协议
7.2.1、后端接口
PATCH
/api/protocol
接口类:org.jetlinks.community.device.web.ProtocolSupportController
7.2.2、请求报文
{
"id": "1722876422724329472",
"name": "官方协议v3.0",
"description": "",
"type": "jar",
"state": 1,
"creatorId": "1199596756811550720",
"createTime": 1699600723328,
"configuration": {
"location": "http://localhost:5173/api/file/9c9ce661a1fadb8019ca50145b33a074?accessKey=c24b19b0c91119c6673fa1a06a4d2ae0"
}
}
7.2.3、处理逻辑
- 将前端请求的文件信息保存到数据库中,数据对象:
org.jetlinks.community.device.entity.ProtocolSupportEntity
前端逻辑:将步骤1 的响应结果拼接成:文件地址(用户不可编辑)+ 用户填写的协议包基本信息(名称、类型、说明)
org.jetlinks.community.device.web.ProtocolSupportController
实现了org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController
接口。
org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController
接口又继承了三个接口:org.hswebframework.web.crud.web.reactive.ReactiveServiceSaveController
、org.hswebframework.web.crud.web.reactive.ReactiveServiceQueryController
、org.hswebframework.web.crud.web.reactive.ReactiveServiceDeleteController
ProtocolSupportController
package org.jetlinks.community.device.web;
import org.hswebframework.web.crud.web.reactive.ReactiveServiceCrudController;
import org.jetlinks.community.device.service.LocalProtocolSupportService;
@RestController
@RequestMapping("/protocol")
public class ProtocolSupportController
implements ReactiveServiceCrudController<ProtocolSupportEntity, String> {
@Autowired
@Getter
private LocalProtocolSupportService service;
}
ReactiveServiceCrudController
package org.hswebframework.web.crud.web.reactive;
public interface ReactiveServiceCrudController<E, K> extends
ReactiveServiceSaveController<E, K>,
ReactiveServiceQueryController<E, K>,
ReactiveServiceDeleteController<E, K> {
}
PATH /api/protocol
接口实际由:ReactiveServiceSaveController
接口提供的默认 save()
方法处理数据,最终调用getService()
方法进行save()
操作。
package org.hswebframework.web.crud.web.reactive;
import org.hswebframework.web.authorization.annotation.Authorize;
public interface ReactiveServiceSaveController<E, K> {
@Authorize(ignore = true)
ReactiveCrudService<E, K> getService();
@PatchMapping
@Operation(summary = "保存数据", description = "如果传入了id,并且对应数据存在,则尝试覆盖,不存在则新增.")
default Mono<SaveResult> save(@RequestBody Flux<E> payload) {
return Authentication
.currentReactive()
.flatMapMany(auth -> payload.map(entity -> applyAuthentication(entity, auth)))
.switchIfEmpty(payload)
.as(getService()::save);
}
}
由于ProtocolSupportController
注入了org.jetlinks.community.device.service.LocalProtocolSupportService
,并且属性名为:service,因此ProtocolSupportController
的getService()
就是ReactiveServiceSaveController
接口的getService()
方法实现。显而易见,确定协议的核心逻辑就在:LocalProtocolSupportService
的save()
方法。
org.jetlinks.community.device.service.LocalProtocolSupportService
类继承了org.hswebframework.web.crud.service.GenericReactiveCrudService
抽象类,而GenericReactiveCrudService
抽象类又实现了org.hswebframework.web.crud.service.ReactiveCrudService
接口,在ReactiveCrudService
中有save()
方法
ProtocolSupportController
package org.jetlinks.community.device.service;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
@Service
public class LocalProtocolSupportService extends GenericReactiveCrudService<ProtocolSupportEntity, String> {
@Autowired
private ProtocolSupportManager supportManager;
@Autowired
private DataReferenceManager referenceManager;
}
GenericReactiveCrudService
package org.hswebframework.web.crud.service;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
import org.springframework.beans.factory.annotation.Autowired;
public abstract class GenericReactiveCrudService<E, K> implements ReactiveCrudService<E, K> {
@Autowired
private ReactiveRepository<E, K> repository;
@Override
public ReactiveRepository<E, K> getRepository() {
return repository;
}
}
GenericReactiveCrudService 注入了 ReactiveRepository 接口,该接口的实现类为:org.hswebframework.ezorm.rdb.mapping.defaults.DefaultReactiveRepository
,里面实现了save()
方法:
package org.hswebframework.ezorm.rdb.mapping.defaults;
import org.hswebframework.ezorm.rdb.mapping.ReactiveRepository;
public class DefaultReactiveRepository<E, K> extends DefaultRepository<E> implements ReactiveRepository<E, K> {
@Override
public Mono<SaveResult> save(Publisher<E> data) {
return Flux
.from(data)
.collectList()
.filter(CollectionUtils::isNotEmpty)
.flatMap(list -> doSave(list).reactive().as(this::setupLogger))
.defaultIfEmpty(SaveResult.of(0, 0));
}
}
上述doSave()
方法是org.hswebframework.ezorm.rdb.mapping.defaults.DefaultRepository
抽象类提供的默认方法:
package org.hswebframework.ezorm.rdb.mapping.defaults;
import org.hswebframework.ezorm.rdb.mapping.events.EventResultOperator;
public abstract class DefaultRepository<E> {
protected SaveResultOperator doSave(Collection<E> data) {
RDBTableMetadata table = getTable();
UpsertOperator upsert = operator.dml().upsert(table.getFullName());
return EventResultOperator.create(
() -> {
upsert.columns(getProperties());
List<String> ignore = new ArrayList<>();
for (E e : data) {
upsert.values(Stream.of(getProperties())
.map(property -> getInsertColumnValue(e, property, (prop, val) -> ignore.add(prop)))
.toArray());
}
upsert.ignoreUpdate(ignore.toArray(new String[0]));
return upsert.execute();
},
SaveResultOperator.class,
table,
MappingEventTypes.save_before,
MappingEventTypes.save_after,
getDefaultContextKeyValue(instance(data),
type("batch"),
tableMetadata(table),
upsert(upsert))
);
}
}
上述EventResultOperator
的create()
方法中,发布了EntitySavedEvent<E>
事件(通过 Spring的ApplicationEventPublisher 发送事件)。
在org.jetlinks.community.device.service.ProtocolSupportHandler
中订阅了EntitySavedEvent<ProtocolSupportEntity>
事件:
package org.jetlinks.community.device.service;
import lombok.AllArgsConstructor;
import org.jetlinks.core.ProtocolSupport;
import org.jetlinks.community.reference.DataReferenceManager;
import org.jetlinks.supports.protocol.management.ProtocolSupportLoader;
import org.jetlinks.supports.protocol.management.ProtocolSupportManager;
@Component
@AllArgsConstructor
public class ProtocolSupportHandler {
private final DataReferenceManager referenceManager;
private ProtocolSupportLoader loader;
private ProtocolSupportManager supportManager;
@EventListener
public void handleCreated(EntityCreatedEvent<ProtocolSupportEntity> event) {
event.async(reloadProtocol(event.getEntity()));
}
@EventListener
public void handleSaved(EntitySavedEvent<ProtocolSupportEntity> event) {
event.async(reloadProtocol(event.getEntity()));
}
@EventListener
public void handleModify(EntityModifyEvent<ProtocolSupportEntity> event) {
event.async(reloadProtocol(event.getAfter()));
}
// 重新加载协议
private Mono<Void> reloadProtocol(Collection<ProtocolSupportEntity> protocol) {
return Flux
.fromIterable(protocol)
.filter(entity -> entity.getState() != null)
.map(entity -> entity.getState() == 1 ? entity.toDeployDefinition() : entity.toUnDeployDefinition())
.flatMap(def -> loader
//加载一下检验是否正确,然后就卸载
.load(def)
.doOnNext(ProtocolSupport::dispose)
.thenReturn(def))
.onErrorMap(err -> new BusinessException("error.unable_to_load_protocol", 500, err.getMessage()))
.flatMap(supportManager::save)
.then();
}
}
上述ProtocolSupportLoader
接口的实现类为org.jetlinks.community.protocol.SpringProtocolSupportLoader
,其中load()
方法会动态加载 jar 包为org.jetlinks.core.spi.ProtocolSupportProvider
接口实现,并执行create()
方法。
7.2.4、响应报文
{
"message": "success",
"result": {
"added": 0,
"updated": 1,
"total": 1
},
"status": 200,
"timestamp": 1699859463951
}
8、加载协议包时机
8.1、加载协议包时机1
通过org.jetlinks.community.device.service.ProtocolSupportHandler
监听EntityCreatedEvent<ProtocolSupportEntity>
、EntitySavedEvent<ProtocolSupportEntity>
、EntityModifyEvent<ProtocolSupportEntity>
事件,调用ProtocolSupportLoader
的load()
方法加载协议
在 ProtocolSupportLoader 的 load() 方法中:会调用 org.jetlinks.core.spi.ProtocolSupportProvider 接口实现,并执行 create() 方法
8.2、加载协议包时机2
通过org.jetlinks.community.protocol.LazyInitManagementProtocolSupports
实现org.springframework.boot.CommandLineRunner
接口,在项目启动时执行init()
方法,调用ProtocolSupportLoader
的load()
方法加载协议
在 ProtocolSupportLoader 的 load() 方法中:会调用 org.jetlinks.core.spi.ProtocolSupportProvider 接口实现,并执行 create() 方法
9、设备网关加载机制
- 通过 DeviceGatewayEventHandler 实现 CommandLineRunner 接口,在项目启动时执行 init() 方法
- 通过 DeviceGatewayEventHandler 监听 DeviceGatewayEntity 的保存、创建、更新事件
为了更好的阅读体验,建议移步至笔者的博客阅读:JetLinks设备接入的认识与理解