原文地址: https://debezium.io/blog/2023/01/06/change-data-capture-with-questdb-and-debezium/
欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.
Change Data Capture with QuestDB and Debezium
January 6, 2023 by Yitaek Hwang
questdb kafka debezium time series
本教程最初由 问题b ,在哪里, 黄伊泰 向我们展示了如何将数据通过数据采集和卡夫卡连接来将数据流到测试数据库中。
现代数据体系结构在很大程度上已经从 电子语言 (萃取-转化-负载) 埃尔特 (提取-负载-转换),原始数据在应用转换之前先被加载到数据池(例如。,集合,连接)供进一步分析。传统的ETL管道很难维护,而且随着业务需求的变化相对灵活。由于新的云技术带来了更便宜的存储和更好的可伸缩性,数据管道可以从预先构建的提取和批处理上传转移到更流化的体系结构。
更改数据采集 (ccc)很好地适应了这一范式的转变,即从一个来源到数据的变化可以流到其他目的地。顾名思义,疾病预防控制中心跟踪数据的变化(通常是数据库),并提供插件来处理这些变化。对于事件驱动的架构,疾病预防控制中心作为服务边界之间的一致数据传递机制特别有用(例如。, 收发箱模式 )。在一个复杂的微服务环境中,疾病预防控制中心帮助简化数据传递逻辑,将负担卸下给疾病预防控制中心系统。
为了说明这一点,让我们使用一个参考体系结构来将库存更新从后格SQL流到查询db。一个简单的java春季应用程序通过标号来调查股票价格,并将当前价格更新到后GREGSQL数据库。然后更新被德贝齐姆检测到并输入到卡夫卡主题。最后,卡夫卡连接QISSTB连接器监听该主题,并将流更改为QSTSTB进行分析。
设计概述
这样结构化数据管道使应用程序变得简单。Java春季应用程序只需要获取最新的股票数据并提交到后GREGSQL。由于后格SQL是一个优秀的olp(事务性)数据库,因此应用程序可以依赖于酸依从性,以确保下游服务只看到提交的数据。应用程序开发人员不需要担心复杂的重试逻辑或不同步数据集。从数据库的角度来看,可以对后GERGSQL进行优化,以做它最擅长的事–事务查询。卡夫卡可以被用来可靠地将数据提供给其他端点,并且可以使用QSTSTDB来存储历史数据来运行分析查询和可视化。
所以不用再多说了,让我们来举个例子:
先决条件
垃圾
码头发动机:20.10+
成立
要在本地运行该示例,首先克隆 卡夫卡连接器回购 :
$ git clone https://github.com/questdb/kafka-questdb-connector.git
然后,导航到股票样本来构建和运行码头工人组成文件:
$ cd kafka-questdb-connector/kafka-questdb-connector-samples/stocks/
$ docker compose build
$ docker compose up
或更早的版本,compose 子命令可能不可用。你可以尝试执行docker-compose 代替docker compose .如果docker-compose 在你的销售中是不可用的,你可以 安装它 从手中。
这将为测试数据库中的Java弹簧应用程序/卡夫卡连接器构建码头文件,并向下拉下后格拉基(预先配置了德贝齐姆)、卡夫卡/动物园管理员、Qustdb和格拉法纳容器。卡夫卡和卡夫卡连接需要一点初始化。通过检查连接容器等待日志停止.
Start the Debezium connector
在这一点上,Java应用程序正在不断更新"后格SQL"中的股票表,但连接尚未设置。创建德贝兹接头(即。通过执行下列措施:
curl -X POST -H “Content-Type: application/json” -d '{“name”:“debezium_source”,“config”:{“tasks.max”:1,“database.hostname”:“postgres”,“database.port”:5432,“database.user”:“postgres”,“database.password”:“postgres”,“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“database.dbname”:“postgres”,“database.server.name”:“dbserver1”}} ’ localhost:8083/connectors
Start the QuestDB Kafka Connect sink
通过创建kafka -> QuestDB
curl -X POST -H “Content-Type: application/json” -d ‘{“name”:“questdb-connect”,“config”:{“topics”:“dbserver1.public.stock”,“table”:“stock”, “connector.class”:“io.questdb.kafka.QuestDBSinkConnector”,“tasks.max”:“1”,“key.converter”:“org.apache.kafka.connect.storage.StringConverter”,“value.converter”:“org.apache.kafka.connect.json.JsonConverter”,“host”:“questdb”, “transforms”:“unwrap”, “transforms.unwrap.type”:“io.debezium.transforms.ExtractNewRecordState”, “include.key”: “false”, “symbols”: “symbol”, “timestamp.field.name”: “last_update”}}’ localhost:8083/connectors
最后结果
现在,写到后GERGSQL表的所有更新也将反映在查询数据库中。去验证,导航到 http://localhost:19000 从库存表中选择:
SELECT * FROM stock;
您还可以运行聚合进行更复杂的分析:
SELECT
timestamp,
symbol,
avg(price),
min(price),
max(price)
FROM stock
WHERE symbol = ‘IBM’
SAMPLE BY 1m ALIGN TO CALENDAR;
最后,您可以与格拉法纳仪表板交互,以便在 http://localhost:3000/d/stocks/stocks?orgId=1&refresh=5s&viewPanel=2 .
可视化是一个蜡烛图表,由Debezns捕捉到的变化组成;每一根蜡烛在给定的时间间隔中显示开、关、高、低价格。时间间隔可以通过选择左上角"时间间隔"选项来改变:
Grafana candle chart
深潜
现在我们已经启动并运行了示例应用程序,让我们深入研究一下 股票 例子。
我们将查看以下文件:
├── kafka-questdb-connector/kafka-questdb-connector-samples/stocks/
│ ├── Dockerfile-App
| | – The Dockerfile to package our Java App
| ├── Dockerfile-Connect
| | – The Dockerfile to combine the Debezium container
| | – image the with QuestDB Kafka connector
│ ├── src/main/resources/schema.sql
| | – The SQL which creates the stock table in PostgreSQL
| | – and populates it with initial data
│ ├── src/main/java/com/questdb/kafka/connector/samples/StocksApplication.java
| | – The Java Spring App which updates the stock table in PostgreSQL
| | – in regular intervals
…
生产者(爪哇应用程序)
制作人是一个简单的Java弹簧启动应用程序。它有两个组成部分:
… 架构。 文件。此文件用于在后GREGSQL中创建库存表并填充其初始数据。它被弹簧启动应用程序接收到并在启动时执行。
[source,sql]
如果没有存货,则创建表格(
序列主键,
symbol varchar(10) unique,
价格浮动8,
更新时间戳
);
插入库存(符号、价格、上一次更新)值(‘aapl’,500.0,现在())冲突时无所作为;
插入库存(符号,价格,最新)值(“IBM”,50.0,现在()在冲突中无所作为;
插入库存(符号、价格、上一次更新)值,在冲突时不做任何事;
插入库存(符号、价格、上一次更新)值(“谷歌”,100.0,现在())冲突时无所作为;
插入库存(符号、价格、上一次更新)值(‘fb’,200.0,现在())冲突时无所作为;
插入库存(符号、价格、上一次更新)值(‘amzn’,100.0,现在())冲突时无所作为;
插入库存(符号、价格、上一次更新)值(‘tsla’,500.0,现在())冲突时无所作为;
插入库存(符号、价格、上一次更新)值(“nflx”,500.0,现在())冲突时无所作为;
插入库存(符号、价格、上一次更新)值(‘ttr’,50.0,现在())冲突时无所作为;
插入库存(符号、价格、最新)值(“突然”,10.0,现在())冲突时无所作为;
"关于冲突无所作为"条款被用来避免重复条目的出现。
重新启动应用时的表。
Java代码 以随机值更新价格和时间戳。更新并不是完全随机的,应用程序使用一个非常简单的算法生成更新,非常类似于股票价格的波动。在现实的场景中,应用程序将从某些外部来源获得价格。
生产者被包装成最小的文件, 码头文件 ,并连接到后格来格:
从马文出发:3.8-JDK-11小建筑
收到。/1.xml/选择系统/库存/
收到。/特别资源中心。/选择方案/库存/战略资源中心
工作量/选择方案/库存
运行MVN清洁安装-Dskip测试
来自阿祖尔/祖卢-开放jdk:最新11
复制–来自=建造者/被占领巴勒斯坦领土/库存/目标/卡夫卡–样本-库存–*。罐/罐
CMD [“java”, “-jar”, “/stocks.jar”]
卡夫卡连接器、德贝兹和卡夫卡连接器
在我们深入研究卡夫卡连接、德贝兹和卡夫卡连接器的配置之前,让我们看看它们之间的关系。
卡夫卡连接是构建连接器以在卡夫卡和其他系统之间移动数据的框架。它支持2类连接器:
源连接器-从源系统读取数据并将其写入卡夫卡
插入连接器-读取卡夫卡的数据并将其写入接收系统
Debezum是卡夫卡连接的源连接器,可以监视和捕获数据库中的行级更改。它是什么意思?每当在数据库中插入、更新或删除一行时,Debezum将捕获更改并将其作为事件写入卡夫卡。
在技术层面上,德贝兹是在卡夫卡连接框架内运行的卡夫卡连接器。这反映在 德贝兹容器图像 ,该包卡夫卡连接与德贝兹连接器预先安装。
卡夫卡连接器也是卡夫卡连接器。它是一个接收器连接器,可以读取卡夫卡的数据并将其写入QISSTDB。我们添加了QESTDB卡夫卡连接器到德贝兹容器图像,我们得到了一个卡夫卡连接图像,两者都有德贝兹和QSTDB卡夫卡连接器安装!
这是我们用来制作图像的码头文件:
( 码头文件连接 )
来自乌本图:最新的建筑师
工作场所/选择项目
运行程序-获得更新和应用-获得安装-Y旋涡解压缩JQ
RUN curl -s https://api.github.com/repos/questdb/kafka-questdb-connector/releases/latest | jq -r ‘.assets[]|select(.content_type == “application/zip”)|.browser_download_url’|wget -qi -
运行解压缩卡夫卡-奎斯特-连接*-绑定。
从德贝兹/连接点:1.9.6.最后
COPY --from=builder /opt/kafka-questdb-connector/*.jar /kafka/connect/questdb-connector/
码头文件下载了最新发布的QESTDB卡夫卡连接器,解压缩将其复制到Debezum容器图像。生成的图像同时安装了德贝兹和奎斯特卜卡夫卡连接器:
加装卡夫卡连接器层
整个卡夫卡连接器与源连接器和接收器连接器完成:
源和汇连接器如何与卡夫卡集群和数据库一起工作
德贝兹接头
我们已经知道,Debezum是一个卡夫卡连接连接器,可以监视和捕获数据库中的行级更改。我们也有一个码头图像,有德贝兹和奎斯特卡夫卡连接器安装。然而,此时两个连接器都不运行。我们需要配置并启动它们。这是通过向卡夫卡连接RESTAPI发送一个邮件请求的CAR命令来完成的。
curl -X POST -H “Content-Type: application/json” -d '{“name”:“debezium_source”,“config”:{“tasks.max”:1,“database.hostname”:“postgres”,“database.port”:5432,“database.user”:“postgres”,“database.password”:“postgres”,“connector.class”:“io.debezium.connector.postgresql.PostgresConnector”,“database.dbname”:“postgres”,“database.server.name”:“dbserver1”}} ’ localhost:8083/connectors
请求体包含德贝兹连接器的配置,让我们分解它:
{
“name”: “debezium_source”,
“config”: {
“tasks.max”: 1,
“database.hostname”: “postgres”,
“database.port”: 5432,
“database.user”: “postgres”,
“database.password”: “postgres”,
“connector.class”: “io.debezium.connector.postgresql.PostgresConnector”,
“database.dbname”: “postgres”,
“database.server.name”: “dbserver1”
}
}
它听后GREGSQL数据库中的更改,并使用上述配置向卡夫卡发布。主题名称默认为..
所以在我们发出请求后,德贝佐姆将开始倾听stock 把它们放在桌上发表到dbserver1.public.stock 专题。
卡夫卡连接器
在这一点上,我们有一个后格勒克表stock 充满了随机股价和卡夫卡主题dbserver1.public.stock 包含了更改。下一个步骤是将Qustdb卡夫卡连接器配置为从dbserver1.public.stock 主题并将数据写到查询数据库。
让我们更深入地研究一下 启动卡夫卡连接槽 :
{
“name”: “questdb-connect”,
“config”: {
“topics”: “dbserver1.public.stock”,
“table”: “stock”,
“connector.class”: “io.questdb.kafka.QuestDBSinkConnector”,
“tasks.max”: “1”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“host”: “questdb”,
“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”,
“include.key”: “false”,
“symbols”: “symbol”,
“timestamp.field.name”: “last_update”
}
}
这里需要注意的是:
table 和topics :"卡夫卡"连接器将创建带有名称的"卡夫卡"表stock 把数据写在dbserver1.public.stock 它的主题。
host :卡夫卡连接器将连接到运行在questdb 招待。这是查询容器的名称。
connector.class:卡夫卡连接器类名称。这告诉卡夫卡连接使用卡夫卡连接器。
value.converter :德贝兹连接器以JSON格式生成数据。这就是为什么我们需要配置查询数据库连接器来使用JSON转换器来读取数据:org.apache.kafka.connect.json.JsonConverter .
symbols *股票符号被翻译成 查询符号类型 ,用于低基数的字串值(例如。…
timestamp.field.name当前位置:由于QESTDB对时间戳和基于时间戳的分区有很大的支持,我们可以指定指定的时间戳列。
transforms :打开场用途io.debezium.transforms.ExtractNewRecordState 类型只提取新数据,而不提取Debezns发布的元数据。换句话说,这是一个过滤器payload.after 关于卡夫卡主题的德贝兹数据的一部分。看到 文件 更多的细节。
…ExtractNewRecordState 转换可能是配置中最不直观的部分。让我们更仔细地看看它:简而言之,对于后面的SQL表中的每一个变化,Debezum都会向卡夫卡主题发送一个JSON消息,比如:
{
“schema”: “This JSON key contains Debezium message schema. It’s not very relevant for this sample. Omitted for brevity.”,
“payload”: {
“before”: null,
“after”: {
“id”: 8,
“symbol”: “NFLX”,
“price”: 1544.3357414199545,
“last_update”: 1666172978269856
}
},
“source”: {
“version”: “1.9.6.Final”,
“connector”: “postgresql”,
“name”: “dbserver1”,
“ts_ms”: 1666172978272,
“snapshot”: “false”,
“db”: “postgres”,
“sequence”: “[“87397208”,“87397208”]”,
“schema”: “public”,
“table”: “stock”,
“txId”: 402087,
“lsn”: 87397208,
“xmin”: null
},
“op”: “u”,
“ts_ms”: 1666172978637,
“transaction”: null
}
如果你被这条信息的尺寸吓到了,不要害怕。大多数字段是元数据,它们与此示例无关。看 德贝兹文件 想知道更多细节。重要的是,我们不能将整个JSON消息推到查询数据库,而且我们也不希望查询数据库中的所有元数据。我们需要取出payload.after 邮件的一部分,然后将其推到查询。这正是ExtractNewRecordState 转换后会:它将大消息转换为一个较小的信息,只包含payload.after 一部分信息。因此,这条信息看起来就像这样:
{
“id”: 8,
“symbol”: “NFLX”,
“price”: 1544.3357414199545,
“last_update”: 1666172978269856
}
这是我们可以推敲的信息。卡夫卡连接器将读取此消息并将其写入到卡夫达表。如果QESTB卡夫卡表不存在,则该连接器也将创建该表。查询表将具有与JSON消息相同的模式–其中每个JSON字段将是查询表中的一列。
问题b和格拉法纳
一旦数据写入了查询表,我们就可以更容易地处理时间序列数据。由于QESTDB是兼容后GREQLY协议的,所以我们可以使用格拉法纳上的后GRESQL数据源来可视化数据。预先配置的仪表板使用以下查询:
SELECT
$__time(timestamp),
min(price) as low,
max(price) as high,
first(price) as open,
last(price) as close
FROM
stock
WHERE
KaTeX parse error: Expected group after '_' at position 1: _̲_timeFilter(tim…Symbol’
SAMPLE BY $Interval ALIGN TO CALENDAR;
我们已经创建了一个系统,它可以连续地追踪和存储多个股票的最新价格。然后这些价格作为事件通过德贝齐姆输入卡夫卡,这抓住了每一个价格变化。卡夫卡连接器读取卡夫卡的这些事件,并将每一个变化作为卡夫卡的一个新行存储,使我们能够保留一个完整的股票价格历史。这段历史可以通过使用工具(例如格拉法纳)进行分析和可视化,如蜡烛图所示。
下一步
这个示例项目是将数据从关系数据库流到优化的时间序列数据库的基本参考体系结构。对于使用后GERGSQL的现有项目,可以对Debezum进行配置,以启动将数据流到查询db,并利用时间序列查询和分区的优势。对于同时存储原始历史数据的数据库,采用Debezum可能需要一些架构更改。然而,这是有益的,因为这是一个在事务数据库和分析性时序数据库之间改进性能和建立服务边界的机会。
这个引用架构也可以扩展到配置卡夫卡连接到流到其他数据仓库进行长期存储。在检查数据后,测试数据也可以配置为为更长期的存储,甚至可以对数据进行向下采样。 分离分区以节省空间 .
把这个给我 抽样应用 尝试加入 休闲社区 如果你有任何问题的话。