FlinkPipelineComposer 详解

FlinkPipelineComposer 详解

原文

背景

在flink-cdc 3.0中引入了pipeline机制,提供了除Datastream api/flink sql以外的一种方式定义flink 任务

通过提供一个yaml文件,描述source sink transform等主要信息

由FlinkPipelineComposer解析,自动调用DataStream api进行构建

官方样例

 source:
     type: mysql
     hostname: localhost
     port: 3306
     username: root
     password: 123456
     tables: app_db.\.*

   sink:
     type: doris
     fenodes: 127.0.0.1:8030
     username: root
     password: ""

   pipeline:
     name: Sync MySQL Database to Doris
     parallelism: 2

目前可以通过source配置的源只有mysql 和 values

values是调试用的,所以可以说当前这个功能是专门为“mysql同步数据到各个sink”的场景使用的

目前可以使用的sink有

  1. doris
  2. elasticsearch
  3. kafka
  4. paimon
  5. starrocks
  6. values

FlinkPipelineComposer

我们以mysql -> values来观察 FlinkPipelineComposer 是如何通过读取yaml文件的定义来构建DataStream的

values会将mysql产生的cdc消息打印到stdout上

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:
 type: mysql
 hostname: x.x.x.x
 port: 3306
 username: username
 password: password
 tables: test.t1
 server-id: 5400-5404
 server-time-zone: UTC+8

sink:
  type: values
  name: values Sink

pipeline:
 name: Sync Mysql Database to Values
 parallelism: 2

首先来观察一下这个任务提交到flink集群后具体的链路构成

在这里插入图片描述

结合官方给出的架构

在这里插入图片描述

可以看出,“一个source,一个sink”的yaml定义,最终会生成5个operator

  1. Souce: Flink CDC Event Source: mysql
  2. SchemaOperator
  3. PrePartition

-------------- shuffle --------------

  1. PostPartion
  2. Sink Writer: values Sink

Souce: Flink CDC Event Source: mysql

负责

  1. 创建枚举器
  2. 创建reader
  3. 枚举split分发给reader
  4. reader读取数据生成事件

SchemaOperator

负责和JobMaster上的coodinator沟通,执行schema evolution 相关逻辑,见Flink CDC Schema Evolution 详解

PrePartition

负责

  1. 广播FlushEvent
  2. 广播SchemaChangeEvent
  3. shuffle普通消息到下游

PostPartion

Sink Writer: values Sink

写入下游,values sink当前到实现是打印到stdout

源码解析

接下来分析,FlinkPipelineComposer 读取 yaml 构造DataStream的细节

CliFrontend#main

CliFrontend.java:54

args

在这里插入图片描述

createExecutor 创建 executor CliFrontend.java:76

调用CliExecutor#run CliExecutor.java:70

看一下解析得到的pipelineDef
在这里插入图片描述

这里已经从yaml文件中解析出了source和sink的配置了

composer.compose 调用compose方法开始使用DataStream api进行构建

FlinkPipelineComposer.java:92 FlinkPipelineComposer#compose

声明了5个translator,其中第一个sourceTranslator会生成DataStream<Event> stream,而其他的translator基于这个stream作为input,调用transform方法,放入对应阶段的operator

DataSourceTranslator sourceTranslator = new DataSourceTranslator();
...
TransformTranslator transformTranslator = new TransformTranslator();
...
SchemaOperatorTranslator schemaOperatorTranslator =...
...
DataSinkTranslator sinkTranslator = new DataSinkTranslator();
...
PartitioningTranslator partitioningTranslator = new PartitioningTranslator();
...

translate的调用顺序如下

