Fink CDC数据同步(一)环境部署

1 背景介绍

Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

Flink CDC 是 Apache Flink 的一组源连接器,基于数据库日志的 Change Data Caputre 技术,实现了全量和增量的一体化读取能力,并借助 Flink 优秀的管道能力和丰富的上下游生态,支持捕获多种数据库的变更,并将这些变更实时同步到下游存储。目前,Flink CDC 的上游已经支持了MySQL,MariaDB, RDS MySQL,Aurora MySQL,PolarDB MySQL,PostgreSQL,Oracle,MongoDB,SqlServer,OceanBase,PolarDB-X,TiDB 等丰富的数据源。Flink CDC 的下游则更加丰富,支持写入 Kafka、Pulsar 消息队列,也支持写入 Hudi、Iceberg 等数据湖,还支持写入各种数据仓库。同时,通过 Flink SQL 原生支持的 Changelog 机制,可以让 CDC 数据的加工变得非常简单。用户可以通过 SQL 便能实现数据库全量和增量数据的清洗、打宽、聚合等操作,极大地降低了用户门槛。 此外, Flink DataStream API 支持用户编写代码实现自定义逻辑,给用户提供了深度定制业务的自由度。

本文以Flink+FlinkCDC同步MySQL数据、数据入仓,数据入湖等测试为例,为日后云桥数据集成产品做准备。

框架软件版本如下:

软件

版本

Java

1.8.0_361

Mysql

8.0.32

Flink

1.16.2

Flink CDC

2.3.0

Hadoop

3.1.5.0

Hive

3.1.0.3.1.5.0-152

kafka

2.0.0.3.1.5.0-152

Hudi

0.13.0

环境部署

2 环境部署

2.1 Flink部署

本次部署以Flink单机版为例

2.1.1 下载Flink部署包并解压

# 下载Flink安装包(这里测试使用Flink16.2版本)
wget https://archive.apache.org/dist/flink/flink-1.16.2/flink-1.16.2-bin-scala_2.12.tgz

# 解压
tar -xzvf flink-1.16.2-bin-scala_2.12.tgz

2.1.2 修改配置文件

修改flink-conf.yaml

在flink目录的conf下

jobmanager.rpc.address: localhost

# The RPC port where the JobManager is reachable.

jobmanager.rpc.port: 6123

jobmanager.bind-host: localhost

jobmanager.memory.process.size: 6800m


taskmanager.bind-host: 192.168.1.1

taskmanager.host: 192.168.1.1

# The total process memory size for the TaskManager.
#
# Note this accounts for all memory usage within the TaskManager process, including JVM metaspace and other overhead.

taskmanager.memory.process.size: 6800m

# To exclude JVM metaspace and overhead, please, use total Flink memory size instead of 'taskmanager.memory.process.size'.
# It is not recommended to set both 'taskmanager.memory.process.size' and Flink memory.
#
taskmanager.memory.flink.size: 6280m

# The number of task slots that each TaskManager offers. Each slot runs one parallel pipeline.

taskmanager.numberOfTaskSlots: 4

# The parallelism used for programs that did not specify and other parallelism.

parallelism.default: 1

jobmanager.execution.failover-strategy: region

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
#
rest.port: 8787

# The address to which the REST client will connect to
#
rest.address: 192.168.1.1

rest.bind-address: 192.168.1.1

#设置checkpoint周期时间
execution.checkpointing.interval: 30000
#设置有且仅有一次模式 目前支持 EXACTLY_ONCE、AT_LEAST_ONCE  
execution.checkpointing.mode: EXACTLY_ONCE
#设置checkpoint的存储方式
state.backend: filesystem
#设置checkpoint的存储位置
state.checkpoints.dir: file:///opt/data/flink/checkpoint
#设置savepoint的存储位置
state.savepoints.dir: file:///opt/data/flink/checkpoint
#设置checkpoint的超时时间 即一次checkpoint必须在该时间内完成 不然就丢弃
execution.checkpointing.timeout: 600000
#设置两次checkpoint之间的最小时间间隔
execution.checkpointing.min-pause: 500
#设置并发checkpoint的数目
execution.checkpointing.max-concurrent-checkpoints: 1
#开启checkpoints的外部持久化这里设置了清除job时保留checkpoint,默认值时保留一个 假如要保留3个
state.checkpoints.num-retained: 3
#默认情况下,checkpoint不是持久化的,只用于从故障中恢复作业。当程序被取消时,它们会被删除。但是你可以配置checkpoint被周期性持久化到外部,类似于savepoints。这些外部的checkpoints将它们的元数据输出到外#部持久化存储并且当作业失败时不会自动清除。这样,如果你的工作失败了,你就会有一个checkpoint来恢复。
#ExternalizedCheckpointCleanup模式配置当你取消作业时外部checkpoint会产生什么行为:
#RETAIN_ON_CANCELLATION: 当作业被取消时,保留外部的checkpoint。注意,在此情况下,您必须手动清理checkpoint状态。
#DELETE_ON_CANCELLATION: 当作业被取消时,删除外部化的checkpoint。只有当作业失败时,检查点状态才可用。
execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# 该配置用于客户端 client 连接 Flink, 将此设置为 JobManager 运行的主机名(该配置决定WEB的地址)
rest.address: 192.168.1.1
# 客户端提供对外访问的地址和端口是rest.port和rest.address
# 如果没有配置rest.bind-port, 那么其他服务也使用rest.port端口,所以只要使用其中一个启动模式,其他模式在启动时就会报错端口无法启动
# 因此配置该项后, 其他 Job 启动后,就会在 rest.bind-address 和 rest.bind-port 随机选择并占用.
rest.bind-address: 192.168.1.1
classloader.check-leaked-classloader: false

