原文地址: https://debezium.io/blog/2020/04/09/using-debezium-with-apicurio-api-schema-registry/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
将 Debezium 与 Apicurio API 和架构注册表结合使用
2020 年 4 月 9 日 作者: Jiri Pechanec
模式 avro apicurio
Debezium 从数据库流式传输的更改事件(用开发人员的话说)是强类型的。这意味着事件使用者应该了解事件中传递的数据类型。传递消息类型数据的问题可以通过多种方式解决:
消息结构以带外方式传递给消费者,消费者能够处理其中存储的数据
消息包含嵌入在消息中的元数据(模式)
该消息包含对注册表的引用,该注册表包含关联的元数据
第一种情况的一个例子是 Apache Kafka 众所周知的JsonConverter. 它可以以两种模式运行——有模式和无模式。当配置为不使用模式时,它会生成一条简单的 JSON 消息,其中消费者需要事先了解每个字段的类型,或者需要执行启发式规则来“猜测”并将值映射到数据类型。虽然这种方法非常灵活,但它对于更高级的情况可能会失败,例如编码为字符串的时间或其他语义类型。此外,与类型相关的约束通常会丢失。
以下是此类消息的示例:
{
“before”: null,
“after”: {
“id”: 1001,
“first_name”: “Sally”,
“last_name”: “Thomas”,
“email”: “sally.thomas@acme.com”
},
“source”: {
“version”: “1.1.0.Final”,
“connector”: “mysql”,
“name”: “dbserver1”,
“ts_ms”: 0,
“snapshot”: “true”,
“db”: “inventory”,
“table”: “customers”,
“server_id”: 0,
“gtid”: null,
“file”: “mysql-bin.000003”,
“pos”: 154,
“row”: 0,
“thread”: null,
“query”: null
},
“op”: “c”,
“ts_ms”: 1586331101491,
“transaction”: null
}
请注意,除了 JSON 的基本类型系统之外,不存在任何类型信息。例如,消费者无法从事件本身推断出数字id字段的长度。
第二种情况的例子又是JsonConverter。通过其schemas.enable选项,JSON 消息将由两部分组成 -schema和payload。该payload部分与前一个案例完全相同;该schema部分包含消息的描述、其字段、字段类型和关联的类型约束。这使得消费者能够以类型安全的方式处理消息。这种方法的缺点是消息大小显着增加,因为模式是一个相当大的对象。由于架构很少更改(您多久更改一次数据库表列的定义?),因此将架构添加到每个事件都会带来巨大的开销。
以下带有模式的消息示例清楚地表明模式本身可能比有效负载大得多,并且使用起来不太经济:
{
“schema”: {
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“field”: “id”
},
{
“type”: “string”,
“optional”: false,
“field”: “first_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “last_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “email”
}
],
“optional”: true,
“name”: “dbserver1.inventory.customers.Value”,
“field”: “before”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“field”: “id”
},
{
“type”: “string”,
“optional”: false,
“field”: “first_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “last_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “email”
}
],
“optional”: true,
“name”: “dbserver1.inventory.customers.Value”,
“field”: “after”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “version”
},
{
“type”: “string”,
“optional”: false,
“field”: “connector”
},
{
“type”: “string”,
“optional”: false,
“field”: “name”
},
{
“type”: “int64”,
“optional”: false,
“field”: “ts_ms”
},
{
“type”: “string”,
“optional”: true,
“name”: “io.debezium.data.Enum”,
“version”: 1,
“parameters”: {
“allowed”: “true,last,false”
},
“default”: “false”,
“field”: “snapshot”
},
{
“type”: “string”,
“optional”: false,
“field”: “db”
},
{
“type”: “string”,
“optional”: true,
“field”: “table”
},
{
“type”: “int64”,
“optional”: false,
“field”: “server_id”
},
{
“type”: “string”,
“optional”: true,
“field”: “gtid”
},
{
“type”: “string”,
“optional”: false,
“field”: “file”
},
{
“type”: “int64”,
“optional”: false,
“field”: “pos”
},
{
“type”: “int32”,
“optional”: false,
“field”: “row”
},
{
“type”: “int64”,
“optional”: true,
“field”: “thread”
},
{
“type”: “string”,
“optional”: true,
“field”: “query”
}
],
“optional”: false,
“name”: “io.debezium.connector.mysql.Source”,
“field”: “source”
},
{
“type”: “string”,
“optional”: false,
“field”: “op”
},
{
“type”: “int64”,
“optional”: true,
“field”: “ts_ms”
},
{
“type”: “struct”,
“fields”: [
{
“type”: “string”,
“optional”: false,
“field”: “id”
},
{
“type”: “int64”,
“optional”: false,
“field”: “total_order”
},
{
“type”: “int64”,
“optional”: false,
“field”: “data_collection_order”
}
],
“optional”: true,
“field”: “transaction”
}
],
“optional”: false,
“name”: “dbserver1.inventory.customers.Envelope”
},
“payload”: {
“before”: null,
“after”: {
“id”: 1001,
“first_name”: “Sally”,
“last_name”: “Thomas”,
“email”: “sally.thomas@acme.com”
},
“source”: {
“version”: “1.1.0.Final”,
“connector”: “mysql”,
“name”: “dbserver1”,
“ts_ms”: 0,
“snapshot”: “true”,
“db”: “inventory”,
“table”: “customers”,
“server_id”: 0,
“gtid”: null,
“file”: “mysql-bin.000003”,
“pos”: 154,
“row”: 0,
“thread”: null,
“query”: null
},
“op”: “c”,
“ts_ms”: 1586331101491,
“transaction”: null
}
}
登记处
第三种方法结合了前两种方法的优点,同时消除了前两种方法的缺点,但代价是引入了一个新组件(注册表),用于存储和版本消息模式。
有多种模式注册表实现可用;接下来我们将重点关注Apicurio 注册表,它是一个开源(Apache 许可证 2.0)API 和架构注册表。该项目不仅提供注册表本身,还提供客户端库,并以序列化器和转换器的形式与 Apache Kafka 和 Kafka Connect 紧密集成。
Apicurio 使 Debezium 和消费者能够交换其模式存储在注册表中的消息,并且仅传递对消息本身中的模式的引用。随着捕获的源表的结构和消息模式的发展,注册表也会创建模式的新版本,因此不仅当前模式而且历史模式都可用。
Apicurio 提供多种开箱即用的序列化格式:
具有外部化模式支持的 JSON
阿帕奇阿夫罗
协议缓冲区
每个序列化器和反序列化器都知道如何自动与 Apicurio API 交互,因此消费者作为实现细节与其隔离。唯一需要的信息是注册表的位置。
Apicurio 还为 IBM 和 Confluence 的模式注册表提供 API 兼容层。这是一个非常有用的功能,因为它允许使用kafkacat等第三方工具,即使它们不知道 Apicurio 的本机 API。
JSON转换器
在 Debezium 示例存储库中,有一个基于Docker Compose的示例,它与标准 Debezium 教程示例设置并行部署 Apicurio 注册表。
图片来自于原文
图 1. 部署拓扑
要遵循该示例,您需要克隆 Debezium示例存储库。
自 Debezium 1.2 起,Debezium 容器映像附带了 Apicurio 转换器支持。
您可以通过使用debezium/connect或debezium/connect-base图像版本 >=1.2 并添加环境变量来启用 Apicurio 转换器ENABLE_APICURIO_CONVERTERS=true。
$ cd tutorial
$ export DEBZIUM_VERSION=1.1
Start the deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml up -d --build
Start the connector
curl -i -X POST -H “Accept:application/json”
-H “Content-Type:application/json”
http://localhost:8083/connectors/ -d @register-mysql-apicurio-converter-json.json
Read content of the first message
$ docker run --rm --tty
–network tutorial_default debezium/tooling bash
-c ‘kafkacat -b kafka:9092 -C -o beginning -q -t dbserver1.inventory.customers -c 1 | jq .’
结果消息应如下所示:
{
“schemaId”: 48,
“payload”: {
“before”: null,
“after”: {
“id”: 1001,
“first_name”: “Sally”,
“last_name”: “Thomas”,
“email”: “sally.thomas@acme.com”
},
“source”: {
“version”: “1.1.0.Final”,
“connector”: “mysql”,
“name”: “dbserver1”,
“ts_ms”: 0,
“snapshot”: “true”,
“db”: “inventory”,
“table”: “customers”,
“server_id”: 0,
“gtid”: null,
“file”: “mysql-bin.000003”,
“pos”: 154,
“row”: 0,
“thread”: null,
“query”: null
},
“op”: “c”,
“ts_ms”: 1586334283147,
“transaction”: null
}
}
JSON 消息包含完整的有效负载,同时包含对 id 模式的引用48。可以使用idDebezium 文档定义的模式符号名称从注册表查询模式。在这种情况下,两个命令
$ docker run --rm --tty
–network tutorial_default
debezium/tooling bash -c ‘http http://apicurio:8080/ids/64 | jq .’
$ docker run --rm --tty
–network tutorial_default
debezium/tooling bash -c ‘http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .’
产生相同的模式描述:
{
“type”: “struct”,
“fields”: [
{
“type”: “struct”,
“fields”: [
{
“type”: “int32”,
“optional”: false,
“field”: “id”
},
{
“type”: “string”,
“optional”: false,
“field”: “first_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “last_name”
},
{
“type”: “string”,
“optional”: false,
“field”: “email”
}
],
“optional”: true,
“name”: “dbserver1.inventory.customers.Value”,
“field”: “before”
},
…
],
“optional”: false,
“name”: “dbserver1.inventory.customers.Envelope”
}
这与我们之前在“JSON with schema”示例中看到的相同。
连接器注册请求与前一个请求有几行不同:
…
“key.converter”: “io.apicurio.registry.utils.converter.ExtJsonConverter”,
“key.converter.apicurio.registry.url”: “http://apicurio:8080”,
“key.converter.apicurio.registry.global-id”:
“io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy”,
“value.converter”: “io.apicurio.registry.utils.converter.ExtJsonConverter”,
“value.converter.apicurio.registry.url”: “http://apicurio:8080”,
“value.converter.apicurio.registry.global-id”:
“io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy”
…
Apicurio JSON 转换器用作键和值转换器
Apicurio 注册表端点
此设置确保可以自动注册架构 ID,这是 Debezium 部署中的典型设置
Avro 转换器
到目前为止,我们仅演示了将消息序列化为 JSON 格式。虽然在注册表中使用 JSON 格式有很多优点,例如易于人类阅读,但它仍然不太节省空间。
要真正只传输数据而不产生任何重大开销,使用 Avro 格式等二进制格式序列化非常有用。在这种情况下,我们将仅打包数据,而无需任何字段名称和其他仪式,并且该消息将再次包含对存储在注册表中的模式的引用。
让我们看看 Avro 序列化如何轻松地与 Apicurio 的 Avro 转换器一起使用。
Tear down the previous deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml down
Start the deployment
$ docker-compose -f docker-compose-mysql-apicurio.yaml up -d --build
Start the connector
curl -i -X POST -H “Accept:application/json”
-H “Content-Type:application/json”
http://localhost:8083/connectors/
-d @register-mysql-apicurio-converter-avro.json
我们可以使用模式名称查询注册表:
$ docker run --rm --tty
–network tutorial_default
debezium/tooling
bash -c ‘http http://apicurio:8080/artifacts/dbserver1.inventory.customers-value | jq .’
生成的模式描述与之前的模式描述略有不同,因为它具有 Avro 风格:
{
“type”: “record”,
“name”: “Envelope”,
“namespace”: “dbserver1.inventory.customers”,
“fields”: [
{
“name”: “before”,
“type”: [
“null”,
{
“type”: “record”,
“name”: “Value”,
“fields”: [
{
“name”: “id”,
“type”: “int”
},
{
“name”: “first_name”,
“type”: “string”
},
{
“name”: “last_name”,
“type”: “string”
},
{
“name”: “email”,
“type”: “string”
}
],
“connect.name”: “dbserver1.inventory.customers.Value”
}
],
“default”: null
},
{
“name”: “after”,
“type”: [
“null”,
“Value”
],
“default”: null
},
…
],
“connect.name”: “dbserver1.inventory.customers.Envelope”
}
连接器注册请求与标准请求也有几行不同:
…
“key.converter”: “io.apicurio.registry.utils.converter.AvroConverter”,
“key.converter.apicurio.registry.url”: “http://apicurio:8080”,
“key.converter.apicurio.registry.converter.serializer”:
“io.apicurio.registry.utils.serde.AvroKafkaSerializer”,
“key.converter.apicurio.registry.converter.deserializer”:
“io.apicurio.registry.utils.serde.AvroKafkaDeserializer”,
“key.converter.apicurio.registry.global-id”:
“io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy”,
“value.converter”: “io.apicurio.registry.utils.converter.AvroConverter”,
“value.converter.apicurio.registry.url”: “http://apicurio:8080”,
“value.converter.apicurio.registry.converter.serializer”:
“io.apicurio.registry.utils.serde.AvroKafkaSerializer”,
“value.converter.apicurio.registry.converter.deserializer”:
“io.apicurio.registry.utils.serde.AvroKafkaDeserializer”,
“value.converter.apicurio.registry.global-id”:
“io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy”,
…
Apicurio Avro 转换器用作键和值转换器
Apicurio 注册表端点
规定转换器应使用哪个串行器和解串器
此设置确保可以自动注册架构 ID,这是 Debezium 部署中的典型设置
例如,为了演示接收器端消息的消耗,我们可以使用Kafka Connect Elasticsearch 连接器。接收器配置将再次仅通过转换器配置进行扩展,并且接收器连接器可以使用启用 Avro 的主题,而不需要任何其他更改。
{
“name”: “elastic-sink”,
“config”: {
“connector.class”: “io.confluent.connect.elasticsearch.ElasticsearchSinkConnector”,
“tasks.max”: “1”,
“topics”: “customers”,
“connection.url”: “http://elastic:9200”,
“transforms”: “unwrap,key”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,
“transforms.unwrap.drop.tombstones”: “false”,
“transforms.key.type”: “org.apache.kafka.connect.transforms.ExtractField$Key”,
“transforms.key.field”: “id”,
“key.ignore”: “false”,
“type.name”: “customer”,
“behavior.on.null.values”: “delete”,
"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url": "http://apicurio:8080",
"key.converter.apicurio.registry.converter.serializer":
"io.apicurio.registry.utils.serde.AvroKafkaSerializer",
"key.converter.apicurio.registry.converter.deserializer":
"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",
"key.converter.apicurio.registry.global-id":
"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url": "http://apicurio:8080",
"value.converter.apicurio.registry.converter.serializer":
"io.apicurio.registry.utils.serde.AvroKafkaSerializer",
"value.converter.apicurio.registry.converter.deserializer":
"io.apicurio.registry.utils.serde.AvroKafkaDeserializer",
"value.converter.apicurio.registry.global-id":
"io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
}
}
结论
在本文中,我们讨论了消息/模式关联的多种方法。Apicurio 注册表是作为模式存储和版本控制的解决方案提出的,我们已经演示了 Apicurio 如何与 Debezium 连接器集成,以有效地将带有模式的消息传递给消费者。
您可以在 GitHub 上 Debezium 示例存储库的教程项目中找到将 Debezium 连接器与 Apicurio 注册表一起使用的完整示例。