Flink回撤流

1.回撤流定义(RetractStream)

Flink 的回撤流是指在 Flink 的流处理算法中,撤回已经发送到下游节点的数据。这是因为在实际应用场景中,有些错误数据可能会发送到下游节点,因此需要回撤流以保证数据的准确性。

回撤流可以理解为流式场景下对数据进行更新,这里的更新数据并不是将发往下游的历史数据进行更改,要知道,已经发往下游的消息是追不回来的。更新历史数据的含义是,在得知某个Key(接在Key BY / Group By后的字段)对应数据已经存在的情况下,如果该Key对应的数据再次到来,会生成一条delete消息和一条新的insert消息发往下游。

在 Flink 中,回撤流的功能可以通过 Flink 提供的事务性 API 来实现。该 API 可以对数据流进行事务支持,以确保数据的准确性。在发生错误时,可以回撤事务中的数据,以保证数据的准确性。
总的来说,Flink 的回撤流是一个非常有用的功能,可以用于保证数据准确性和可靠性,同时也可以提高 Flink 的稳定性和可靠性。

2.回撤流示例

流场景下的一个词频统计例子

 没有retract会导致最终结果不正确

3.聚合算子回撤

聚合算子中包含两种状态,state 存储中间结果状态(如count(id)值)、cntState存储key对应的消息数量(聚合消息+1,回撤消息-1)。state用于不断更新中间聚合状态,cntState用于判断向下游发送当前新的聚合消息,还是上一次聚合消息对应的回撤消息。

4. Sink算子回撤

官方对于sink的插入模式有以下三种描述:

  • Append 模式:该模式用户在定义Sink的DDL时候不定义PK,在Apache Flink内部生成的所有只有INSERT语句;
  • Upsert 模式:该模式用户在定义Sink的DDL时候可以定义PK,在Apache Flink内部会根据事件打标(retract机制)生成INSERT/UPDATE和DELETE 语句,其中如果定义了PK, UPDATE语句按PK进行更新,如果没有定义PK UPDATE会按整行更新;
  • Retract 模式:该模式下会产生INSERT和DELETE两种信息,Sink Connector 根据这两种信息构造对应的数据操作指令;

Sink算子是否支持回撤流,要根据sink数据源的特性而定。例如kafka sink只支持append模式,jdbc sink在Flink1.11中只支持upsert(不配置primary key会报错)。这都跟sink数据源的特性密切相关。

以Kafka Sink为例,Kafka是利用log中顺序追加消息的方式存储消息,因此只支持append模式,网上有修改kafka sink connector以支持upsert的方法:将聚合算子中的回撤消息(false)过滤掉,只留下聚合消息(true),并写入kafka,带来的现象就是一个聚合结果会多次出现在kafka中,算是一种阉割版的upsert模式。

结论:聚合算子和Sink算子关于回撤的概念相似,但原理不同且使用场景也不同,聚合算子的回撤用于聚合状态的更新,Sink算子的回撤则更多的是应用于CDC场景。
聚合算子的撤回机制,保证了FlinkSQL持续查询/增量查询的正确语义;而Sink算子的回撤机制,保证了CDC场景下的正确语义。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/53013.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【Docker】Docker应用部署之Docker容器安装Tomcat

目录 一、搜索镜像 二、拉取镜像 三、创建容器 四、测试使用 一、搜索镜像 docker search tomcat 二、拉取镜像 docker pull tomcat:版本 三、创建容器 首先在宿主机创建数据卷的目录 mkdir /root/tomcat # 创建目录 cd /root/tomcat # 进入目录 docker run -id -…

【腾讯云 Cloud Studio 实战训练营】永不宕机的IDE,Coding Everywhere

【腾讯云 Cloud Studio 实战训练营】永不宕机的IDE,随时随地写代码! 写在最前视频讲解:Cloud Studio活动简介何为腾讯云 Cloud Studio?Cloud Studio简介免费试用,上手无忧Cloud Studio 特点及优势云端开发多种预制环境可选metawo…

怎么学习C语言,才能快速掌握?

有多年软件行业经验,期间参与过多个C语言项目。要掌握一门编程语言,仅仅投入时间学习是不够的,关键在于实际项目经验。在没有真正实战经验之前,不宜轻易声称掌握某种编程语言,因为编程是积累性的工作,理论知…

本地Git仓库和GitHub仓库SSH传输

SSH创建命令解释 ssh-keygen 用于创建密钥的程序 -m PEM 将密钥的格式设为 PEM -t rsa 要创建的密钥类型,本例中为 RSA 格式 -b 4096 密钥的位数,本例中为 4096 -C “azureusermyserver” 追加到公钥文件末尾以便于识别的注释。 通常以电子邮件地址…

微服务契约测试框架Pact-Python实战

Pact是一个契约测试框架,有多种语言实现,本文以基于pact-python探究契约测试到底是什么?以及如何实现 官网:自述文件 |契约文档 (pact.io) 契约测试步骤 1、为消费者写一个单元测试,让它通过,并生成契约…

中小企业如何低成本实施MES管理系统

中小企业在市场竞争中需要有高效的管理体系来支持其运营和发展。中小企业MES管理系统是一种先进的管理系统,可以提升工厂智能化水平,提高生产效率,是中小企业必须采取的有效管理工具。然而,由于资金和技术的限制,中小企…

