Debezium日常分享系列之:Debezium 3.0.2.Final Released
- 新功能和改进
- 核心
- 使用never snapshot模式执行阻塞快照
- MongoDB
- Oracle
- Vitess
- JDBC sink连接器的更改
- Debezium Operator
- 更多Debezium技术内容
新功能和改进
Debezium 3.0.2.Final引入了一些改进和功能,让我们逐个看看每个功能。
核心
使用never snapshot模式执行阻塞快照
Debezium阻塞快照过程旨在根据提供的信号执行初始快照,选择性地发出一个或多个表的历史数据。当此功能与never snapshot模式配对时,会导致意外行为。
在此版本中,我们修改了连接器偏移量,以跟踪配置的snapshot.mode,允许阻塞快照在有信号时成功执行初始快照,即使配置了never perform a snapshot的快照模式。这使用户可以在此配置下安全地使用此功能。
由于连接器偏移存储的变化,一旦连接器升级到 3.0.2.Final 或更高版本,则连接器无法降级到 3.0.1.Final 或更早版本。
MongoDB
RowsChanged JMX度量类型已更改
在MongoDB连接器的早期版本中,RowsChanged JMX度量以java.util.Map的形式公开,这与关系连接器上公开的相同JMX度量TabularData相矛盾。在3.0.2.Final中已经修复了这个问题,JMX度量在所有连接器实现中均使用TabularData以实现统一性。
如果您以前捕获了RowsChanged,任何现有的MongoDB JMX管道可能需要进行调整。
Oracle
更高精度的时间戳
传统上,Debezium for Oracle在挖掘会话连接上设置的NLS会话属性控制下,以毫秒精度发出列时间戳值。精度得到改进,并提供了基于纳秒(也称为FF9)的值。
发出的字段类型基于列的数据类型,因此发出的字段数据类型保持不变。改变的是那些具有微秒或纳秒为基础值的列的情况,在这些列以前为零的情况下,现在它们将具有非零值。
警告或跳过DML异常
可以配置event.processing.failure.handling.mode以使其在特定连接器错误条件下失败、警告或跳过,以提高连接器对各种数据问题的可靠性。此配置在观察到DDL失败时用于控制Oracle连接器的行为。
在此版本中,event.processing.failure.handling.mode也用于控制基于DML的事件的故障。如果Oracle连接器在解析您的插入、更新或删除操作时出现问题,您可以根据需要安全地配置连接器以失败、警告或跳过DML事件。
默认行为是在连接器无法安全处理事件时始终失败。通过将其调整为警告或跳过,虽然连接器将在失败的事件之后安全地继续进行,但会引入数据丢失,需要手动处理。
Vitess
性能改进
在早期版本的Debezium for Vitess连接器中,连接器使用基于正则表达式的过滤系统,根据不同后缀的前缀匹配所有表,并根据配置应用后续的排除。这可能会浪费CPU资源并创建热点,因为会为稍后要过滤和垃圾回收的事件创建中间对象。
在此版本中,我们通过在事件处理链中更早地应用过滤来改进Vitess连接器处理此用例的方式。这应该减少创建的中间对象数量,并提高连接器的整体性能。对于具有相同前缀和不同后缀的关键空间,这应该比旧版本提供更好的整体性能。
JDBC sink连接器的更改
由于sink模块使用的是集合而不是表的命名约定,因此已弃用并替换了几个配置属性。旧属性将在Debezium 3.0.x版本中继续工作;但是在Debezium 3.1版本中将被删除。
- table.name.format属性被collection.name.format属性替换。
- table.naming.strategy属性被collection.naming.strategy属性替换。
此外,由table.naming.strategy属性指定的io.debezium.connector.jdbc.naming.TableNamingStrategy的约定已被弃用。引入了一个新的io.debezium.sink.naming.CollectionNamingStrategy,具有略微不同的签名。
TableNamingStrategy contract
/**
* Resolves the logical table name from the sink record.
*
* @param config sink connector configuration, should not be {@code null}
* @param record Kafka sink record, should not be {@code null}
* @return the resolved logical table name; if {@code null} the record should not be processed
*/
String resolveTableName(JdbcSinkConnectorConfig config, SinkRecord record);
CollectionNamingStrategy contract
/**
* Resolves the logical collection name from the Debezium sink record.
*
* @param record Debezium sink record, should not be {@code null}
* @param collectionNameFormat the format string for the collection name (mapped from the topic name)
* @return the resolved logical collection name; if {@code null} the record should not be processed
*/
String resolveCollectionName(DebeziumSinkRecord record, String collectionNameFormat);
主要的区别包括新的DebeziumSinkRecord取代了SinkRecord,并显式传递集合命名格式而不是配置类。
如果您在Debezium JDBC sink连接器的部署中实现了自定义的TableNamingStrategy,请确保调整您的代码以使用新的CollectionNamingStrategy,以便在更新到Debezium 3.1+时您的流水线能够继续安全运行。
Debezium Operator
启用Debezium Server REST端点
现在,可以通过使用Debezium Operator在Kubernetes上部署Debezium Server来自动启用Debezium Server的API REST端点。在部署描述符的spec部分中,您可以包含runtime.api.enabled属性来切换API端点,如下所示。
YAML 配置示例
apiVersion: debezium.io/v1alpha1
kind: DebeziumServer
metadata:
name: my-debezium
spec:
image: quay.io/debezium/server:3.0.2.Final
quarkus:
config:
log.console.json: false
kubernetes-config.enabled: true
kubernetes-config.secrets: postgresql-credentials
runtime:
api:
enabled: true
sink:
type: kafka
config:
producer.bootstrap.servers: dbz-kafka-kafka-bootstrap:9092
producer.key.serializer: org.apache.kafka.common.serialization.StringSerializer
producer.value.serializer: org.apache.kafka.common.serialization.StringSerializer
source:
class: io.debezium.connector.postgresql.PostgresConnector
offset:
memory: { }
schemaHistory:
memory: { }
config:
database.hostname: postgresql
database.port: 5432
database.user: ${POSTGRES_USER}
database.password: ${POSTGRES_PASSWORD}
database.dbname: ${POSTGRES_DB}
topic.prefix: inventory
schema.include.list: inventory
默认情况下,Debezium Server API 端点是禁用的,但可以通过将 spec.runtime.api.enabled 设置为 true 来启用,如上所示。
更多Debezium技术内容
更多Debezium技术请参考:
- Debezium技术专栏