Flink的多流转换(分流-侧输出流、合流-union、connect、join)

        在实际应用中,我们可能要将多个不同来源的数据连接合并在一起进行处理,也有可能要将一条流拆分成多条流进行处理,这就涉及到了Flink的多流转换问题。简单来说,就是分流和合流两大操作,分流主要通过侧输出流实现,合流的算子就比较丰富了,有union、connect、join等。

一、分流

        所谓分流,就是通过定义一些筛选条件,将一个dataStream拆分成多个子dataStream的过程,每条子数据流之间完全独立。Flink中的分流主要通过侧输出流来实现。

        通过调用底层的处理函数,可以获取到上下文信息,调用上下文的.output方法就可以实施分流操作了。.output方法需要传入一个“输出标签"(OutputTag),用来标记侧输出流(相当于给侧输出流盖了个戳,指明他的名称和类型),之后也可以通过.getSideOutput()方法传入OutputTag获取到相应的侧输出流。

二、合流

        对多个来源的多条流进行联合处理时,需要用到合流操作,具体有如下几种合流算子:

1. union

        union操作要求不同流中的数据类型必须一致, 类似sql语言中的union,是纵向的合并。对datastream调用.union方法即可实现多流合并,合并后的流类型仍然是datastream。这里要注意,多条流合并后的水位线应以最小的那个为准(类似多个并行子任务向下游传递)。

stream1.union(stream2, stream3, ...)

2. connect

        union操作简单,但要求流的数据类型一致,实际应用中实用性不高。针对两条数据类型不一样的流,Flink还提供了connect合流操作,connect操作只能连接两条流。

(1) 两个dataStream进行connect -> 连接流(ConnectedStreams)

        对于两条数据类型不一致的dataStream进行连接,调用.connect()方法,所得到的是一个连接流ConnectedStreams,然后再调用同处理方法分别对两条流进行处理,得到一个统一类型的dataStream。这里的同处理方法可以是map、flatmap也可以是底层的处理函数process,只是在传入参数时跟以往的单流不同,如map方法传入的不再是MapFunction而是CoMapFunction,可以实现对两条流分别做map操作。

        对ConnectedStreams也可以先调用keyBy进行按键分区操作后,再调用同处理方法。这里调用KeyBy后得到的仍然是ConnectedStreams,keyBy要传入两个参数keySelector1和keySelector2类似于sql中两表之间的 join操作的关联字段。

connectedStreams.keyBy(keySelector1, keySelector2);

(2) dataStream与广播流(broadcastStream)进行connect -> 广播连接流

       当需要动态定义某些规则或配置时,如维度表配置信息是动态变化的,存储在MySQL数据库中,我们用maxwell实时对它进行了监控,当发生变化时,这个配置信息是要完整的告知原始数据流的(从业务数据库中抽取的原始数据),即若原始数据流分为了多个并行子任务,则每个并行子任务上都应该知道配置信息的变化,因此需要对配置信息进行广播连接。

        对dataStream调用.broadcast()方法就可以得到广播流,将要处理的数据流与这条广播流进行connect,得到的就是广播连接流,可以调用.process方法进行动态处理,同样要实现的是一个类似CoProcessFunction的抽象类,对两条流分别进行处理。

3. join

        connect方法已经能够实现各种需求了,但是其支持的处理函数太过于底层,在很多场景下太过于抽象了,flink还为datastream提供了内置的join算子和coGroup算子来简化一些特定场景下的合流操作。

(1) 窗口联结(window join)

        当我们不仅需要对两条流进行连接,还需要对连接后的流进行窗口操作,Flink为这种场景专门提供了一个窗口联结算子。如下操作可将两条流基于联结字段进行配对,并将key相同的放入一个窗口进行窗口计算。

stream1.join(stream2)
    .where(<KeySelector>)    // stream1的联结字段
    .equalTo(<KeySelector>)    // stream2的联结字段
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

        注意 这里调用窗口函数只能通过.apply()方法。

        窗口join的具体流程如下:两条流根据key进行分组,分别进入对应的窗口存储;到达窗口时间时,会先统计窗口内两条流的笛卡尔积,然后进行遍历,遍历到一对匹配的数据就调用一次窗口函数并输出结果。

(2) 间隔联结(interval join)

        间隔联结为数据流中的每一条数据单独开辟属于自己的时间窗口。试想这样一个场景,对于一条流A中的一条数据a,它只想和自己时间戳的前后一段时间间隔的B数据流进行连接,这样窗口联结就无法做到,需要间隔联结。

        间隔联结的两条流必须基于相同的key,且需要给定间隔上界和间隔下界,则数据a的窗口大小就是[a.timestamp+lowbound, a.timestamp+upperbound],其中lowbound<upperbound,两者都可正可负。

