49、Flink 的数据源的 SplitReader API 详解

SplitReader API
a)概述

核心的 SourceReader API 是完全异步的,但实际上,大多数 Sources 都会使用阻塞的操作,例如客户端(如 KafkaConsumer)的 poll() 阻塞调用,或者分布式文件系统(HDFS, S3等)的阻塞I/O操作;为了使其与异步 Source API 兼容,这些阻塞(同步)操作需要在单独的线程中进行,并在之后将数据提交给 reader 的异步线程。

SplitReader 是基于同步读取/轮询的 Source 的高级(high-level)API,例如 file source 和 Kafka source 的实现等

核心是上面提到的 SourceReaderBase 类,其使用 SplitReader 并创建提取器(fetcher)线程来运行 SplitReader,该实现支持不同的线程处理模型。

b)SplitReader

SplitReader API 只有以下三个方法:

  • 阻塞式的提取 fetch() 方法,返回值为 RecordsWithSplitIds。
  • 非阻塞式处理分片变动 handleSplitsChanges() 方法。
  • 非阻塞式的唤醒 wakeUp() 方法,用于唤醒阻塞中的提取操作。

SplitReader 仅需要关注从外部系统读取记录,因此比 SourceReader 简单得多。

c)SourceReaderBase

常见的 SourceReader 实现方式如下:

  • 有一个线程池以阻塞的方式从外部系统提取分片。
  • 解决内部提取线程与其他方法调用(如 pollNext(ReaderOutput))之间的同步。
  • 维护每个分片的水印(watermark)以保证水印对齐。
  • 维护每个分片的状态以进行 Checkpoint。

为了减少开发新的 SourceReader 所需的工作,Flink 提供了 SourceReaderBase 类作为 SourceReader 的基本实现。 SourceReaderBase 已经实现了上述需求,要重新编写新的 SourceReader,只需要让 SourceReader 继承 SourceReaderBase,而后完善一些方法并实现 SplitReader。

d)SplitFetcherManager

SourceReaderBase 支持几个开箱即用的线程模型,取决于 SplitFetcherManager 的行为模式; SplitFetcherManager 创建和维护一个分片提取器(SplitFetchers)池,同时每个分片提取器使用一个 SplitReader 进行提取,它还决定如何分配分片给分片提取器。

如下所示,一个 SplitFetcherManager 可能有固定数量的线程,每个线程对分配给 SourceReader 的一些分片进行抓取。

在这里插入图片描述

以下代码片段实现了此线程模型。

/**
 * 一个SplitFetcherManager,它具有固定数量的分片提取器,
 * 并根据分片ID的哈希值将分片分配给分片提取器。
 */
public class FixedSizeSplitFetcherManager<E, SplitT extends SourceSplit> 
        extends SplitFetcherManager<E, SplitT> {
    private final int numFetchers;

    public FixedSizeSplitFetcherManager(
            int numFetchers,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(elementsQueue, splitReaderSupplier);
        this.numFetchers = numFetchers;
        // 创建 numFetchers 个分片提取器.
        for (int i = 0; i < numFetchers; i++) {
            startFetcher(createSplitFetcher());
        }
    }

    @Override
    public void addSplits(List<SplitT> splitsToAdd) {
        // 根据它们所属的提取器将分片聚集在一起。
        Map<Integer, List<SplitT>> splitsByFetcherIndex = new HashMap<>();
        splitsToAdd.forEach(split -> {
            int ownerFetcherIndex = split.hashCode() % numFetchers;
            splitsByFetcherIndex
                    .computeIfAbsent(ownerFetcherIndex, s -> new ArrayList<>())
                    .add(split);
        });
        // 将分片分配给它们所属的提取器。
        splitsByFetcherIndex.forEach((fetcherIndex, splitsForFetcher) -> {
            fetchers.get(fetcherIndex).addSplits(splitsForFetcher);
        });
    }
}

使用这种线程模型的SourceReader可以像下面这样创建:

