46、Flink 的 异步 I/O 算子详解

异步 I/O
1.需求

在与外部系统交互(用数据库中的数据扩充流数据)时,需要考虑与外部系统的通信延迟对整个流处理应用的影响。

同步交互:使用 MapFunction访问外部数据库的数据, MapFunction 向数据库发送一个请求然后一直等待,直到收到响应;大多数情况下,等待占据了函数运行的大部分时间。

异步交互:一个并行函数实例可以并发地处理多个请求和接收多个响应,使函数在等待的时间可以发送其他请求和接收其他响应,使等待的时间可以被多个请求摊分;大多数情况下,异步交互可以大幅度提高流处理的吞吐量。

在这里插入图片描述

注意: 提高 MapFunction 的并行度(parallelism)在有些情况下也可以提升吞吐量,但是这样做通常会导致非常高的资源消耗:更多的并行 MapFunction 实例意味着更多的 Task、更多的线程、更多的 Flink 内部网络连接、 更多的与数据库的网络连接、更多的缓冲和更多程序内部协调的开销。

2.前提
  • 需要支持异步请求的数据库客户端
  • 如果没有支持异步请求的客户端,可以通过创建多个客户端并使用线程池处理同步调用的方法,将同步客户端转换为有限并发的客户端,比正规的异步客户端效率低。
3.异步 I/O API
a)代码模版

Flink 的异步 I/O API 允许用户在流处理中使用异步请求客户端,API 处理与数据流的集成,同时还能处理好顺序、事件时间和容错等。

在具备异步数据库客户端的基础上,实现数据流转换操作与数据库的异步 I/O 交互需要以下三部分:

  • 实现分发请求的 AsyncFunction
  • 获取数据库交互的结果并发送给 ResultFuture回调 函数
  • 将异步 I/O 操作应用于 DataStream 作为 DataStream 的一次转换操作,启用或者不启用重试。
// 使用 Java 8 的 Future 接口(与 Flink 的 Future 相同)实现了异步请求和回调。

/**
 * 实现 'AsyncFunction' 用于发送请求和设置回调。
 */
class AsyncDatabaseRequest extends RichAsyncFunction<String, Tuple2<String, String>> {

    /** 能够利用回调函数并发发送请求的数据库客户端 */
    private transient DatabaseClient client;

    @Override
    public void open(OpenContext openContext) throws Exception {
        client = new DatabaseClient(host, post, credentials);
    }

    @Override
    public void close() throws Exception {
        client.close();
    }

    @Override
    public void asyncInvoke(String key, final ResultFuture<Tuple2<String, String>> resultFuture) throws Exception {

        // 发送异步请求,接收 future 结果
        final Future<String> result = client.query(key);

        // 设置客户端完成请求后要执行的回调函数
        // 回调函数只是简单地把结果发给 future
        CompletableFuture.supplyAsync(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    return result.get();
                } catch (InterruptedException | ExecutionException e) {
                    // 显示地处理异常。
                    return null;
                }
            }
        }).thenAccept( (String dbResult) -> {
            resultFuture.complete(Collections.singleton(new Tuple2<>(key, dbResult)));
        });
    }
}

// 创建初始 DataStream
DataStream<String> stream = ...;

// 应用异步 I/O 转换操作,不启用重试
DataStream<Tuple2<String, String>> resultStream =
    AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100);

// 应用异步 I/O 转换操作并启用重试
// 通过工具类创建一个异步重试策略, 或用户实现自定义的策略
AsyncRetryStrategy asyncRetryStrategy =
	new AsyncRetryStrategies.FixedDelayRetryStrategyBuilder(3, 100L) // maxAttempts=3, fixedDelay=100ms
		.ifResult(RetryPredicates.EMPTY_RESULT_PREDICATE)
		.ifException(RetryPredicates.HAS_EXCEPTION_PREDICATE)
		.build();

// 应用异步 I/O 转换操作并启用重试
DataStream<Tuple2<String, String>> resultStream =
	AsyncDataStream.unorderedWaitWithRetry(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100, asyncRetryStrategy);

注意: 第一次调用 ResultFuture.completeResultFuture 就完成了,后续的 complete 调用都将被忽略。

下面两个参数控制异步操作:

  • Timeout: 超时参数定义了异步操作执行多久未完成、最终认定为失败的时长,如果启用重试,则可能包括多个重试请求,可以防止一直等待得不到响应的请求。
  • Capacity: 容量参数定义了可以同时进行的异步请求数;即使异步 I/O 通常带来更高的吞吐量,执行异步 I/O 操作的算子仍然可能成为流处理的瓶颈,限制并发请求的数量可以确保算子不会持续累积待处理的请求进而造成积压,而是在容量耗尽时触发反压
  • AsyncRetryStrategy: 重试策略参数定义了什么条件会触发延迟重试以及延迟的策略,例如,固定延迟、指数后退延迟、自定义实现等。