stream1.keyBy(<KeySelector>)
    .intervalJoin(stream2.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction(){})

4. coGroup

        coGroup 与窗口联结类似,也是将两条流合并后开窗处理匹配元素,调用时只需将.join()方法换成.coGroup()方法即可。

stream1.coGroup(stream2)
    .where(<KeySelector>)    // stream1的联结字段
    .equalTo(<KeySelector>)    // stream2的联结字段
    .window(<WindowAssigner>)
    .apply(<CoGroupFunction>)

        在window join中,数据在窗口中是先做笛卡尔积,再遍历是否匹配, 只有匹配的数据才会去调用apply方法,因此,window join实现的是类似sql中的inner join功能。而在coGroup函数中,数据不会做笛卡尔积,而是将所有搜集到的数据都传入到apply方法中,用户可以自定义匹配逻辑,因此可以实现任意外连接或是其他用户想要的连接方式。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/944468.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

QT集成IntelRealSense双目摄像头2,集成OpenGL

上一篇文章写了如何把IntelRealSense摄像头的SDK集成到QT项目&#xff0c;并成功采集数据&#xff0c;在没有用OpenCV的情况下完成色彩数据&#xff0c;以及深度数据的显示。 具体地址&#xff1a;https://blog.csdn.net/qujia121qu/article/details/144734163 本次主要写如何…

数据分析的分类和EDIT思维框架

为了服务于企业不同层次的决策&#xff0c;商业数据分析过程需要提供相应的数据科学产出物。 一般而言&#xff0c;数据分析需要经历从需求层、数据层、分析层到输出层四个阶段。 第一个阶段是需求层——确定目标&#xff0c;具体目标需要依据具体的层次进行分析&#xff1a…

面试场景题系列:设计URL短链

1.场景需求界定 1.缩短URL&#xff1a;提供一个长URL&#xff0c;返回一个短很多的URL。 2.重定向URL&#xff1a;提供一个缩短了的URL&#xff0c;重定向到原URL。 3.高可用、可扩展性和容错性考量。 •写操作&#xff1a;每天生成1亿个URL。 •每秒的写操作数&#xff1a…

Linux 基本指令

目录 1.常见指令 1.1 ls指令 1.2 pwd指令 1.3 cd指令 1.4 touch指令 1.5 mkdir指令 1.6 rm和rmdir指令 1.7 man指令 1.8 cp指令 1.9 mv指令 ​编辑 1.10 cat指令 1.11 more指令 1.12 less指令 1.13 head指令 1.14.tail指令 1.15 时间相关的指令 1.16 cal…

WEB UI 创建视图

1 视图名称 (点第1创建视图) 2 模型节点 可以空 3 上下文节点 4 新增节点下的属性 &#xff0c;参考结构(先建好的结构) 5 选择视图类型&#xff1a;&#xff08;表单&#xff0c; 列表&#xff09; 表单 &#xff1a;单条数据 列表 &#xff1a;多条数据&#xff08;表格…

redis cluster实验详解

华子目录 实验环境准备部署redis cluster添加节点删除节点redis cluster集群维护 实验 环境准备 再开3台主机 先把之前3台源码编译的redis删除 [rootredis-node1 ~]# cd /usr/local/redis/ [rootredis-node1 redis]# make uninstall[rootredis-node2 ~]# cd /usr/local/redi…

【详细讲解】hive优化

1、开启本地模式 大多数的Hadoop Job是需要Hadoop提供的完整的可扩展性来处理大数据集的。不过&#xff0c;有时Hive的输入数据量是非常小的。在这种情况下&#xff0c;为查询触发执行任务消耗的时间可能会比实际job的执行时间要多的多。对于大多数这种情况&#xff0c;Hive可…

Unity3d UGUI如何优雅的实现Web框架(Vue/Rect)类似数据绑定功能(含源码)

前言 Unity3d的UGUI系统与Web前端开发中常见的数据绑定和属性绑定机制有所不同。UGUI是一个相对简单和基础的UI系统&#xff0c;并不内置像Web前端&#xff08;例如 Vue.js或React中&#xff09;那样的双向数据绑定或自动更新UI的机制。UGUI是一种比较传统的 UI 系统&#xff…

828华为云征文|使用sysbench对Flexus X实例对mysql进行性能测评

目录 一、Flexus X实例概述 1.1?Flexus X实例 1.2?在mysql方面的优势 二、在服务器上安装MySQL 2.1 在宝塔上安装docker 2.2 使用宝塔安装mysql 2.3 准备测试数据库和数据库表 三、安装sysbench并进行性能测试 3.1 使用yum命令sysbench 3.2?运行?sysbench 并进行…

