Flink Flink中的合流

一、Flink中的基本合流操作

在实际应用中,我们经常会遇到来源不同的多条流,需要将它们的数据进行联合处理。所以 Flink 中合流的操作会更加普遍,对应的 API 也更加丰富。

二、联合(Union)

最简单的合流操作,就是直接将多条流合在一起,叫作流的“联合”(union)。联合操作要求必须流中的数据类型必须相同,合并之后的新流会包括所有流中的元素,数据类型不变。
在这里插入图片描述
在代码中,我们只要基于 DataStream 直接调用.union()方法,传入其他 DataStream 作为参数,就可以实现流的联合了;得到的依然是一个 DataStream:

stream1.union(stream2, stream3, ...)

注意:union()的参数可以是多个 DataStream,所以联合操作可以实现多条流的合并。

代码实现:我们可以用下面的代码做一个简单测试:

package com.flink.DataStream.UnionStream;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkUnionStream {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        streamExecutionEnvironment.setParallelism(1);
        SingleOutputStreamOperator<Integer> source1 = streamExecutionEnvironment
                .socketTextStream("localhost", 1111)
                .map(a -> Integer.parseInt(a));
        SingleOutputStreamOperator<Integer> source2 = streamExecutionEnvironment
                .socketTextStream("localhost", 2222)
                .map(a -> Integer.parseInt(a));
        DataStreamSource<String> source3 = streamExecutionEnvironment.fromElements("3", "4", "5");
        DataStream<Integer> unionResult = source1.union(source2, source3.map(Integer::valueOf));
        unionResult.print();
        streamExecutionEnvironment.execute();
    }
}

在这里插入图片描述
在这里插入图片描述

三、连接(Connect)

为了处理更加灵活,连接操作允许流的数据类型不同。但我们知道一个DataStream中的数据只能有唯一的类型,所以连接得到的结果并不是DataStream,而是一个“连接流”。连接流可以看成是两条流形式上的“统一”,被放在了一个同一个流中;事实上内部仍保持各自的数据形式不变,彼此之间是相互独立的。要想得到新的DataStream,还需要进一步定义一个“同处理”(co-process)转换操作,用来说明对于不同来源、不同类型的数据,怎样分别进行处理转换、得到统一的输出类型。所以整体上来,两条流的连接就像是“一国两制”,两条流可以保持各自的数据类型、处理方式也可以不同,不过最终还是会统一到同一个DataStream中。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/197743.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

「我在淘天做技术」迈步从头越 - 阿里妈妈广告智能决策技术的演进之路

作者&#xff1a;妙临、霁光、玺羽 一、前言 在线广告对于大多数同学来说是一个既熟悉又陌生的技术领域。「搜广推」、「搜推广」等各种组合耳熟能详&#xff0c;但广告和搜索推荐有本质区别&#xff1a;广告解决的是“媒体-广告平台-广告主”等多方优化问题&#xff0c;其中媒…

手机便签app哪个比较好用?

手机便签类软件的种类是比较多的&#xff0c;不管是安卓手机品牌还是苹果手机品牌的手机&#xff0c;在手机的应用商店中搜索“便签”&#xff0c;大家会找到很多便签类软件。那么&#xff0c;手机便签APP哪个比较好用呢&#xff1f; 在选择手机便签APP时&#xff0c;大家比较…

详解原生Spring中的控制反转和依赖注入-构造注入和Set注入

&#x1f609;&#x1f609; 学习交流群&#xff1a; ✅✅1&#xff1a;这是孙哥suns给大家的福利&#xff01; ✨✨2&#xff1a;我们免费分享Netty、Dubbo、k8s、Mybatis、Spring...应用和源码级别的视频资料 &#x1f96d;&#x1f96d;3&#xff1a;QQ群&#xff1a;583783…

Java变量理解

成员变量VS局部变量的区别 语法形式&#xff1a;从语法形式上看&#xff0c;成员变量是属于类的&#xff0c;而局部变量是在代码块或方法中定义的变量或是方法的参数&#xff1b;成员变量可以被 public,private,static 等修饰符所修饰&#xff0c;而局部变量不能被访问控制修饰…

如何高效批量生成条形码?

条形码作为商品、库存和信息管理的基础工具&#xff0c;扮演着至关重要的角色。为了满足用户对于高效、专业、多样化的条形码生成需求&#xff0c;我们推出了一款专业高效的在线条形码生成工具。 网址&#xff1a;https://www.1txm.com/ 多样化条形码支持 易条形支持多种常见…

【算法萌新闯力扣】:旋转链表

力扣题目&#xff1a;旋转链表 开篇 今天是备战蓝桥杯的第25天和算法村开营第3天&#xff01;经过这3天的学习&#xff0c;感觉自己对链表的掌握程度大大地提升&#xff0c;尤其是在帮村里的同学讨论相关问题时。本篇文章&#xff0c;给大家带来一道旋转链表的题目&#xff0c…

leetcode:用队列实现栈(后进先出)

题目描述 题目链接&#xff1a;225. 用队列实现栈 - 力扣&#xff08;LeetCode&#xff09; 题目分析 我们先把之前写的队列实现代码搬过来 用队列实现栈最主要的是实现栈后进先出的特点&#xff0c;而队列的特点是先进先出&#xff0c;那么我们可以用两个队列来实现 一个队…