b)超时处理

当异步 I/O 请求超时的时候,默认会抛出异常并重启作业。

如果想处理超时,可以重写 AsyncFunction#timeout 方法;重写 AsyncFunction#timeout 时需要调用 ResultFuture.complete() 或者 ResultFuture.completeExceptionally() 以通知 Flink 这条记录的处理已经完成;如果超时发生时不想发出任何记录,可以调用 ResultFuture.complete(Collections.emptyList())

c)结果的顺序

AsyncFunction 发出的并发请求经常以不确定的顺序完成,这取决于请求得到响应的顺序;Flink 提供两种模式控制结果记录以何种顺序发出。

  • 无序模式: 异步请求一结束就立刻发出结果记录。 流中记录的顺序在经过异步 I/O 算子之后发生了改变。 当使用 处理时间 作为基本时间特征时,这个模式具有最低的延迟和最少的开销。 此模式使用 AsyncDataStream.unorderedWait(...) 方法。
  • 有序模式: 保持了流的顺序。发出结果记录的顺序与触发异步请求的顺序(记录输入算子的顺序)相同;算子将缓冲一个结果记录直到这条记录前面的所有记录都发出(或超时),因为记录或者结果要在 checkpoint 的状态中保存更长的时间,所以与无序模式相比,有序模式通常会带来额外的延迟和 checkpoint 开销。此模式使用 AsyncDataStream.orderedWait(...) 方法。
d)事件时间

当流处理应用使用事件时间时,异步 I/O 算子会正确处理 watermark。

  • 无序模式: Watermark 既不超前于记录也不落后于记录,即 watermark 建立了顺序的边界。 只有连续两个 watermark 之间的记录是无序发出的。 在一个 watermark 后面生成的记录只会在这个 watermark 发出以后才发出。 在一个 watermark 之前的所有输入的结果记录全部发出以后,才会发出这个 watermark。

    在 watermark 的情况下,无序模式 会引入一些与有序模式 相同的延迟和管理开销。开销大小取决于 watermark 的频率。

  • 有序模式: 连续两个 watermark 之间的记录顺序也被保留了。开销与使用处理时间 相比,没有显著的差别。

注意:摄入时间是一种特殊的事件时间,它基于数据源的处理时间自动生成 watermark。

e)容错保证

异步 I/O 算子提供了完全的精确一次容错保证,它将异步请求的记录保存在 checkpoint 中,在故障恢复时重新触发请求。

f)重试支持

重试支持为异步 I/O 操作引入了一个内置重试机制,它对用户的异步函数实现逻辑是透明的。

  • AsyncRetryStrategy: 异步重试策略包含了触发重试条件 AsyncRetryPredicate 定义,以及根据当前已尝试次数判断是否继续重试、下次重试间隔时长的接口方法。 在满足触发重试条件后,有可能因为当前重试次数超过预设的上限放弃重试,或是在任务结束时被强制终止重试(此时系统以最后一次执行的结果或异常作为最终状态)。
  • AsyncRetryPredicate: 触发重试条件可以选择基于返回结果、 执行异常来定义条件,两种条件是或的关系,满足其一即会触发。
g)实现提示

在实现使用 Executor(或者 Scala 中的 ExecutionContext)和回调的 Futures 时,建议使用 DirectExecutor,因为通常回调的工作量很小,DirectExecutor 避免了额外的线程切换开销;回调通常只是把结果发送给 ResultFuture,也就是把它添加进输出缓冲。从这里开始,包括发送记录和与 chenkpoint 交互在内的繁重逻辑都将在专有的线程池中进行处理。

DirectExecutor 可以通过 org.apache.flink.util.concurrent.Executors.directExecutor()com.google.common.util.concurrent.MoreExecutors.directExecutor() 获得。

h)警告

Flink 不以多线程方式调用 AsyncFunction

AsyncFunction 不是以多线程方式调用的;只有一个 AsyncFunction 实例,它被流中相应分区内的每个记录顺序地调用。除非 asyncInvoke(...) 方法快速返回并且依赖于(客户端的)回调,否则无法实现正确的异步 I/O。

例如,以下情况导致阻塞的 asyncInvoke(...) 函数,从而使异步行为无效

  • 使用同步数据库客户端,它的查询方法调用在返回结果前一直被阻塞。
  • asyncInvoke(...) 方法内阻塞等待异步客户端返回的 future 类型对象。