DataStream<Event> stream =
                sourceTranslator.translate(
                  ...
stream =
                transformTranslator.translatePreTransform(
                  ...
stream =
                transformTranslator.translatePostTransform(
                  ...
stream =
                schemaOperatorTranslator.translate(
                  ...
stream =
                partitioningTranslator.translate(
                  ...
sinkTranslator.translate(
                pipelineDef.getSink(), stream, dataSink, schemaOperatorIDGenerator.generate());

return new FlinkPipelineExecution(env...)
                  ...

逐一说明

  1. sourceTranslator.translate 通过source名字获取sourceProvider,关联到stream中
  • sourceProvider.getSource ->
    • MysqlSource ->
      • createReader
      • createEnumerator
  1. stream = transformTranslator.translatePreTransform
if (transforms.isEmpty()) {
    return input;
}

由于有如上代码,我们的yaml中没有涉及,所以忽略这个transform

  1. stream = transformTranslator.translatePostTransform
  • 同上
  1. stream = schemaOperatorTranslator.translate
  • 插入一个schemaOperator节点,在收到schemaChangeEvent的时候
    1. 停住当前流
    2. 上报coodinator
    3. flush下游数据,让sink消耗完已有数据
    4. sink 通知coodinator flush完成
    5. coodinator调用sink注册的MetaApplier完成schema变更,变更完成后通知schemaOperator
    6. schemaOperator重新放通数据
  1. stream = partitioningTranslator.translate
  • 构建prePartition postPartition节点
  1. sinkTranslator.translate
  • 构建sink节点
  1. FlinkPipelineExecution 中的 execute 方法调用 env.executeAsync(jobName)

总结

flink-cdc 3.0 提供的pipeline模式,通过定义yaml,自动构建了一条cdc pipeline,避免手动调用datastream api,并且支持schema evolution

构建的主要逻辑集中在 FlinkPipelineComposer

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/916412.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Python中如何获取HTTP请求的Response Body

目录 一、引言 二、使用urllib库获取Response Body 1. 基本用法 2. 发送POST请求 三、使用requests库获取Response Body 1. 安装requests库 2. 基本用法 3. 发送POST请求 4. 处理JSON响应 四、高级用法 1. 处理请求头 2. 设置超时 3. 处理Cookies 五、案例&#…

从华为到创业公司

我有一个朋友&#xff0c;在华为工作了很长一段时间&#xff0c;一年多前&#xff0c;他从华为出来到了一家创业公司。 周末趁着有时间&#xff0c;我跟他聊了下关于从华为到创业公司的一些问题&#xff0c;总结给大伙看看。 ▎1 在华为工作和在创业公司工作最大的差别是什么呢…

如何解决由于找不到d3dx9_43.dll导致游戏启动失败?这里是如何解决的完整指南

遇到“由于找不到d3dx9_43.dll”错误时&#xff0c;很多用户可能会感到困惑和无助。这个问题通常发生在尝试启动游戏或使用基于DirectX的应用程序时。d3dx9_43.dll是Microsoft DirectX软件的一部分&#xff0c;专门用于处理复杂的图形计算&#xff0c;缺少它意味着某些图形功能…

Matlab2022b安装MinGW64

1 问题引入 能找到这个问题的&#xff0c;一定就是在matlab中用mex这个编译命令的时候出现下面的错误&#xff0c;才会来找解决的办法。 首先在网上众多资料中一定是让你先去matlab窗口的这个Add-Ons进行添加&#xff0c;但是很多情况下因为大家装的版本问题&#xff0c;都会…

低代码可视化-uniapp开关选择组件-低码生成器

开关&#xff08;Switch&#xff09;选择组件是一种用户界面元素&#xff0c;允许用户在两种状态&#xff08;通常是开/关、是/否、启用/禁用等&#xff09;之间进行切换。这种组件在移动应用、桌面软件、网页以及物联网设备中广泛应用。以下是对开关Switch选择组件的详细介绍&…

蓝桥杯每日真题 - 第11天

题目&#xff1a;&#xff08;合并数列&#xff09; 题目描述&#xff08;14届 C&C B组D题&#xff09; 解题思路&#xff1a; 题意理解&#xff1a;给定两个数组&#xff0c;目标是通过若干次合并操作使两个数组相同。每次合并操作可以将数组中相邻的两个数相加&#xff…

【2024软考架构案例题】你知道什么是 RESTful 风格吗?

&#x1f449;博主介绍&#xff1a; 博主从事应用安全和大数据领域&#xff0c;有8年研发经验&#xff0c;5年面试官经验&#xff0c;Java技术专家&#xff0c;WEB架构师&#xff0c;阿里云专家博主&#xff0c;华为云云享专家&#xff0c;51CTO 专家博主 ⛪️ 个人社区&#x…

面试经典 150 题:20、2、228

20. 有效的括号 参考代码 #include <stack>class Solution { public:bool isValid(string s) {if(s.size() < 2){ //特判&#xff1a;空字符串和一个字符的情况return false;}bool flag true;stack<char> st; //栈for(int i0; i<s.size(); i){if(s[i] ( |…

NFS-Ganesha 核心架构解读

NFSv4 简要概述 NFS 这个协议( NFSv2 )最初由 Sun Microsystems 在 1984 年设计提出&#xff0c;由于存在一些不足&#xff0c;因此在随后由几家公司联合推出了 NFSv3。到了 NFSv4 时&#xff0c;开发完全由 IETF 主导&#xff0c;设计目标是&#xff1a; 提高互联下的 NFS 访…

由播客转向个人定制的音频频道(1)平台搭建

项目的背景 最近开始听喜马拉雅播客的内容&#xff0c;但是发现许多不方便的地方。 休息的时候收听喜马拉雅&#xff0c;但是还需要不断地选择喜马拉雅的内容&#xff0c;比较麻烦&#xff0c;而且黑灯操作反而伤眼睛。 喜马拉雅为代表的播客平台都是VOD 形式的&#xff0…

mysql数据库(五)多表查询

多表查询 文章目录 多表查询一、链表查询1.1交叉连接1.2 内连接1.3 左连接1.4 右连接1.5 全连接1.6 例子 二、子查询2.1 in与not in2.2 any/some2.3 all2.4 比较运算符2.5 exists 三、例子 查询中使用的表如下所示 ------------ | id | name | ------------ | 1 | IT | …

Redis设计与实现 学习笔记 第十七章 集群

Redis集群是Redis提供的分布式数据库方案&#xff0c;集群通过分片&#xff08;sharding&#xff0c;水平切分&#xff09;来进行数据共享&#xff0c;并提供复制和故障转移功能。 17.1 节点 一个Redis集群通常由多个节点&#xff08;node&#xff09;组成&#xff0c;在刚开…

分布式----Ceph部署

目录 一、存储基础 1.1 单机存储设备 1.2 单机存储的问题 1.3 商业存储解决方案 1.4 分布式存储&#xff08;软件定义的存储 SDS&#xff09; 1.5 分布式存储的类型 二、Ceph 简介 三、Ceph 优势 四、Ceph 架构 五、Ceph 核心组件 #Pool中数据保存方式支持两种类型&…

【Qt聊天室客户端】消息功能--发布程序

1. 获取文件内容 主要目标是实现获取内容二进制数据的接口&#xff0c;主要是为后面的消息功能提供服务 具体实现 客户端发送请求 服务端处理请求&#xff0c;同时支持三种数据类型 客户端处理服务端的响应 2. 发送图片消息 客户端与服务端的通信约定 客户端从服务器中获取图片…

ab (Apache Bench)的使用

Apache Bench&#xff08;ab&#xff09;是一个用于基准测试HTTP Web服务器的命令行工具&#xff0c;广泛用于评估和优化Web服务器的性能。以下是关于Apache Bench的详细介绍&#xff0c;包括其功能、使用方法、常用参数和输出结果解析。 功能 性能测试&#xff1a;通过模拟多…

【HarmonyNext】显示提示文字的方法

【HarmonyNext】显示提示文字的方法 本文介绍在 HarmonyNext 中显示提示文字的两种常见方法&#xff1a;使用自定义弹窗 CustomDialog 和使用 promptAction 的 showToast 方法。 一、使用自定义弹窗 CustomDialog 在 HarmonyNext 中&#xff0c;自定义弹窗是实现复杂提示信…

第三十一天|贪心算法| 56. 合并区间,738.单调递增的数字 , 968.监控二叉树

目录 56. 合并区间 方法1&#xff1a;fff 看方法2&#xff1a;fff优化版 方法3&#xff1a; 738.单调递增的数字 968.监控二叉树&#xff08;贪心二叉树&#xff09; 56. 合并区间 判断重叠区间问题&#xff0c;与452和435是一个套路 方法1&#xff1a;fff 看方法2&am…

【自用】0-1背包问题与完全背包问题的Java实现

引言 背包问题是计算机科学领域的一个经典优化问题&#xff0c;分为多种类型&#xff0c;其中最常见的是0-1背包问题和完全背包问题。这两种问题的核心在于如何在有限的空间内最大化收益&#xff0c;但它们之间存在一些关键的区别&#xff1a;0-1背包问题允许每个物品只能选择…

【Unity】ScriptableObject的应用:利用配方合成新物体

前一篇已经使用ScriptableObject(SO)类配置可放置物体&#xff0c;本篇探索更多的SO类应用场景。 需求分析 将若干指定物体放在工作台上&#xff0c;可以生成新的物体。 成果展示 Scene部分 准备工作台&#xff0c;放在工作台上的物体全部放在指定PlacedObjects空物体下。 …

STM32设计学生宿舍监测控制系统

目录 前言 一、本设计主要实现哪些很“开门”功能&#xff1f; 二、电路设计原理图 电路图采用Altium Designer进行设计&#xff1a; 三、实物设计图 四、程序源代码设计 五、获取资料内容 前言 随着科技的飞速发展和智能化时代的到来&#xff0c;学生宿舍的安全、舒适…