PTA-6-48 使用面向对象的思想编写程序描述动物

题目&#xff1a; 使用面向对象的思想编写程序描述动物&#xff0c;说明&#xff1a; &#xff08;1) 分析兔子和青蛙的共性&#xff0c;定义抽象的动物类&#xff0c;拥有一些动物共有的属性&#xff1a;名字、颜色、类别&#xff08;哺乳类、非哺乳类&#xff09;&#xff0c…

Redis 事件轮询

1 Redis 为什么快 数据存在内存中, 直接操作内存中的数据单线程处理业务请求避免了多线的上下文切换, 锁竞争等弊端使用 IO 多路复用支撑更高的网络请求使用事件驱动模型, 通过事件通知模式, 减少不必要的等待… 这些都是 Redis 快的原因。 但是这些到了代码层面是如何实现的呢…

出纳常用的月报表,熬夜做了这8份直接用!

做出纳&#xff0c;公司财务的日报表是必不可少的&#xff0c;收支了多少&#xff0c;支出了多少&#xff0c;这些都是要记录下来的&#xff01; 一份出纳日报表通常包含以下内容&#xff1a; 1. 日期&#xff1a;报告涵盖的具体日期&#xff0c;标明是哪一天的财务数据。 2. 收…

【论文阅读】An Experimental Survey of Missing Data Imputation Algorithms

论文地址&#xff1a;An Experimental Survey of Missing Data Imputation Algorithms | IEEE Journals & Magazine | IEEE Xplore 处理缺失数据最简单的方法就是是丢弃缺失值的样本&#xff0c;但这会使得数据更加不完整并且导致偏差或影响结果的代表性。因此&#xff0c;…

bash编程 数组和for循环的应用

bash编程 数组和for循环的应用 1、问题背景2、bash 定义数组3、for循环遍历输出数组所有元素4、编写bash脚本输出每个端口是否在监听状态 1、问题背景 linux服务器开机后&#xff0c;需要检查一组端口是否在监听&#xff0c;以便判断这些端口对应的服务是否在运行。可以考虑使…

别再让假的fiddler教程毒害你了,来看这套最全最新的fiddler全工具讲解

fiddler界面工具栏介绍 添加图片注释&#xff0c;不超过 140 字&#xff08;可选&#xff09; &#xff08;1&#xff09;WinConfig&#xff1a;windows 使用了一种叫做“AppContainer”的隔离技术&#xff0c;使得一些流量无法正常捕获&#xff0c;在 fiddler中点击 WinConfig…

轻巧高效的剃须好工具,DOCO黑刃电动剃须刀上手

剃须刀大家都用过&#xff0c;我比较喜欢电动剃须刀&#xff0c;尤其是多刀头的悬浮剃须刀&#xff0c;感觉用起来很方便&#xff0c;剃须效率也很高。最近我在用一款DOCO小蔻的黑刃电动剃须刀&#xff0c;这款剃须刀轻巧易用&#xff0c;而且性价比超高。 相比于同类产品&…

深度盘点:100 个 Python 数据分析函数总结

经过一段时间的整理&#xff0c;本期将分享我认为比较常用的100个实用函数&#xff0c;这些函数大致可以分为六类&#xff0c;分别是统计汇总函数、数据清洗函数、数据筛选、绘图与元素级运算函数、时间序列函数和其他函数。 技术交流 技术要学会交流、分享&#xff0c;不建议…

两个mongo表,A和B,以A中的_id记录的为准, 删掉B表中A表中没有的记录

可以使用 MongoDB 的聚合管道和 $lookup 操作符来实现这个需求。以下是一个示例的查询语句,假设集合 A 和集合 B 分别对应表 A 和表 B: db.B.aggregate([{$lookup: {from: "A",localField: "_id",foreignField:

单片机复位电路

有时候我们的代码会跑飞,这个时候基本上是一切推到重来.”推倒重来”在计算机术语上称为复位.复位需要硬件的支持,复位电路就是在单片机的复位管脚上产生一个信号&#xff0c;俗称复位信号.这个信号需要持续一定的时间,单片机收到该信号之后就会复位,从头执行。 复位原理: 那么…

【工业智能】Solutions

各类问题对应的解决方案 工艺参数推荐APC 排产调度智能算法强化学习 运筹优化空压机群控 预测 工艺参数推荐 APC 排产调度 智能算法 遗传算法 强化学习 DDQN 运筹优化 空压机群控 MIP混合整数规划 能耗优化 预测 电池容量预测 时序预测&#xff0c;回归预测 点击剩余…

【Vue】Vue3 配置全局 scss 变量

variables.scss $color: #0c8ce9;vite.config.ts // 全局css变量css: {preprocessorOptions: {scss: {additionalData: import "/styles/variables.scss";,},},},.vue 文件使用

创建一个带有背景图层和前景图层的渲染窗口

开发环境&#xff1a; Windows 11 家庭中文版Microsoft Visual Studio Community 2019VTK-9.3.0.rc0vtk-example demo解决问题&#xff1a; 创建一个带有背景图层和前景图层的渲染窗口&#xff0c;知识点&#xff1a;1. 画布转image&#xff1b;2. 渲染图层设置&#xff1b;3.…