1 Creating the Database and Tables
1.1 Database script
-- Create the database ods_eayc
create database if not exists ods_eayc;
-- Create the tables
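The tables themselves can be left to flink-cdc's auto-creation (see 2.1.1 below). For reference, a manually created Doris Unique Key table might look like the sketch below; only the database name ods_eayc comes from the script above, the column names are hypothetical, and the two properties mirror the sink settings used later.

```sql
-- Sketch only: a Doris Unique Key model table; columns are hypothetical.
CREATE TABLE IF NOT EXISTS ods_eayc.eayc_user (
    id          BIGINT      NOT NULL COMMENT 'primary key from MySQL',
    user_name   VARCHAR(64)          COMMENT 'hypothetical column',
    update_time DATETIME             COMMENT 'hypothetical column'
)
UNIQUE KEY(id)
DISTRIBUTED BY HASH(id) BUCKETS 10
PROPERTIES (
    "replication_num" = "1",
    "light_schema_change" = "true"
);
```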
2 Data Synchronization
2.1 flink-cdc
Reference: Flink CDC real-time synchronization from MySQL to Doris
Flink CDC Overview
2.1.1 Simplest case: single-table sync
As the yml script below shows, the eayc_user table was never created in Doris by hand; flink-cdc evidently creates it automatically.
# MySQL source configuration
source:
  type: mysql
  hostname: 10.101.10.11
  port: 3306
  username: flink
  password: 123456
  tables: eayc.eayc_user
  server-id: 5400
  # server-time-zone: UTC

# Doris sink configuration
sink:
  type: doris
  fenodes: 10.101.11.2:8030,10.101.11.2:8030,10.101.11.3:8030
  username: root
  password: 123456
  table.create.properties.light_schema_change: true
  table.create.properties.replication_num: 1

route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user

pipeline:
  name: eayc to doris
  parallelism: 1
Note that the server-id used to connect to MySQL must be unique, otherwise errors like the following appear:
A slave with the same server_uuid/server_id as this slave has connected to the master...
The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.
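One way to avoid this conflict is to give every pipeline its own id, or a range when the source runs with parallelism greater than 1, so each parallel reader gets a distinct id. A sketch (the range values here are an example, not from the original config):

```yaml
source:
  type: mysql
  # A range such as 5400-5404 covers a source parallelism of up to 5;
  # ranges for different jobs must not overlap.
  server-id: 5400-5404
```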
The Flink web UI showed the job had failed; the error log reported that the MySQL time zone did not match the Flink configuration. Changing the production database had unknown consequences, so I did not dare touch it and instead removed the server-time-zone: UTC setting and re-ran the job.
The job then ran normally and data flowed through. Because flink-cdc reads the binlog, any change in MySQL is reflected in Doris in near real time.
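A quick sanity check of the binlog-driven sync might look like the sketch below; the column name is hypothetical, only the table names come from the config above.

```sql
-- In MySQL: change a row (column name hypothetical)
UPDATE eayc.eayc_user SET user_name = 'new_name' WHERE id = 1;

-- Moments later in Doris, the change should already be visible:
SELECT user_name FROM ods_eayc.eayc_user WHERE id = 1;
```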
2.1.2 Multi-table sync
Configuration as follows:
source:
  tables: eayc.eayc_user,eayc.eayc_company,eayc.eayc_company_user

route:
  - source-table: eayc.eayc_user
    sink-table: ods_eayc.eayc_user
  - source-table: eayc.eayc_company
    sink-table: ods_eayc.eayc_company
  - source-table: eayc.eayc_company_user
    sink-table: ods_eayc.eayc_company_user
Writing tables as a YAML array is not supported and fails with the following error:
Caused by: org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize value of type `java.lang.String` from Array value (token `JsonToken.START_ARRAY`)
at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: java.util.LinkedHashMap["tables"])
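In other words, judging from the error (a String was expected for the "tables" key but an Array was given), the value must be a single comma-separated string. A sketch of both forms:

```yaml
# Works: one comma-separated string
source:
  tables: eayc.eayc_user,eayc.eayc_company

# Fails with the MismatchedInputException: a YAML array
source:
  tables:
    - eayc.eayc_user
    - eayc.eayc_company
```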
2.1.3 Sharded-table import
taskmanager.numberOfTaskSlots defaults to 1; when slots run out, the error below appears. Since the machine is 16C32G, I raised it to 8. parallelism.default also defaults to 1, and I raised it to 8 as well. After restarting, the error below no longer appeared, but the previously running jobs had disappeared.
2025-02-19 15:05:07
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
at
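The resource changes described above amount to the following flink-conf.yaml fragment (the values are the ones chosen for the 16C32G machine mentioned):

```yaml
# conf/flink-conf.yaml
taskmanager.numberOfTaskSlots: 8   # default 1; raise so jobs can acquire slots
parallelism.default: 8             # default 1
```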
If a MySQL table has no primary key, the following error is raised; in that case the source MySQL table has to be fixed first.
Caused by: org.apache.flink.table.api.ValidationException: 'scan.incremental.snapshot.chunk.key-column' must be set when the table doesn't have primary keys.
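The error message itself points at two fixes: set 'scan.incremental.snapshot.chunk.key-column' (if the connector version in use supports passing that option through), or add a primary key on the MySQL side. The latter, sketched with a hypothetical table and column:

```sql
-- Table and column names are hypothetical
ALTER TABLE eayc.some_table ADD PRIMARY KEY (id);
```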
There was also a Doris permission problem; it turned out to be an issue with the FE cluster, and fixing the FE resolved it:
reason: SchemaChange request error with Failed to schemaChange, response: {"msg":"Unauthorized","code":401,"data":"Access denied for user 'root@10.101.12.90' (using password: YES)","count":0}
In the configuration below, the goal is to pull every table from acc, but some of them are sharded and must be merged into a single Doris table each; the route rules handle this correctly. I started with parallelism: 1 and at first thought something had gone wrong because only one table was syncing, but after a few minutes the other tables started arriving as well.
source:
  tables: acc.\.*

route:
  - source-table: acc.acc_account_balance_\.*
    sink-table: acc.acc_account_balance
  - source-table: acc.acc_account_subject_\.*
    sink-table: acc.acc_account_subject
  - source-table: acc.acc_initial_balance_\.*
    sink-table: acc.acc_initial_balance
  - source-table: acc.acc_voucher_\.*
    sink-table: acc.acc_voucher
  - source-table: acc.acc_voucher_entry_\.*
    sink-table: acc.acc_voucher_entry
I then set parallelism: 4, and soon the backend threw an exception again:
java.util.concurrent.CompletionException: java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Could not acquire the minimum required resources.
So I adjusted:
taskmanager.memory.process.size: 8192m # increase TaskManager memory
Running Flink CDC in parallel can also hit an array index out-of-bounds problem; see: Troubleshooting the Flink CDC ArrayIndexOutOfBoundsException error.
2.2 Flink installation
2.2.1 Single node
tar -zxvf flink-1.18.0-bin-scala_2.12.tgz
# Configure environment variables
vi /etc/profile
export JAVA_HOME=/appdata/jdk1.8.0_181
export CLASSPATH=$JAVA_HOME/lib
export FLINK_HOME=/appdata/flink/flink-1.18.0
export PATH=$JAVA_HOME/bin:$FLINK_HOME/bin:$PATH
# Apply the changes
source /etc/profile
# Flink configuration
vim conf/flink-conf.yaml
execution.checkpointing.interval: 3000
rest.bind-address: 0.0.0.0
cd bin
./start-cluster.sh
# Extract flink-cdc
tar -zxvf flink-cdc-3.0.0-bin.tar.gz
# Run the job
cd /appdata/flink/flink-cdc-3.0.0
bash bin/flink-cdc.sh /appdata/flink/job/eayc_to_doris.yml
Versions used:
flink-1.18.0
flink-cdc-3.0.0
mysql pipeline connector 3.0.0
doris pipeline connector 3.0.0
Put the two connector jars above into flink-cdc's lib directory.