TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

TiDB数据库从入门到精通系列之六:使用 TiCDC 将 TiDB 的数据同步到 Apache Kafka

  • 一、技术流程
  • 二、搭建环境
  • 三、创建Kafka changefeed
  • 四、写入数据以产生变更日志
  • 五、配置 Flink 消费 Kafka 数据

一、技术流程

  • 快速搭建 TiCDC 集群、Kafka 集群和 Flink 集群
  • 创建 changefeed,将 TiDB 增量数据输出至 Kafka
  • 使用 go-tpc 写入数据到上游 TiDB
  • 使用 Kafka console consumer 观察数据被写入到指定的 Topic
  • (可选)配置 Flink 集群消费 Kafka 内数据

二、搭建环境

部署包含 TiCDC 的 TiDB 集群

在实验或测试环境中,可以使用 TiUP Playground 功能,快速部署 TiCDC,命令如下:

tiup playground --host 0.0.0.0 --db 1 --pd 1 --kv 1 --tiflash 0 --ticdc 1
# 查看集群状态
tiup status

三、创建Kafka changefeed

1.创建 changefeed 配置文件

根据 Flink 的要求和规范,每张表的增量数据需要发送到独立的 Topic 中,并且每个事件需要按照主键值分发 Partition。因此,需要创建一个名为 changefeed.conf 的配置文件,填写如下内容:

[sink]
dispatchers = [
{matcher = ['*.*'], topic = "tidb_{schema}_{table}", partition="index-value"},
]

2.创建一个 changefeed,将增量数据输出到 Kafka

tiup ctl:v<CLUSTER_VERSION> cdc changefeed 
create --server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092/kafka-topic-name?protocol=canal-json" 
--changefeed-id="kafka-changefeed" 
--config="changefeed.conf"

如果命令执行成功,将会返回被创建的 changefeed 的相关信息,包含被创建的 changefeed 的 ID 以及相关信息,内容如下:

Create changefeed successfully!
ID: kafka-changefeed
Info: {... changfeed info json struct ...}

如果命令长时间没有返回,你需要检查当前执行命令所在服务器到 sink-uri 中指定的 Kafka 机器的网络可达性,保证二者之间的网络连接正常。

生产环境下 Kafka 集群通常有多个 broker 节点,你可以在 sink-uri 中配置多个 broker 的访问地址,这有助于提升 changefeed 到 Kafka 集群访问的稳定性,当部分被配置的 Kafka 节点故障的时候,changefeed 依旧可以正常工作。假设 Kafka 集群中有 3 个 broker 节点,地址分别为 127.0.0.1:9092 / 127.0.0.2:9092 / 127.0.0.3:9092,可以参考如下 sink-uri 创建 changefeed:

tiup ctl:v<CLUSTER_VERSION> cdc changefeed create 
--server="http://127.0.0.1:8300" 
--sink-uri="kafka://127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092/kafka-topic-name?protocol=canal-json&partition-num=3&replication-factor=1&max-message-bytes=1048576" 
--config="changefeed.conf"

3.Changefeed 创建成功后,执行如下命令,查看 changefeed 的状态

tiup ctl:v<CLUSTER_VERSION> cdc changefeed list --server="http://127.0.0.1:8300"

四、写入数据以产生变更日志

完成以上步骤后,TiCDC 会将上游 TiDB 的增量数据变更日志发送到 Kafka,下面对 TiDB 写入数据,以产生增量数据变更日志。

1.模拟业务负载

在测试实验环境下,可以使用 go-tpc 向上游 TiDB 集群写入数据,以让 TiDB 产生事件变更数据。如下命令,首先在上游 TiDB 创建名为 tpcc 的数据库,然后使用 TiUP bench 写入数据到这个数据库中。

tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 prepare
tiup bench tpcc -H 127.0.0.1 -P 4000 -D tpcc --warehouses 4 run --time 300s

2.消费 Kafka Topic 中的数据

changefeed 正常运行时,会向 Kafka Topic 写入数据,你可以通过由 Kafka 提供的 kafka-console-consumer.sh,观测到数据成功被写入到 Kafka Topic 中:

./bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic `${topic-name}`

至此,TiDB 的增量数据变更日志就实时地复制到了 Kafka。下一步,你可以使用 Flink 消费 Kafka 数据。当然,你也可以自行开发适用于业务场景的 Kafka 消费端。

五、配置 Flink 消费 Kafka 数据

1.安装 Flink Kafka Connector