影刀进阶指令 | Kimi (对标ChatGPT)

文章目录 影刀进阶指令 | Kimi &#xff08;对标ChatGPT&#xff09;一. 需求二. 流程三. 实现3.1 流程概览3.2 流程步骤讲解1\. 确定问题2\. 填写问题并发送3\. 检测答案是否出完 四. 运维 影刀进阶指令 | Kimi &#xff08;对标ChatGPT&#xff09; 简单讲讲RPA调用kimi实现…

【教程】通过Docker运行AnythingLLM

转载请注明出处&#xff1a;小锋学长生活大爆炸[xfxuezhagn.cn] 如果本文帮助到了你&#xff0c;欢迎[点赞、收藏、关注]哦~ 官方教程&#xff1a;Local Docker Installation ~ AnythingLLM 1、先创建一个目录用于保存anythingllm的持久化文件&#xff1a; sudo mkdir /app su…

游戏引擎学习第65天

回顾我们在模拟区域更改方面的进展 目前我们正在进行游戏的架构调整&#xff0c;目标是建立一个引擎架构。我们正在实施的一个关键变化是引入模拟区域的概念&#xff0c;这样我们可以创建非常大的游戏世界&#xff0c;而这些世界的跨度不必受限于单个浮点变量。 通过这种方式…

【从零开始入门unity游戏开发之——C#篇35】C#自定义类实现Sort自定义排序

文章目录 一、List<T>自带的排序方法1、List<T>调用Sort()排序2、 能够使用 Sort() 方法进行排序的本质 二、自定义类的排序1、通过实现泛型IComparable<T> 接口&#xff08;1&#xff09;示例&#xff08;2&#xff09;直接调用 int 类型的 CompareTo 方法进…

YOLO系列正传(五)YOLOv4论文精解(上):从CSPNet、SPP、PANet到CSPDarknet-53

系列文章 YOLO系列基础 YOLO系列基础合集——小白也看得懂的论文精解-CSDN博客 YOLO系列正传 YOLO系列正传&#xff08;一&#xff09;类别损失与MSE损失函数、交叉熵损失函数-CSDN博客 YOLO系列正传&#xff08;二&#xff09;YOLOv3论文精解(上)——从FPN到darknet-53-C…

Redis 实战篇 ——《黑马点评》(上)

《引言》 在进行了前面关于 Redis 基础篇及其客户端的学习之后&#xff0c;开始着手进行实战篇的学习。因内容很多&#xff0c;所以将会分为【 上 中 下 】三篇记录学习的内容与在学习的过程中解决问题的方法。Redis 实战篇的内容我写的很详细&#xff0c;为了能写的更好也付出…

DevOps实战:用Kubernetes和Argo打造自动化CI/CD流程(2)

DevOps实战&#xff1a;用Kubernetes和Argo打造自动化CI/CD流程&#xff08;2&#xff09; 背景 Tips 翻遍国内外的文档&#xff0c;关于 Argo 作为 CI/CD 当前所有开源的文档&#xff0c;博客&#xff0c;argo官方文档。得出的结论是&#xff1a; argo官方给出的例子都相对…

探索Flink动态CEP:杭州银行的实战案例

摘要&#xff1a;本文撰写自杭州银行大数据工程师唐占峰、欧阳武林老师。将介绍 Flink 动态 CEP的定义与核心概念、应用场景、并深入探讨其技术实现并介绍使用方式。主要分为以下几个内容&#xff1a; Flink动态CEP简介 Flink动态CEP的应用场景 Flink动态CEP的技术实现 Flin…

STM32F103RCT6学习之三:串口

1.串口基础 2.串口发送 1&#xff09;基本配置 注意&#xff1a;实现串口通信功能需在keil中设置打开Use Micro LIB&#xff0c;才能通过串口助手观察到串口信息 2)编辑代码 int main(void) {/* USER CODE BEGIN 1 *//* USER CODE END 1 *//* MCU Configuration-------------…

Python中构建终端应用界面利器——Blessed模块

在现代开发中&#xff0c;命令行应用已经不再仅仅是一个简单的文本输入输出工具。随着需求的复杂化和用户体验的重视&#xff0c;终端界面也逐渐成为一个不可忽视的设计环节。 如果你曾经尝试过开发终端UI&#xff0c;可能对传统的 print() 或者 input() 函数感到不满足&#…

OpenHarmony-5.PM 子系统(2)

电池服务组件OpenHarmony-4.1-Release 1.电池服务组件 Battery Manager 提供了电池信息查询的接口&#xff0c;同时开发者也可以通过公共事件监听电池状态和充放电状态的变化。电池服务组件提供如下功能&#xff1a; 电池信息查询。充放电状态查询。关机充电。 电池服务组件架…