flink-cdc同步数据到doris中

1 创建数据库和表

1.1 数据库脚本

-- 创建数据库eayc
create database if not exists ods_eayc;
-- 创建数据表

2 数据同步

2.1 flnk-cdc

参考Flink CDC实时同步MySQL到Doris
Flink CDC 概述

2.1.1 最简单的单表同步

从下面的yml脚本可以看到,并没有doris中创建eayc_user表,应该是flink-cdc自动创建的。

#Mysql的参数配置
source:
  type: mysql
  hostname: 10.101.10.11
  port: 3306
  username: flink
  password: 123456
  tables: eayc.eayc_user
  server-id: 5400
  # server-time-zone: UTC
#Doris的参数配置
sink:
  type: doris
  fenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030
  username: root
  password: 123456
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
pipeline:
  name: eayc to doris
  parallelism: 1

注意连接mysql的server-id的要唯一,否则提示下面的错误

A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.

进入到flink的界面查看到错误日志,任务执行失败。下面报的错是mysql时区与flink配置不匹配。现在改生产库影响未知,不敢动,于是去掉server-time-zone: UTC设置。重新执行任务。
1

1
此时任务可以正常执行了,数据也可以正常过来了。因为flink-cdc是根据binlog,因此mysql变更,doris中的数据也实时更新过来。
1

2.1.2 多表同步

如下配置

source:
	tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user
route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
  - source-table: eayc.eayc_company
    sink-table: ods_eayc.eayc_company
  - source-table: eayc.eayc_company_user
    sink-table: ods_eayc.eayc_company_user

下面这种方式不支持,会报下面的错误:

Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])

1

2.1.3 分表导入

taskmanager.numberOfTaskSlots默认为1,slot不够,就报下面的错误,因为是16C32G,于是我改成了8,parallelism.default默认也是1,我也改成了8,启动之后,没有报下面的错误,但是之前执行的任务没有了。

2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
	at 

如果mysql的表没有主键,则报下面的错误,这个时候就需要修正原mysql表数据。

Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.

doris权限问题,这个是FE集群有问题,更改过来就好了。

reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}

可以看到下面,要获取acc的全部表,但是有一些是做了分表,需合并到其中doris的一张表里面,这个规则是有效的,开始parallelism: 1,我以为有一异常,只同步了一张表,过了几分钟才发现其他表也陆续进来。

source:
	tables: acc.\.*
route:
  - source-table: acc.acc_account_balance_\.*
    sink-table: acc.acc_account_balance
  - source-table: acc.acc_account_subject_\.*
    sink-table: acc.acc_account_subject
  - source-table: acc.acc_initial_balance_\.*
    sink-table: acc.acc_initial_balance
  - source-table: acc.acc_voucher_\.*
    sink-table: acc.acc_voucher
  - source-table: acc.acc_voucher_entry_\.*
    sink-table: acc.acc_voucher_entry    

于是将parallelism: 4,很快后台又抛异常。

java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.

于是调整

taskmanager.memory.process.size: 8192m  # 增加 TaskManager 的内存

Flink CDC并行执行,会出现数据越界的问题。
Flink CDC报错ArrayIndexOutOfBoundsException解决思路

2.2 flink安装

2.2.1 单节点
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# 配置环境变量
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# 生效
source /etc/profile
# flink配置
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
#
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# 执行任务
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml

flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
将上面两个connector放到cdc的lib目录
1

2.2.2 监控

1

1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/972528.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CUDA兼容NVIDA版本关系

CUDA组成 兼容原则 CUDA 驱动(libcuda.so)兼容类型要求比CUDA新向后兼容无主版本一致,子版本旧兼容需要SASS、NVCC比CUDA老向前兼容提取对应兼容包 向后兼容:新版本支持旧版本的内容,关注的是新版本能否处理旧版本的内容。 向前兼容&#…

要配置西门子G120AX变频器实现**端子启停**和**Modbus RTU(485)频率给定