在 Flink 生态中,Flink Kafka Connector 用于消费 Kafka 中的数据并输出到 Flink 中。Flink Kafka Connector 并不是内建的,因此在 Flink 安装完毕后,还需要将 Flink Kafka Connector 及其依赖项添加到 Flink 安装目录中。下载下列 jar 文件至 Flink 安装目录下的 lib 目录中,如果你已经运行了 Flink 集群,请重启集群以加载新的插件。

  • flink-connector-kafka-1.17.1.jar
  • flink-sql-connector-kafka-1.17.1.jar
  • kafka-clients-3.5.1.jar

2.创建一个表

可以在 Flink 的安装目录执行如下命令,启动 Flink SQL 交互式客户端:

[root@flink flink-1.15.0]# ./bin/sql-client.sh

随后,执行如下语句创建一个名为 tpcc_orders 的表:

CREATE TABLE tpcc_orders (
    o_id INTEGER,
    o_d_id INTEGER,
    o_w_id INTEGER,
    o_c_id INTEGER,
    o_entry_d STRING,
    o_carrier_id INTEGER,
    o_ol_cnt INTEGER,
    o_all_local INTEGER
) WITH (
'connector' = 'kafka',
'topic' = 'tidb_tpcc_orders',
'properties.bootstrap.servers' = '127.0.0.1:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json',
'scan.startup.mode' = 'earliest-offset',
'properties.auto.offset.reset' = 'earliest'
)

请将 topic 和 properties.bootstrap.servers 参数替换为环境中的实际值。

3.查询表内容

执行如下命令,查询 tpcc_orders 表中的数据:

SELECT * FROM tpcc_orders;

执行成功后,可以观察到有数据输出,如下图

在这里插入图片描述
至此,就完成了 TiDB 与 Flink 的数据集成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/76907.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

清除pip安装库时的缓存

目录 1、命令清除缓存 2、路径手动清除 在使用pip安装Python库时&#xff0c;如果之前已经下载过该库&#xff0c;pip会默认使用缓存来安装库&#xff0c;而不是重新从网络上下载。缓存文件通常存储在用户目录下的缓存文件夹中&#xff0c;具体位置因操作系统和Python版本而异…

剑指offer43.1~n整数中1出现的次数

看到这么大的数据规模就直到用暴力法肯定会超时&#xff0c;但是还是花一分钟写了一个试一下&#xff0c;果然超时 class Solution {public int countDigitOne(int n) {int count 0;for(int i1;i<n;i){countdigitOneInOneNum(i);}return count;}public int digitOneInOneNu…

【CSS】禁用元素鼠标事件(例如实现元素禁用效果)

文章目录 基本用法 基本用法 pointer-events 属性指定在什么情况下 (如果有) 某个特定的图形元素可以成为鼠标事件。实际运用中可以通过对auto 和none动态控制&#xff0c;来动态实现元素的禁用效果。 属性描述auto与pointer-events属性未指定时的表现效果相同&#xff0c;对…

Java版企业电子招投标采购系统源码之首页设计 tbms

​ 功能描述 1、门户管理&#xff1a;所有用户可在门户页面查看所有的公告信息及相关的通知信息。主要板块包含&#xff1a;招标公告、非招标公告、系统通知、政策法规。 2、立项管理&#xff1a;企业用户可对需要采购的项目进行立项申请&#xff0c;并提交审批&#xff0c;查…

Curson 编辑器

Curson 汉化与vacode一样 Curson 自带chat功能 1、快捷键ctrlk(代码中编辑) 2、快捷键ctrll 右侧打开窗口

spark的standalone 分布式搭建

一、环境准备 集群环境hadoop11&#xff0c;hadoop12 &#xff0c;hadoop13 安装 zookeeper 和 HDFS 1、启动zookeeper -- 启动zookeeper(11,12,13都需要启动) xcall.sh zkServer.sh start -- 或者 zk.sh start -- xcall.sh 和zk.sh都是自己写的脚本-- 查看进程 jps -- 有…

CentOS系统环境搭建(十五)——CentOS安装Kibana

centos系统环境搭建专栏&#x1f517;点击跳转 关于Elasticsearch的安装请看CentOS系统环境搭建&#xff08;十二&#xff09;——CentOS7安装Elasticsearch。 CentOS安装Kibana 1.下载 &#x1f517;https://www.elastic.co/downloads/past-releases/kibana-7-17-12 若你是…

Ansys Zemax | 手机镜头设计 - 第 1 部分:光学设计

本文是 3 篇系列文章的一部分&#xff0c;该系列文章将讨论智能手机镜头模组设计的挑战&#xff0c;从概念、设计到制造和结构变形的分析。本文是三部分系列的第一部分&#xff0c;将专注于OpticStudio中镜头模组的设计、分析和可制造性评估。&#xff08;联系我们获取文章附件…