默认情况下,AsyncFunction 的算子(异步等待算子)可以在作业图的任意处使用,但它不能与SourceFunction/SourceStreamTask组成算子链

启用重试后可能需要更大的缓冲队列容量

新的重试功能可能会导致更大的队列容量要求,最大数量可以近似地评估如下。

inputRate * retryRate * avgRetryDuration

例如,对于一个输入率=100条记录/秒的任务,其中1%的元素将平均触发1次重试,平均重试时间为60秒,额外的队列容量要求为:

100条记录/秒 * 1% * 60s = 60

即在无序输出模式下,给工作队列增加 60 个容量可能不会影响吞吐量; 而在有序模式下,头部元素是关键点,它未完成的时间越长,算子提供的处理延迟就越长;在相同的超时约束下,如果头元素事实上获得了更多的重试,那重试功能可能会增加头部元素的处理时间即未完成时间,也就是说在有序模式下,增大队列容量并不是总能提升吞吐。

当队列容量增长时( 可以缓解背压),OOM 的风险会随之增加;对于 ListState 存储来说,理论的上限是 Integer.MAX_VALUE, 虽然队列容量的限制是一样的,但在生产中不能把队列容量增加到太大,此时增加任务的并行性也许更可行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/673955.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

企业软件产品和服务 之 设计保证安全 七项承诺

1. 引言 公司如何保护自己免受数据泄露的影响&#xff1f;标准答案就是&#xff1a; “启用多因素身份验证”——MTA&#xff08;Enable multifactor authentication&#xff09;。 但是&#xff0c;目前很多公司仍然盲目地只使用密码作为唯一的身份来源。 网络安全的核心是…

IPD推行成功的核心要素(九)需求管理助力产品从一次成功走向一直成功

在当今竞争激烈的商业环境中&#xff0c;项目的成功与否往往取决于其能否满足用户和利益相关者的需求。然而&#xff0c;理解、捕捉和有效管理这些需求并非易事。因此&#xff0c;需求管理在项目管理中扮演着至关重要的角色。需求管理是一个系统性的过程&#xff0c;旨在确保项…

直播分享|深入解析ts-morph:通过注释生成类型文档

♪ ♫ 你看小狗在叫 树叶会笑 风声在呢喃♫ ♪ 乘风追梦&#xff0c;童心未泯 OpenTiny 预祝所有大朋友、小朋友儿童节快乐~ 与此同时&#xff0c;OpenTiny 贡献者直播也即将开启啦~ 直播主题&#xff1a;【深入解析ts-morph&#xff1a;通过注释生成类型文档】 6月1日&am…

前驱图,程序执行和进程状态

目录 前驱图 程序的执行 顺序执行 并发执行 进程的定义 进程的状态 总结 前驱图 现在有两个任务分别为p1,p2; 只有执行了p1,才可以执行p2&#xff0c;此时可以称p1为p2的前驱。通过符号语言表示如下&#xff1a; p1->p2 程序的执行 下面引进一段代码来理解进程的概念…

IDEA 学习之 疑难杂症系列

IDEA 学习之 疑难杂症系列 1. Mapstruct 编译空指针问题 1.1. 现象 NullPointerException at org.mapstruct.ap.internal.processor.DefaultVersionInformation.createManifest1.2. 原因 MapStruct 在 IDEA 2020.3 版本编译 NPE 问题 1.3. 解决办法 2. IDEA 学习之 编译内…

什么牌子的开放式耳机质量好?2024超强实力派品牌推荐!

耳机对于一个音乐人有重要这个不必多说&#xff0c;我朋友是个音乐编辑&#xff0c;他经常需要长时间佩戴耳机进行音频编辑和混音工作。在尝试过多款开放式耳机后&#xff0c;都没找到合适的。今天&#xff0c;我将从专业角度为大家带来几款热门开放式耳机的测评报告&#xff0…

Python 高级数据类型

列表List 定义列表 可以将不同的基本数据类型或者列表装到一个列表里 my_list [1,2,3,4,5] print(my_list) # [1, 2, 3, 4, 5] 直接打印出列表的内容 print(type(my_list)) # <class list>my_list ["1","2","3","4","…

MYSQL之安装

一&#xff0c;下载仓库包 wget -i -c https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm二&#xff0c;安装仓库 yum -y install mysql80-community-release-el7-3.noarch.rpmsed -i s/gpgcheck1/gpgcheck0/g mysql-community.repo三&#xff0c;安装MY…

免费SSL证书的安全性与获取指南