2.1.3 启动服务

进入bin目录

# 启动Flink集群
./start-cluster.sh

# 停止Flink集群
#./stop-cluster.sh

会启动

StandaloneSessionClusterEntrypoint

TaskManagerRunner

  • 如果StandaloneSessionClusterEntrypoint 没有启动,则检查flink-conf.yaml有地址和端口有没有填写好,
  • TaskManagerRunner没有启动则检查

        flink/comf/masters

                192.168.1.1:8787

        taskmanager.sh

2.1.4 访问Flink UI

http://x.x.x.x:8787/#/overview

2.2 FlinkCDC

Flink CDC是Flink的一组连接器,需要连接哪个组件,则需要将对应的连接jar包放在flink安装目录下的lib即可,

以下几种情况需要进行源码编译:

  • 用户对 Flink CDC 源码进行了修改
  • Flink CDC 某依赖项的版本与运行环境不一致
  • 官方未提供最新版本 Flink CDC 二进制安装包

FlinkCDC源码地址:

GitHub - ververica/flink-cdc-connectors: CDC Connectors for Apache Flink®

如果不需要编译,选择对应的连接器和版本,可以直接下载打包好的jar

Central Repository: com/ververica

将jar包放到flink安装目录下的lib即可。

FlinkCDC 与Flink 对应关系:

Flink® CDC Version

Flink® Version

1.0.0

1.11.*

1.1.0

1.11.*

1.2.0

1.12.*

1.3.0

1.12.*

1.4.0

1.13.*

2.0.*

1.13.*

2.1.*

1.13.*

2.2.*

1.13.*, 1.14.*

2.3.*

1.13.*, 1.14.*, 1.15.*, 1.16.0

2.4.*

1.13.*, 1.14.*, 1.15.*, 1.16.*, 1.17.0


系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/369711.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

MySQL进阶45讲【13】为什么表数据删掉一半,表文件大小不变?

1 前言 有些小伙伴在删数据库数据时,会产生一个疑问,我的数据库占用空间大,我把一个最大的表删掉了一半的数据,怎么表文件的大小还是没变? 那么这篇文章,就介绍一下数据库表的空间回收,看看如…

智安网络2023年度回顾:我与您共存、信任与安全的一年

在2023年这一全球格局加速演变、经济复苏的关键时期,网络安全威胁呈现出前所未有的复杂性。作为中国网络安全行业的新兴企业,智安网络凭借其卓越的安全策略、技术创新和客户服务,书写了企业发展的辉煌篇章。 智安网络在应对网络安全挑战方面…

17- OpenCV:图像矩(Image Moments)和点多边形测试

目录 一、图像矩 1、矩的概念介绍 2、相关的API 3、代码演示 二、点多边形测试 1、概念介绍-点多边形测试 2、cv::pointPolygonTest 3、代码演示 一、图像矩 引言 在数字图像处理、计算机视觉与相关领域中,图像矩(Image moments)是指图像的某些特定像素灰…

嵌入式中物联网核心技术有哪些

IoT军事技术 物联网军事技术是一项利用IoT感知技术在军事活动中获取人、装备、作战环境状态的信息特征,从而实现在军事活动中做出智能化决策和控制局势的军事方针。 据悉,早于2012年10月军方联合了社会研究机构合力创建了“军事物联网联合实验室”。 …

论文阅读-在分布式数据库环境中对哈希算法进行负载均衡基准测试

论文名称:Benchmarking Hashing Algorithms for Load Balancing in a Distributed Database Environment 摘要 现代高负载应用使用多个数据库实例存储数据。这样的架构需要数据一致性,并且确保数据在节点之间均匀分布很重要。负载均衡被用来实现这些目…

环形链表(快慢指针)

给你单链表的头节点 head ,请你反转链表,并返回反转后的链表 给你一个链表的头节点 head ,判断链表中是否有环。 如果链表中有某个节点,可以通过连续跟踪 next 指针再次到达,则链表中存在环。 为了表示给定链表中的环…