public class FixedFetcherSizeSourceReader<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public FixedFetcherSizeSourceReader(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitFetcherSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new FixedSizeSplitFetcherManager<>(
                        config.getInteger(SourceConfig.NUM_FETCHERS),
                        elementsQueue,
                        splitFetcherSupplier),
                recordEmitter,
                config,
                context);
    }

    @Override
    protected void onSplitFinished(Map<String, SplitStateT> finishedSplitIds) {
        // 在回调过程中对完成的分片进行处理。
    }

    @Override
    protected SplitStateT initializedState(SplitT split) {
        ...
    }

    @Override
    protected SplitT toSplitType(String splitId, SplitStateT splitState) {
        ...
    }
}

SourceReader 的实现还可以在 SplitFetcherManagerSourceReaderBase 的基础上编写自己的线程模型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/700512.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

商业智能(BI)期末复习

商业智能&#xff08;BI&#xff09;期末复习 商业智能&#xff08;BI&#xff09;期末复习 2024/06/17 13:30-15:00 1.工作簿包含工作表 2.tableau是一款轻型BI工具 3.敏捷BI成本比较低 因为可以不建立数据仓库 4.敏捷BI的能带来更高的质量系统 是在用户监督下建立起来的 …

Python易错点总结

目录 多分支选择结构 嵌套选择 用match模式识别 match与if的对比 案例&#xff1a;闰年判断 三角形的判断 用whlie循环 高斯求和 死循环 用for循环 ​编辑continue​编辑 whlie与else结合 pass 序列 列表&#xff08;有序&#xff09; 元组&#xff08;有序&…

高仿imtoken钱包源码/获取助记词/获取私钥/自动归集

高仿imtoken钱包/获取助记词/获取私钥/自动归集 带双端&#xff0c;无纯源码 源码下载&#xff1a;https://download.csdn.net/download/m0_66047725/89379118 更多资源下载&#xff1a;关注我。

【免杀】C2远控-APC注入-进程镂空

目录 进程镂空&傀儡进程&#xff08;主要过内存扫描&#xff09;代码 傀儡进程演示如何上线上线演示 APC注入&进程欺骗&#xff08;主要过内存扫描&#xff09;同步调用与异步调用代码演示 进程镂空&傀儡进程&#xff08;主要过内存扫描&#xff09; 进程镂空(Pro…

16.左侧导航菜单制作

左侧导航菜单制作 1. 修改路由&#xff0c;方便查看页面 index.ts import { RouteRecordRaw, createRouter, createWebHistory } from "vue-router"; import Layout from /layout/Index.vueconst routes: Array<RouteRecordRaw> [{path: /,name: home,comp…

记录大三上学期大数据课程设计:基于Hadoop和Spark的中文手写数字实时识别系统

我整理好了两个百度网盘链接&#xff0c;一个是模型文档和数据&#xff0c;一个是镜像&#xff0c;下载、导入虚拟机即可运行。 github地址&#xff1a;Li-Jihong/big-data: 用来记录大三上学期大数据课程设计&#xff1a;基于Hadoop和Spark的中文手写数字实时识别系统 (githu…

服务器如何远程桌面连接不上,服务器远程桌面连接不上解决办法

服务器远程桌面连接不上&#xff0c;是IT运维中常见的挑战之一。针对这一问题&#xff0c;专业的解决方法通常涉及以下几个方面的排查与操作&#xff1a; 首先&#xff0c;我们需要检查网络连接是否正常。远程桌面连接依赖于稳定的网络连接&#xff0c;因此&#xff0c;确认服务…

Rocky Linux 9.4 部署Zabbix 7.0

文章目录 Zabbix基本概念zabbix介绍zabbix特性zabbix结构 安装Zabbix主机名配置配置Zabbix-Server(1)禁用EPEL提供的Zabbix软件包(2)安装Zabbix Server、Web前端、Agent(3)创建初始数据库(4)Zabbix server配置数据库(5)为Zabbix前端配置PHP(6)启动Zabbix server和agent进程(7)放…

【JS重点知识05】正则表达式

目录 一&#xff1a;正则表达式简介 1 什么是正则表达式 2 正则表达式作用 二&#xff1a;语法格式&#xff1a; 1 定义正则表达式 2 检索、判断是否匹配 &#xff08;1&#xff09;test()方法 &#xff08;2&#xff09;exec()方法 三&#xff1a;元字符 普通字符&a…

【C++课程学习】:类和对象(拷贝构造和运算符重载)