SSL证书是一种数字凭证&#xff0c;用于加密用户与网站之间的信息交换&#xff0c;以确保传输的数据不被第三方窃取。它像是一个数字版的密封印章&#xff0c;为数据的传输过程提供了一层保护膜。 免费的SSL证书通常由CA机构提供&#xff0c;它们同样可以提供基础数据的加密服…

瑞吉外卖项目整体介绍

黑马程序员瑞吉外卖 文章目录 一、项目介绍二、产品原型展示三、技术选型四、功能架构五、角色 一、项目介绍 二、产品原型展示 产品原型&#xff0c;就是一款韩品成型之前的一个简单的框架&#xff0c;就是将页面的排版布局展现出来&#xff0c;使产品的初步构思有一个可视化…

跟着大佬学RE(一)

学了一个 map&#xff08;&#xff09;函数的使用 import base64rawData "e3nifIH9b_CndH" target list(map(ord, rawData)) # map 函数将 rawData 中的每个字符传递给 ord 函数。ord 函数返回给定字符的 Unicode 码点 print(target) # 打印 map 对象的内存地址&…

Prism 入门02,区域介绍

一.区域概念和使用方式 什么是区域(Region)?区域,在Prism 框架中,区域是模块化的核心功能之一,其主要作用是降低应用程序和模块之间的耦合度 。使用方式:在应用程序的界面中,划分出某块区域,并为这个区域定义一个唯一的区域名称。那么通过这个区域名称,应用程序就可以…

Android Display Graphics #1 整体框架介绍一

软件基础 Android的framework层提供了一系列的图像渲染API&#xff0c;可绘制2D和3D。简单理解就是上层开发APP的小伙伴提供了接口&#xff0c;开发者可以直接显示对应的自己内容。但如果掌握了Display底层逻辑再写上层app&#xff0c;会有掌控力&#xff0c;出问题可以根据lo…

【Mybatis】源码分析-自定义框架

1、自定义持久层框架 1.1、分析JDBC操作问题 package blnp.net.cn.jvm.demos;import java.sql.*;/*** <p></p>** author lyb 2045165565qq.com* createDate 2024/5/24 14:24*/ public class JdbcTest {public static void main(String[] args) {Connection conne…

大模型+RAG,全面介绍!

1 介绍 大型语言模型&#xff08;LLMs&#xff09;在处理特定领域或高度专业化的查询时存在局限性**&#xff0c;如生成不正确信息或“幻觉”。**缓解这些限制的一种有前途的方法是检索增强生成&#xff08;RAG&#xff09;&#xff0c;RAG就像是一个外挂&#xff0c;将外部数…

环路检测00

题目链接 环路检测 题目描述 注意点 返回环路的开头节点 解答思路 快慢指针确认链表中是否有环&#xff0c;如果无环则快指针最终会到达链表尾部&#xff0c;如果有环则快慢指针会相遇&#xff0c;当快慢指针相遇&#xff0c;此时新的节点node从head开始出发&#xff0c;与…

探索数据结构:便捷的双向链表

&#x1f511;&#x1f511;博客主页&#xff1a;阿客不是客 &#x1f353;&#x1f353;系列专栏&#xff1a;渐入佳境之数据结构与算法 欢迎来到泊舟小课堂 &#x1f618;博客制作不易欢迎各位&#x1f44d;点赞⭐收藏➕关注 ​​ 前言 前面我们学习了单链表&#xff0c;它解…

微服务中feign远程调用相关的各种超时问题

1. 引言 在spring cloud微服中&#xff0c;feign远程调用可能是大家每天都接触到东西&#xff0c;但很多同学却没咋搞清楚这里边的各种超时问题&#xff0c;生产环境可能会蹦出各种奇怪的问题。 首先说下结论&#xff1a; 1)只使用feign组件&#xff0c;不使用ribbion组件&…

FuTalk设计周刊-Vol.045

#AI漫谈 热点捕手 1、新模型 Stable Diffusion 3 与 Stable Cascade 全面解析 最近 Stability AI 又接连推出了 2 个新的模型&#xff1a;Stable Diffusion 3 和 Stable Cascade&#xff0c;在图像生成效率和质量上比半年前推出的 SDXL 1.0 有了明显提升&#xff0c;今天就为…

PyQt5学习系列之ui转py报错问题

PyQt5学习系列之ui转py报错问题 前言解决步骤一步骤二步骤三 总结 前言 设计好的界面出现ui转py问题&#xff0c;一直出现报错。 一开始通过pycharm进行转化&#xff0c;采用tool工具&#xff0c;一直报错。后用vscode进行转化&#xff0c;转化成功。最终发现问题是语句问题。…