Git分布式版本控制工具和GitHub(二)--Git指令入门

一.指令入门前的准备 1.Git全局设置 2.获取Git仓库 例如:将我GitHub上的first_resp仓库克隆到本地。 点击进入first_rep,后面本地仓库操作的学习就是在这个界面右键打开Git Bash 3.工作区,暂存区,版本库概念 注:如果空…

Ansible自动化运维工具

Ansible是一个基于Python开发的配置管理和应用部署工具,现在也在自动化管理领域大放异彩。它融合了众多老牌运维工具的优点,Pubbet和Saltstack能实现的功能,Ansible基本上都可以实现。 Ansible能批量配置、部署、管理上千台主机。比如以前需要…

CAN转ETHERCAT网关can协议是什么意思

大家好,今天要跟大家分享一款自主研发的通讯网关,JM-ECT-CAN。这款产品能够将各种CAN总线和ETHERCAT网络连接起来,实现高效的数据传输和通信。那么,这款通讯网关具体有哪些功能和特点呢?接下来,我们就一起来…

<el-date-picker>组件选择开始时间,结束时间自动延长30min

背景&#xff1a;选择开始时间&#xff0c;结束时间自动增加30分钟&#xff0c;结束时间也可重新选择&#xff0c;如图&#xff1a; <el-form-item label"预约开始时间" prop"value1"><el-date-pickersize"large"v-model"ruleForm…

SpringBoot2.5.6整合Elasticsearch7.12.1

SpringBoot2.5.6整合Elasticsearch7.12.1 下面将通过SpringBoot整合Elasticseach&#xff0c;SpringBoot的版本是2.5.6&#xff0c;Elasticsearch的版本是7.12.1。 SpringBoot整合Elasticsearch主要有三种方式&#xff0c;一种是通过elasticsearch-rest-high-level-client&am…

【MySQL】模具更新方案

系列文章 C#底层库–MySQLBuilder脚本构建类&#xff08;select、insert、update、in、带条件的SQL自动生成&#xff09; 本文链接&#xff1a;https://blog.csdn.net/youcheng_ge/article/details/129179216 C#底层库–MySQL数据库操作辅助类&#xff08;推荐阅读&#xff0…

简化Java单元测试数据

用EasyModeling简化Java单元测试 EasyModeling 是我在2021年圣诞假期期间开发的一个 Java 注解处理器&#xff0c;采用 Apache-2.0 开源协议。它可以帮助 Java 单元测试的编写者快速构造用于测试的数据模型实例&#xff0c;简化 Java 项目在单元测试中准备测试数据的工作&…

C++ ——STL容器【list】模拟实现

代码仓库&#xff1a; list模拟实现 list源码 数据结构——双向链表 文章目录 &#x1f347;1. 节点结构体&#x1f348;2. list成员&#x1f349;3. 迭代器模板&#x1f34a;4. 迭代器&#x1f34b;5. 插入删除操作&#x1f34c;5.1 insert & erase&#x1f34c;5.2 push_…

flask处理表单数据

flask处理表单数据 处理表单数据在任何 web 应用开发中都是一个常见的需求。在 Flask 中&#xff0c;你可以使用 request 对象来获取通过 HTTP 请求发送的数据。对于 POST 请求&#xff0c;可以通过 request.form 访问表单数据。例如&#xff1a; from flask import Flask, r…

电子鼻毕业论文

面向压埋探测的人体代谢气体识别方法的研究与应用 实现对非目标气体的检测 数据预处理 &#xff08;1a&#xff09;标准化 将采集到的数据先进行变换&#xff0c;统一数量级。其中&#xff0c;xij为第j个传感器的第i个采样值&#xff1b;xj为第 j 个气体传感器的所有采样值&…

Java课题笔记~Maven基础

2、Maven 基础 2.1 Maven安装与配置 下载安装 配置&#xff1a;修改安装目录/conf/settings.xml 本地仓库&#xff1a;存放的是下载的jar包 中央仓库&#xff1a;要从哪个网站去下载jar包 - 阿里云的仓库 2.2 创建Maven项目

计算机网络——学习笔记

付费版&#xff1a;直接在上面的CSDN资源下载 免费版&#xff1a;https://wwsk.lanzouk.com/ijkcj13tqmyb 有疑问或者错误的地方可以在评论区指出&#xff0c;我会尽快回复 示例图&#xff1a;

SOC FPGA之HPS模型设计(一)

目录 一、建立HPS硬件系统模型 1.1 GHRD 1.2 从0开始搭建HPS 1.2.1 FPGA Interfaces 1.2.1.1 General 1.2.1.2 AXI Bridge 1.2.1.3 FPGA-to-HPS SDRAM Interface 1.2.1.4 DMA Peripheral Request 1.2.1.5 Interrupts 1.2.1.6 EMAC ptp interface 1.2.2 Peripheral P…

[JAVAee]线程池

目录 线程池的作用 线程池的使用 线程池的创建方式 线程池的解析 ①Executors与ThreadPoolExecutor ②ThreadPoolExecutor线程池的构造方法 ③RejectedExecutionHandler线程池的拒绝策略 固定线程数量线程池的简单模拟实现 线程池的作用 对于线程的使用,可能会频繁的创建…