&#x1f381;个人主页&#xff1a;我们的五年 &#x1f50d;系列专栏&#xff1a;C课程学习 &#x1f389;欢迎大家点赞&#x1f44d;评论&#x1f4dd;收藏⭐文章 目录 ✍拷贝构造&#xff1a; &#x1f349;特点一&#xff1a; &#x1f349;特点二&#xff1a; &…

消息中间件比较:Redis,Kafka和RabbitMQ

对微服务使用异步通信时&#xff0c;通常使用消息代理。代理确保不同微服务之间的通信可靠且稳定&#xff0c;消息在系统内得到管理和监控&#xff0c;并且消息不会丢失。您可以从几个消息代理中进行选择&#xff0c;它们的规模和数据功能各不相同。这篇博文将比较三种最受欢迎…

基于深度图像的无监督目标跟踪

概要 大致的步骤 深度图像获取:通过深度传感器(例如ToF相机、双目相机等)获取场景的深度图像。深度图转scanscan转pointcloud点云聚类卡尔曼滤波预测匈牙利算法匹配目标ID更新深度图转scan 参考这篇博客 scan转pointcloud

科技云报道:“元年”之后,生成式AI将走向何方?

科技云报道原创。 近两年&#xff0c;以大模型为代表的生成式AI技术&#xff0c;成为引爆数字原生最重要的技术奇点&#xff0c;人们见证了各类文生应用的进展速度。Gartner预测&#xff0c;到2026年&#xff0c;超过80%的企业将使用生成式AI的API或模型&#xff0c;或在生产环…

C++基础(二)

选择结构 选择结构是用来控制程序流程&#xff0c;使得程序可以根据不同的条件执行不同的代码块。 if语句 简单结构 if (表达式) { // 表达式为真时执行的语句。 } else { // 表达式为假时执行的语句。 } #include <iostream> #include <string>using namespace s…

卫星通讯传输电力运维巡检EasyCVR视频汇聚平台智能监控方案

随着科技的快速发展&#xff0c;视频监控技术已广泛应用于各个领域。而卫星通讯作为一种高效、稳定的通信方式&#xff0c;为视频监控系统的远程传输提供了有力支持。 一、方案背景 随着电力行业的快速发展&#xff0c;电力运维巡检工作变得愈发重要。传统的巡检方式往往受到…

知识图谱的应用---新零售

文章目录 新零售知识图谱构建过程典型应用 新零售 新零售&#xff0c;即个人、企业以互联网为依托&#xff0c;通过运用大数据、人工智能等先进技术手段并运用心理学知识&#xff0c;对商品的生产、流通与销售过程进行升级改造&#xff0c;进而重塑业态结构与生态圈&#xff0c…

镜舟科技携手中通快运,入选 2024 爱分析·数据库应用实践报告

典型案例&#xff1a;中通快运重构数据中心&#xff0c;满足业务多种复杂分析需求 中通快运成立于2016年&#xff0c;是中通品牌旗下快运企业&#xff0c;聚焦数智物流新趋势&#xff0c; 提供面向企业及个人客户的全链路一站式物流服务。目前中通快运全国揽派件网点有21000 余…

MySQL存储引擎详述:InnoDB为何胜出?

MySQL作为当前最流行的开源关系型数据库之一,其强大的功能和良好的性能使其广泛应用于各种规模的应用系统中。其中,存储引擎的设计理念是MySQL数据库灵活高效的关键所在。 一、什么是存储引擎 存储引擎是MySQL架构的重要组成部分,负责MySQL中数据的存储和提供了视图,存储过程等…

大疆智图_空三二维重建成果传输

一、软件环境 1.1 所需软件 1、 大疆智图&#xff1a;点击下载&#xff1b;   2、 ArcGIS Pro 3.1.5&#xff1a;点击下载&#xff0c;建议使用IDM或Aria2等多线程下载器&#xff1b;   3、 IDM下载器&#xff1a;点击下载&#xff0c;或自行搜索&#xff1b;   4、 Fas…

初出茅庐的小李博客之CJSON库解析心知天气数据

心知天气数据JSON格式介绍 JSON格式介绍http://t.csdnimg.cn/pJX1n 下面代码是利用CJSON库进行数据解析 解析代码 #include <stdio.h> #include <string.h> #include "cJSON.h" // 假设你的CJSON库头文件路径是正确的int main(void) {// 提供的JSON…