【变形金刚01】attention和transformer所有信息

图1.来源&#xff1a;Arseny Togulev在Unsplash上的照片 一、说明 这是一篇 长文 &#xff0c;几乎讨论了人们需要了解的有关注意力机制的所有信息&#xff0c;包括自我注意、查询、键、值、多头注意力、屏蔽多头注意力和转换器&#xff0c;包括有关 BERT 和 GPT 的一些细节。因…

磁力线试验+多图

今天要磨制一个钢针工具。磨下来很多的铁屑&#xff0c;灵机一动&#xff0c;何不来试验一下磁铁的磁力线。这可是难得的材料。 下放7颗强力磁铁&#xff0c;可见强力磁铁的磁力线非常集中。 下放直径4CM的喇叭磁铁 强力磁铁U型铁 强力磁铁E型铁氧体磁芯&#xff0c;可见磁力线…

可视化绘图技巧100篇进阶篇(九)-三维百分比堆积条形图(3D Stacked Percentage Bar Chart)

目录 前言 适用场景 绘图工具及代码实现 帆软 实现思路 方案一&#xff1a;使用计算指标 上传数据 添加组件 生成图表 添加计算字段 生成分区柱形图 生成百分比堆积条形图 美化图表 设置标签 设置颜色 效果查看 PC 端 移动端 方案二&#xff1a;使用自助数…

tk切换到mac的code分享

文章目录 前言一、基础环境配置二、开发软件与扩展1.用到的开发软件与平替、扩展情况 总结 前言 最近换上了coding人生的第一台mac&#xff0c;以前一直偏好tk&#xff0c;近来身边的朋友越来越多的用mac了&#xff0c;win的自动更新越来越占磁盘了&#xff0c;而且win11抛弃了…

ReactNative进阶(三十四):ipa Archive 阶段报错error: Multiple commands produce问题修复及思考

文章目录 一、前言二、问题描述三、问题解决四、拓展阅读五、拓展阅读 一、前言 在应用RN开发跨平台APP阶段&#xff0c;从git中拉取项目&#xff0c;应用Jenkins进行组包时&#xff0c;发现最终生成的ipa安装包版本号始终与项目中设置的版本号不一致。 二、问题描述 经过仔…

svn 过滤文件

1. 右键点击&#xff0c;依次选择 TortoiseSVN -> Settings 2. 添加需要过滤的后缀/关键词【 *.iml *.idea *.jar *.class 】

01- vdom 和模板编译源码

组件渲染的过程 template --> ast --> render --> vDom --> 真实的Dom --> 页面 Runtime-Compiler和Runtime-Only的区别 - 简书 编译步骤 模板编译是Vue中比较核心的一部分。关于 Vue 编译原理这块的整体逻辑主要分三个部分&#xff0c;也可以说是分三步&am…

Nginx转发请求到后端服务报400 Bad Request

问题描述 系统部署好后&#xff0c;进行测试时发现有部分接口出错&#xff0c;项目采用Nginx作为后端代理服务器&#xff0c;有Nginx统一将请求转发到后端的网关服务&#xff0c;再由网关服务路由到具体的服务上&#xff0c;发布好后&#xff0c;大部分接口都是正常的&#xff…

时序预测 | MATLAB实现基于CNN-GRU卷积门控循环单元的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于CNN-GRU卷积门控循环单元的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于CNN-GRU卷积门控循环单元的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 MATLAB实现基于CNN-GRU卷积…

一.RocketMQ概念

RocketMQ概念 1.概念2.应用场景3.MQ的优点和缺点4.常见MQ对比 1.概念 MQ(Message Queue)&#xff0c;是一种提供消息队列服务的中间件&#xff0c;也称为消息中间件&#xff0c;是一套提供了消息生产、存储、消费全过程API的软件系统。 RocketMQ是阿里巴巴2016年MQ中间件&…

uniapp 上传比较大的视频文件就超时

uni.uploadFile&#xff0c;上传超过10兆左右的文件就报错err&#xff1a;uploadFile:fail timeout&#xff0c;超时 解决&#xff1a; 在manifest.json文件中做超时配置 uni.uploadFile({url: this.action,method: "POST",header: {Authorization: uni.getStorage…

Azure创建自定义VM镜像

创建一个虚拟机&#xff0c;参考 https://blog.csdn.net/m0_48468018/article/details/132267096&#xff0c;入站端口开启80&#xff0c;22 进行远程远程连接 使用CLI命令部署NGINX,输入如下命令 sudo su apt-get update -y apt-get install nginx git -y最后的效果 4. 关闭…