要配置西门子G120AX变频器实现端子启停和Modbus RTU(485)频率给定,需调整以下关键参数: 1. 端子启停控制 P29652[0]:设置启停信号源 (例:P29652 [0] 722.0 表示用DI0端子作为启动/停止信号&…

撕碎QT面具(3):解决垂直布局的内容显示不全

问题:内容显示不全 解决方案:增加Vertical Spacer,它会把Group Box控件挤上去,让内容显示完全。 结果展示:

LabVIEW 中的 ax - events.llb 库

ax - events.llb 库位于C:\Program Files (x86)\National Instruments\LabVIEW 2019\vi.lib\Platform目录,它是 LabVIEW 平台下与特定事件处理相关的重要库。该库为 LabVIEW 开发者提供了一系列工具,用于有效地处理和管理应用程序中的各种事件&#xff0…

Macos机器hosts文件便捷修改工具——SwitchHosts

文章目录 SwitchHosts软件下载地址操作添加方案切换方案管理方案快捷键 检测 SwitchHosts SwitchHosts 是一款 Mac 平台上的免费软件,它可以方便地管理和切换 hosts 文件,支持多种 hosts 文件格式。 软件下载地址 SwitchHosts 操作 添加方案 添加 …

【算法】双指针(下)

目录 查找总价格为目标值的两个商品 暴力解题 双指针解题 三数之和 双指针解题(左右指针) 四数之和 双指针解题 双指针关键点 注意事项 查找总价格为目标值的两个商品 题目链接:LCR 179. 查找总价格为目标值的两个商品 - 力扣(LeetCode&#x…

嵌入式linux利用标准字符驱动模型控制多个设备方法

一、驱动模型概述 Linux标准字符设备驱动模型基于以下核心组件: 设备号:由主设备号(Major)和次设备号(Minor)组成 cdev结构体:表征字符设备的核心数据结构 文件操作集合:file_operations结构体定义设备操作 sysfs接口:提供用户空间设备管理能力 传统单设备驱动与多设…

【可实战】Linux 常用统计命令:排序sort、去重uniq、统计wc

在 Linux 系统中,有一些常用的命令可以用来收集和统计数据。 一、常用统计命令的使用场景 日志分析和监控:通过使用 Linux 统计命令,可以实时监控和分析系统日志文件,了解系统的运行状况和性能指标。例如,使用 tail 命…

在 macOS 的 ARM 架构上按住 Command (⌘) + Shift + .(点)。这将暂时显示隐藏文件和文件夹。

在 macOS 的 ARM 架构(如 M1/M2 系列的 Mac)上,设置 Finder(访达)来显示隐藏文件夹的步骤如下: 使用快捷键临时显示隐藏文件: 在Finder中按住 Command (⌘) Shift .(点&#xff…

分享一个解梦 Chrome 扩展 —— 周公 AI 解梦

一、插件简介 周公 AI 解梦是一款基于 Chrome 扩展的智能解梦工具,由灵机 AI 提供技术支持。它能运用先进的 AI 技术解析梦境含义,为用户提供便捷、智能的解梦服务。无论你是对梦境充满好奇,还是想从梦境中获取一些启示,这款插件都…

Git命令行入门

诸神缄默不语-个人CSDN博文目录 之前写过一篇VSCode Git的博文:VSCode上的Git使用手记(持续更新ing…) 现在随着开发经历增加,感觉用到命令行之类复杂功能的机会越来越多了,所以我专门再写一篇Git命令行的文章。 G…

【时时三省】(C语言基础)用N-S流程图表示算法

山不在高,有仙则名。水不在深,有龙则灵。 ----CSDN 时时三省 N-S流程图 既然用基本结构的顺序组合可以表示任何复杂的算法结构,那么,基本结构之间的流程线就是多余的了。1973年,美国学者I.Nassi和B .Shneiderman提出…

单元测试junit5

一、idea 安装自动化生成插件jcode5 安装可能不成功&#xff0c;尝试多次安装&#xff1b; 安装成功后&#xff0c;重启idea&#xff0c;再次确认安装是否成功&#xff1b; 二、在需要生成单元测试代码的模块的pom中引入依赖 ......<parent><groupId>org.springf…

Matlab写入点云数据到Rosbag

最近有需要读取一个点云并做处理后&#xff0c;重新写回rosbag。网上有很多读取的教程&#xff0c;但没有写入。自己写入时也遇到了很多麻烦&#xff0c;踩了一堆坑进行记录。 1. rosbag中一个lidar的msg有哪些信息&#xff1f; 通过如下代码&#xff0c;先读取一个rosbag的l…

c++标准io与线程,互斥锁

封装一个 File 类&#xff0c; 用有私有成员 File* fp 实现以下功能 File f "文件名" 要求打开该文件 f.write(string str) 要求将str数据写入文件中 string str f.read(int size) 从文件中读取最多size个字节&#xff0c; 并将读取到的数据返回 析构函数 #…

springboot-ffmpeg-m3u8-convertor nplayer视频播放弹幕效果

学习链接 ffmpeg-cli-wrapper - 内部封装了操作ffmpeg命令的java类库&#xff0c;它提供了一些类和方法&#xff0c;可以方便地构建和执行 ffmpeg 命令&#xff0c;而不需要直接操作字符串或进程。并且支持异步执行和进度监听 springboot-ffmpeg-m3u8-convertor - gitee代码 …

在做题中学习(90):螺旋矩阵II

解法&#xff1a;模拟 思路&#xff1a;创建相同大小的一个二维数组&#xff08;矩阵&#xff09;&#xff0c;用变量标记原矩阵的行数和列数&#xff0c;每次遍历完一行或一列&#xff0c;相应行/列数--&#xff0c;进行对应位置的赋值即可。此题是正方形矩阵&#xff0c;因此…

FreeRTOS任务调度介绍

FreeRTOS 操作系统支持三种调度方式:抢占式调度,时间片调度和合作式调度。 实际应用主要是抢占式调度和时间片调度,合作式调度用到的很少 (1)抢占式调度 每个任务都有不同的优先级,任务会一直运行直到被高优先级任务抢占或者遇到阻塞式的 API 函数,比如vTaskDelay,执…

微信小程序image组件mode属性详解

今天学习微信小程序开发的image组件&#xff0c;mode属性的属性值不少&#xff0c;一开始有点整不明白。后来从网上下载了一张图片&#xff0c;把每个属性都试验了一番&#xff0c;总算明白了。现总结归纳如下&#xff1a; 1.使用scaleToFill。这是mode的默认值&#xff0c;sc…

关于Unity的一些基础知识点汇总

1.Prefab实例化后&#xff0c;哪些资源是共用的&#xff1f;哪些资源是拷贝的&#xff1f; 共用资源 脚本组件&#xff1a;实例化后的 Prefab 共享脚本组件的代码。若脚本中无状态数据&#xff0c;多个实例对脚本方法的调用会有相同逻辑。比如一个控制物体移动的脚本&#xff0…