Debezium日常分享系列之:Debezium 通知
- 一、概论
- 二、Debezium通知格式
- 三、Debezium 有关初始快照状态的通知
- 四、Debezium 有关增量快照进度的通知
- 五、启用 Debezium 通知
- 六、访问 Debezium JMX 通知
- 七、自定义通知渠道
- 八、应用案例
一、概论
Debezium 通知提供了一种获取有关连接器状态信息的机制。通知可以发送到以下渠道:
- 接收器通知通道:通过 Connect API 将通知发送到配置的主题。
- 日志通知通道:通知会附加到日志中。
- JmxNotificationChannel:通知作为 JMX bean 中的属性公开。
- 定制:通知将发送到您实施的自定义渠道。
二、Debezium通知格式
通知消息包含以下信息:
属性 | 描述 |
---|---|
id | 分配给通知的唯一标识符。对于增量快照通知,id 与使用执行快照信号发送的相同。 |
aggregate_type | 快照类型 |
type | 提供有关在aggregate_type 字段中指定的事件的状态信息。 |
additional_data | 包含有关通知的详细信息的 Map<String,String>。 |
timestamp | 创建通知的时间。 Epoch unix 时间戳(以毫秒为单位) |
三、Debezium 有关初始快照状态的通知
以下示例显示了提供初始快照状态的典型通知:
{
"id": "5563ae14-49f8-4579-9641-c1bbc2d76f99",
"aggregate_type": "Initial Snapshot",
"type": "COMPLETED",
"additional_data" : {
"connector_name": "myConnector"
},
"timestamp": "1695817046353"
}
类型字段可以包含以下值之一:
- STARTED
- IN_PROGRESS
- TABLE_SCAN_COMPLETED
- COMPLETED
- ABORTED
- SKIPPED
下表显示了报告初始快照状态的通知中可能存在的不同负载的示例:
- STARTED
{
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
"aggregate_type":"Initial Snapshot",
"type":"STARTED",
"additional_data":{
"connector_name":"my-connector"
},
"timestamp": "1695817046353"
}
- IN_PROGRESS
{
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
"aggregate_type":"Initial Snapshot",
"type":"IN_PROGRESS",
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2",
"current_collection_in_progress":"table1"
},
"timestamp": "1695817046353"
}
Mongo 连接器当前不支持字段 data_collection
- TABLE_SCAN_COMPLETED
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
"aggregate_type":"Initial Snapshot",
"type":"TABLE_SCAN_COMPLETED",
"additional_data":{
"connector_name":"my-connector",
"data_collection":"table1, table2",
"scanned_collection":"table1",
"total_rows_scanned":"100",
"status":"SUCCEEDED"
},
"timestamp": "1695817046353"
}
在前面的示例中,additional_data.status 字段可以包含以下值之一:
SQL_异常:执行快照时发生 SQL 异常。
成功了:快照成功完成。
Mongo 连接器当前不支持字段total_rows_scanned 和data_collection
- COMPLETED
{
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
"aggregate_type":"Initial Snapshot",
"type":"COMPLETED",
"additional_data":{
"connector_name":"my-connector"
},
"timestamp": "1695817046353"
}
- ABORTED
{
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
"aggregate_type":"Initial Snapshot",
"type":"ABORTED",
"additional_data":{
"connector_name":"my-connector"
},
"timestamp": "1695817046353"
}
- SKIPPED
{
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
"aggregate_type":"Initial Snapshot",
"type":"SKIPPED",
"additional_data":{
"connector_name":"my-connector"
},
"timestamp": "1695817046353"
}
四、Debezium 有关增量快照进度的通知
下表显示了报告增量快照状态的通知中可能存在的不同负载的示例:
- Start
{
"id":"ff81ba59-15ea-42ae-b5d0-4d74f1f4038f",
"aggregate_type":"Incremental Snapshot",
"type":"STARTED",
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2"
},
"timestamp": "1695817046353"
}
- Paused
{
"id":"068d07a5-d16b-4c4a-b95f-8ad061a69d51",
"aggregate_type":"Incremental Snapshot",
"type":"PAUSED",
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2"
},
"timestamp": "1695817046353"
}
- Resumed
{
"id":"a9468204-769d-430f-96d2-b0933d4839f3",
"aggregate_type":"Incremental Snapshot",
"type":"RESUMED",
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2"
},
"timestamp": "1695817046353"
}
- Stopped
{
"id":"83fb3d6c-190b-4e40-96eb-f8f427bf482c",
"aggregate_type":"Incremental Snapshot",
"type":"ABORTED",
"additional_data":{
"connector_name":"my-connector"
},
"timestamp": "1695817046353"
}
- Processing chunk
{
"id":"d02047d6-377f-4a21-a4e9-cb6e817cf744",
"aggregate_type":"Incremental Snapshot",
"type":"IN_PROGRESS",
"additional_data":{
"connector_name":"my-connector",
"data_collections":"table1, table2",
"current_collection_in_progress":"table1",
"maximum_key":"100",
"last_processed_key":"50"
},
"timestamp": "1695817046353"
}
- Snapshot completed for a table
{
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
"aggregate_type":"Incremental Snapshot",
"type":"TABLE_SCAN_COMPLETED",
"additional_data":{
"connector_name":"my-connector",
"data_collection":"table1, table2",
"scanned_collection":"table1",
"total_rows_scanned":"100",
"status":"SUCCEEDED"
},
"timestamp": "1695817046353"
}
在前面的示例中,additional_data.status 字段可以包含以下值之一:
EMPTY:该表不包含任何值。
NO_PRIMARY_KEY:无法完成快照;表没有主键。
SKIPPED:无法完成此类表的快照。有关详细信息,请参阅日志。
SQL_EXCEPTION:执行快照时发生 SQL 异常。
SUCCEEDED:快照成功完成。
UNKNOWN_SCHEMA:找不到该表的架构。检查日志中已知表的列表。
- Completed
{
"id":"6d82a3ec-ba86-4b36-9168-7423b0dd5c1d",
"aggregate_type":"Incremental Snapshot",
"type":"COMPLETED",
"additional_data":{
"connector_name":"my-connector"
},
"timestamp": "1695817046353"
}
五、启用 Debezium 通知
要使 Debezium 能够发出通知,请通过设置 notification.enabled.channels 配置属性来指定通知通道列表。默认情况下,以下通知渠道可用:
- sink
- log
- jmx
重要的:要使用接收器通知通道,还必须将 notification.sink.topic.name 配置属性设置为希望 Debezium 发送通知的主题的名称。
六、访问 Debezium JMX 通知
要使 Debezium 能够报告通过 JMX beans 公开的事件,请完成以下配置步骤:
- 启用 JMX MBean 服务器以公开通知 bean。
- 将 jmx 添加到连接器配置中的 notification.enabled.channels 属性中。
- 将首选的 JMX 客户端连接到 MBean 服务器。
通知通过名称为 debezium..management.notifications. 的 bean 的“Notifications”属性公开。
下图显示了报告增量快照开始的通知:
要放弃通知,请对 bean 调用重置操作。
通知还公开为 debezium.notification 类型的 JMX 通知。要使应用程序能够侦听 MBean 发出的 JMX 通知,请为应用程序订阅通知。
七、自定义通知渠道
通知机制被设计为可扩展的。可以根据需要实施渠道,以最适合的环境的方式传递通知。添加通知通道涉及几个步骤:
- 为通道创建一个Java项目来实现通道,并添加Debezium Core作为依赖项。
- 部署通知通道。
- 通过修改连接器配置,使连接器能够使用自定义通知通道。
配置自定义通知渠道
自定义通知通道是实现 io.debezium.pipeline.notification.channels.NotificationChannel 服务提供者接口 (SPI) 的 Java 类。例如:
public interface NotificationChannel {
String name(); 1
void init(CommonConnectorConfig config); 2
void send(Notification notification); 3
void close(); 4
}
- 频道的名称。要使 Debezium 能够使用该通道,请在连接器的 notification.enabled.channels 属性中指定此名称。
- 初始化通道所需的特定配置、变量或连接。
- 在频道上发送通知。 Debezium 调用此方法来报告其状态。
- 关闭所有分配的资源。 Debezium 在连接器停止时调用此方法。
Debezium 核心模块依赖项
自定义通知通道 Java 项目具有对 Debezium 核心模块的编译依赖项。必须将这些编译依赖项包含在项目的 pom.xml 文件中,如以下示例所示:
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
</dependency>
${version.debezium} 表示 Debezium 连接器的版本。
在 META-INF/services/io.debezium.pipeline.notification.channels.NotificationChannel 文件中声明您的实现。
部署自定义通知渠道
先决条件:有一个自定义通知通道 Java 程序。
程序:要将通知通道与 Debezium 连接器结合使用,请将 Java 项目导出到 JAR 文件,然后将该文件复制到包含要与其一起使用的每个 Debezium 连接器的 JAR 文件的目录。
例如,在典型部署中,Debezium 连接器文件存储在 Kafka Connect 目录 (/kafka/connect) 的子目录中,每个连接器 JAR 位于其自己的子目录中 (/kafka/connect/debezium-connector-db2、/kafka /connect/debezium-connector-mysql 等)。要将信号通道与连接器一起使用,请将转换器 JAR 文件添加到连接器的子目录中。
注意:要将自定义通知通道与多个连接器一起使用,必须将通知通道 JAR 文件的副本放置在每个连接器子目录中。
配置连接器以使用自定义通知通道:在连接器配置中,将自定义通知通道的名称添加到 notification.enabled.channels 属性中。
八、应用案例
- Debezium系列之:实现增量快照incremental技术的详细步骤
- Debezium系列之:基于数据库信号表和Kafka信号Topic两种技术方案实现增量快照incremental技术的详细步骤
- Debezium系列之:深入理解临时阻塞快照
更多Debezium实战应用可以参考博主Debezium专栏:
- Debezium专栏,Debezium实战应用详细总结