LINE官方账号全攻略:设置流程与基本功能

LINE官方账号是专为企业和品牌而设计,提供了更多的商业功能和定制选项。在中国台湾、日本和东南亚这些地区,LINE相比其他社交媒体软件具有更大的用户群体和更广泛的影响力,尤其在台湾和泰国地区,有90%的人都在使用LINE。而且LINE官…

1Panel应用推荐:青龙定时任务管理平台

1Panel(github.com/1Panel-dev/1Panel)是一款现代化、开源的Linux服务器运维管理面板,它致力于通过开源的方式,帮助用户简化建站与运维管理流程。为了方便广大用户快捷安装部署相关软件应用,1Panel特别开通应用商店&am…

769933-15-5,Biotin aniline,可以合成多种有机化合物和聚合物

您好,欢迎来到新研之家 文章关键词:769933-15-5,Biotin aniline,生物素苯胺,生物素-苯胺 一、基本信息 产品简介:Biotin Aniline,一种具有重要生物学功能的化合物,不仅参与了维生…

PHP入门指南:进阶篇

PHP入门指南:进阶篇 PHP入门指南:进阶篇1. 面向对象编程(OOP)1.1 类和对象的基本概念1.2 构造函数和析构函数1.3 属性和方法的访问控制1.4 继承与多态 2. 错误和异常处理2.1 错误处理机制2.2 异常处理机制2.3 自定义异常类 3. PHP…

免费ai绘画软件选择哪个?

对于免费AI绘画软件的选择,因为每个软件都有其独特的优点和适用场景,可以根据个人的需求和技能水平来决定。以下是被广泛认可的AI绘画软件: 1、建e网AI-一款为建筑室内设计师提供AI绘图的智能工具,具有文字生图,方案优…

Pytroch 自写训练模板适合入门版 包含十五种经典的自己复现的一维模型 1D CNN

训练模板 在毕业之前,决定整理一下手头的代码,自己做1D-CNN这吗久,打算开源一下自己使用的1D-CNN的代码,包括用随机数生成一个模拟的数据集,到自己写的一个比较好的适合入门的基础训练模板,以及自己复现的…

松软香甜的贝果面包,碱水清香可口饱腹

最近尝试了一款碱趣贝果面包,是手工制作的美味面包,它非常蓬松,口感熟软,而且食用方便快捷,只需加热一下就可以食用。 这款面包的表皮清香有韧性,里面松软可口,加热后散发出清新的香气&#xff…

负载均衡下webshell连接

目录 一、什么是负载均衡 分类 负载均衡算法 分类介绍 分类 均衡技术 主要应用 安装docker-compose 2.1上传的文件丢失 2.2 命令执行时的漂移 2.3 大工具投放失败 2.4 内网穿透工具失效 3.一些解决方案 总结 一、什么是负载均衡 负载均衡(Load Balanc…

51单片机编程应用(C语言):矩阵键盘

16个按键只要8个I/O口,本来16个按键要16个I/O口。 矩阵键盘可以按行扫描也可以按列扫描,扫描原理很简单,变成之前的独立按键,比如 按行扫描,看原理图如下,我们P170,另外三个置1,那么第一行就选…

手写RPC框架

RPC框架核心组件 对于RPC框架简洁模式下,主要有以下角色,暂且抛开心跳机制以及负载均衡等复杂策略,我们先来自己实现一个RPC框架,后面我们再深入理解。 注册中心 RegisterServiceVo package com.cover.rpc.remote.vo;import …

J6 - ResNeXt50模型的实现

🍨 本文为🔗365天深度学习训练营 中的学习记录博客🍖 原作者:K同学啊 | 接辅导、项目定制 目录 环境代码在之前的章节都有,我后续只贴出模型设计构建过程如下打印模型结构打印参数量 训练过程与结果总结 环境 系统: L…

聊聊并发编程,另送5本Golang并发编程新书

大家好,我是飞哥! 并发编程并不是一个新话题,但是我觉得在近几年以及未来的时间里,并发编程将显得越来越重要。 为什么这样讲,让我们先回到一个基本的问题上来,为什么我们要采用并发编程?关于这…

数据分析基础之《pandas(2)—基本数据操作》

一、读取一个真实的股票数据 1、读取数据 # 基本数据操作 data pd.read_csv("./stock_day.csv")data# 删除一些列,使数据简洁点 data data.drop([ma5,ma10,ma20,v_ma5,v_ma10,v_ma20], axis1)data 二、索引操作 1、numpy当中我们已经讲过使用索引选取…

Unity 读取指定目录所占内存大小

public static class TxxTool{#region 读取文件大小private static List<string> DirList new List<string>();public static long GetFileSize(string path){DirList new List<string>();DirList.Add(path);GetAllDirecotries(path);long fileSize 0;for…