Flink多流转换(2)—— 双流连结

双流连结(Join):根据某个字段的值将数据联结起来,“配对”去做处理

窗口联结(Window Join)

可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理

代码逻辑

首先需要调用 DataStream 的.join()方法来合并两条流,得到一个 JoinedStreams;接着通过.where().equalTo()方法指定两条流中联结的 key;然后通过.window()开窗口,并调用.apply()传入联结窗口函数进行处理计算

stream1.join(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(<WindowAssigner>)
 .apply(<JoinFunction>)

对于JoinFunction

public interface JoinFunction<IN1, IN2, OUT> extends Function, Serializable {
 OUT join(IN1 first, IN2 second) throws Exception;
}

使用时需要实现内部的join方法,有两个参数,分别表示两条流中成对匹配的数据

JoinFunciton 并不是真正的“窗口函数”,它只是定义了窗口函数在调用时对匹配数据的具体处理逻辑

实现流程

两条流的数据到来之后,首先会按照 key 分组、进入对应的窗口中存储;当到达窗口结束时间时,算子会先统计出窗口内两条流的数据的所有组合,也就是对两条流中的数据做一个笛卡尔积(相当于表的交叉连接,cross join),然后进行遍历,把每一对匹配的数据,作为参数(first,second)传入 JoinFunction 的.join()方法进行计算处理。所以窗口中每有一对数据成功联结匹配,JoinFunction 的.join()方法就会被调用一次,并输出一个结果

实例分析

// 基于窗口的join
public class WindowJoinTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .join(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> left, Tuple2<String, Long> right) throws Exception {
                        return left + "=>" + right;
                    }
                })
                .print();

        env.execute();
    }
}

运行结果如下:

间隔联结(Interval Join)

统计一段时间内的数据匹配情况,不应该用滚动窗口或滑动窗口来处理——因为匹配的两个数据有可能刚好“卡在”窗口边缘两侧

因此需要采用”间隔联结“的操作,针对一条流的每个数据,开辟出其时间戳前后的一段时间间隔,看这期间是否有来自另一条流的数据匹配

原理

给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);

于是对于一条流(不妨叫作 A)中的任意一个数据元素 a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以 a 的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围

所以对于另一条流(不妨叫 B)中的数据元素 b,如果它的时间戳落在了这个区间范围内,a 和 b 就可以成功配对,进而进行计算输出结果

匹配条件为:a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

举例分析如下:

下方的流 A 去间隔联结上方的流 B,所以基于 A 的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2 毫秒,上界为 1 毫秒。于是对于时间戳为 2 的 A 中元素,它的可匹配区间就是[0, 3],流 B 中有时间戳为 0、1 的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A 中时间戳为 3 的元素,可匹配区间为[1, 4],B 中只有时间戳为 1 的一个数据可以匹配,于是得到匹配数据对(3, 1)

代码逻辑

基于KeyedStream进行联结操作:


①DataStream 通过 keyBy 得到KeyedStream

②调用.intervalJoin()来合并两条流,得到的是一个 IntervalJoin 类型

③通过.between()方法指定间隔的上下界,再调用.process()方法,定义对匹配数据对的处理操作

④调用.process()需要传入一个处理函数:ProcessJoinFunction

stream1
 .keyBy(<KeySelector>)
 .intervalJoin(stream2.keyBy(<KeySelector>))
 .between(Time.milliseconds(-2), Time.milliseconds(1))
 .process (new ProcessJoinFunction<Integer, Integer, String(){
 	@Override
 	public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
 	out.collect(left + "," + right);
 }
 });

实例分析

// 基于间隔的join
public class IntervalJoinTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple3<String, String, Long>> orderStream = env.fromElements(
                Tuple3.of("Mary", "order-1", 5000L),
                Tuple3.of("Alice", "order-2", 5000L),
                Tuple3.of("Bob", "order-3", 20000L),
                Tuple3.of("Alice", "order-4", 20000L),
                Tuple3.of("Cary", "order-5", 51000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple3<String, String, Long>>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple3<String, String, Long> element, long recordTimestamp) {
                        return element.f2;
                    }
                })
        );

        SingleOutputStreamOperator<Event> clickStream = env.fromElements(
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Alice", "./prod?id=200", 3500L),
                new Event("Bob", "./prod?id=2", 2500L),
                new Event("Alice", "./prod?id=300", 36000L),
                new Event("Bob", "./home", 30000L),
                new Event("Bob", "./prod?id=1", 23000L),
                new Event("Bob", "./prod?id=3", 33000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                    @Override
                    public long extractTimestamp(Event element, long recordTimestamp) {
                        return element.timestamp;
                    }
                })
        );

        orderStream.keyBy(data -> data.f0)
                .intervalJoin(clickStream.keyBy(data -> data.user))
                .between(Time.seconds(-5), Time.seconds(10))
                .process(new ProcessJoinFunction<Tuple3<String, String, Long>, Event, String>() {
                    @Override
                    public void processElement(Tuple3<String, String, Long> left, Event right, Context ctx, Collector<String> out) throws Exception {
                        out.collect(right + " => " + left);
                    }
                })
                .print();

        env.execute();
    }}

运行结果如下:

窗口同组联结(Window CoGroup)

用法跟 window join非常类似,也是将两条流合并之后开窗处理匹配的元素,调用时只需要将.join()换为.coGroup()就可以了

代码逻辑

stream1.coGroup(stream2)
 .where(<KeySelector>)
 .equalTo(<KeySelector>)
 .window(TumblingEventTimeWindows.of(Time.hours(1)))
 .apply(<CoGroupFunction>)

与 window join 的区别在于,调用.apply()方法定义具体操作时,传入的是一个CoGroupFunction

public interface CoGroupFunction<IN1, IN2, O> extends Function, Serializable {
 void coGroup(Iterable<IN1> first, Iterable<IN2> second, Collector<O> out) throws Exception;
}

coGroup方法与FlatJoinFunction.join()方法类似,其中的三个参数依旧是两条流中的数据以及用于输出的收集器;但前两个参数不再是单独的配对数据,而是可遍历的数据集合;

因此该方法中直接把收集到的所有数据一次性传入,然后自定义配对方式,不需要再计算窗口中两条流数据集的笛卡尔积;

实例分析

// 基于窗口的join
public class CoGroupTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Tuple2<String, Long>> stream1 = env
                .fromElements(
                        Tuple2.of("a", 1000L),
                        Tuple2.of("b", 1000L),
                        Tuple2.of("a", 2000L),
                        Tuple2.of("b", 2000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        DataStream<Tuple2<String, Long>> stream2 = env
                .fromElements(
                        Tuple2.of("a", 3000L),
                        Tuple2.of("b", 3000L),
                        Tuple2.of("a", 4000L),
                        Tuple2.of("b", 4000L)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple2<String, Long>>forMonotonousTimestamps()
                                .withTimestampAssigner(
                                        new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                                            @Override
                                            public long extractTimestamp(Tuple2<String, Long> stringLongTuple2, long l) {
                                                return stringLongTuple2.f1;
                                            }
                                        }
                                )
                );

        stream1
                .coGroup(stream2)
                .where(r -> r.f0)
                .equalTo(r -> r.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, Long>> iter1, Iterable<Tuple2<String, Long>> iter2, Collector<String> collector) throws Exception {
                        collector.collect(iter1 + "=>" + iter2);
                    }
                })
                .print();

        env.execute();
    }
}

运行结果如下:

学习课程链接:【尚硅谷】Flink1.13实战教程(涵盖所有flink-Java知识点)_哔哩哔哩_bilibili  

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/349754.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

软考培训机构哪家比较好?各软考培训机构排名如何?

先放上机构测评图 一、机构情况 &#xff08;1&#xff09;主营业务 大多数软考培训机构主要致力于IT培训或者软件行业。这些机构的课程更加专业&#xff0c;因为他们起源于该行业。我相信报考软考的同学大部分也是从事这个行业的。个人认为选择这类机构进行培训会有更多好处…

图片保存后多了个水印?教你如何用华为手机保存无水印图片

对于各类生活App的深度用户来说&#xff0c;有时候碰到实用的生活技巧、攻略&#xff0c;甚至是一张好看的风景照&#xff0c;都会第一时间想要长按把图片保存到手机相册&#xff0c;有时候还会分享给朋友、朋友圈。 但是有些图片在App上显示的时候是干净的&#xff0c;保存下…

day30_回溯总结

文章目录 回溯的问题总结&#xff1a;1. 回溯三部曲&#xff1a;2. 回溯的模板3. 回溯题型4. 回溯的概念&#xff1a;5. 回溯的重点问题&#xff1a;组合和去重。[5.1 组合问题&#xff1a;](https://programmercarl.com/0077.%E7%BB%84%E5%90%88.html)剪枝优化[5.2 去重问题—…

接口自动化中如何完成接口加密与解密?

加密是一种限制对网络上传输数据的访问权的技术。将密文还原为原始明文的过程称为解密&#xff0c;它是加密的反向处理。在接口开发中使用加密、解密技术&#xff0c;可以防止机密数据被泄露或篡改。在接口自动化测试过程中&#xff0c;如果要验证加密接口响应值正确性的话&…

temu跨境电商怎么样?做temu蓝海项目有哪些优势?

在全球电商市场激烈的竞争中&#xff0c;Temu跨境电商平台以其独特的优势和策略&#xff0c;逐渐崭露头角。对于许多想要拓展海外市场的商家来说&#xff0c;Temu的蓝海项目提供了一个充满机遇的新平台。本文将深入探讨Temu跨境电商的优势以及在蓝海市场中的发展前景。 全球化市…

SpringBoot的自动装配原理

一、SpringBootConfiguration注解的作用 SpringBootApplication注解是SpringBoot项目的核心注解,加在启动引导类上。点击进去可以发现SpringBootApplication注解是一个组合注解。其中SpringBootConfiguration和EnableAutoConfiguration是由Spring提供的,剩下的注解是由JDK提供的…

牛客BC52 判断整数奇偶性(C语言)

#include <stdio.h> int main() {int a;while ((scanf("%d", &a)) ! EOF){ if (a%20)printf("Even\n");elseprintf("Odd\n");}return 0; }

跨平台Recorder录音插件:支持多种格式、音频可视化、实时上传、语音识别

视频教程地址&#xff1a;【跨平台Recorder录音插件&#xff1a;支持多种格式、音频可视化、实时上传、语音识别】 https://www.bilibili.com/video/BV1jQ4y1c7e4/?share_sourcecopy_web&vd_sourcee66c0e33402a09ca7ae1f0ed3d5ecf7c /** 先引入Recorder &#xff08; 需先…

Programming Abstractions in C阅读笔记:p254-p257

《Programming Abstractions in C》学习第70天&#xff0c;p254-p257总结&#xff0c;总计4页。 一、技术总结 1.minimax strategy(极小化极大算法) p255, This idea–finding the position that leaves your opponent with the worst possible best move–is called the mi…

MiniTab的单值的变量控制图——I-MR 控制图分析

单值的变量控制图分为&#xff1a;I-MR 控制图、Z-MR 控制图、单值控制图、移动极差控制图4种。 I-MR 控制图 功能菜单请选择&#xff1a;统计>控制图>单值的变量控制图>I-MR。 使用 I-MR 控制图 可以在拥有连续数据且这些数据是不属于子组的单个观测值的情况下监视…

Supervised Contrastive 损失函数详解

有什么不对的及时指出&#xff0c;共同学习进步。(●’◡’●) 有监督对比学习将自监督批量对比方法扩展到完全监督设置&#xff0c;能够有效地利用标签信息。属于同一类的点簇在嵌入空间中被拉到一起&#xff0c;同时将来自不同类的样本簇推开。这种损失显示出对自然损坏很稳…

专业远程控制软件有哪些品牌

远程办公、远程控制类的软件很多&#xff0c;主打方向和面向的客户人群也不一样。个人用户可能更在意便捷、免费等因素&#xff1b;专业用户会更注重安全性、管理功能等。今天我们介绍几个在全球知名的专业商业远程软件。 1、TeamViewer 简介&#xff1a;TeamViewer &#xf…

EXCEL VBA调用adobe的api识别电子PDF发票里内容并登记台账

EXCEL VBA调用adobe的api识别电子PDF发票里内容并登记台账 代码如下 使用须知&#xff1a; 1、工具--引用里勾选[Adobe Acrobat 10.0 Type Library] 2、安装Adobe Acrobat pro软件Dim sht As Worksheet Function BrowseFolders() As String 浏览目录Dim objshell As ObjectDim…

暗藏危险,警惕钓鱼邮件!

叮 您有一份福利待查收 您的信息资产需要排查 您的账户异常需要验证 这些看似“重要”的邮件 都藏着攻击者的恶意嘴脸 随着网络安全防护和建设的重要性日益凸显&#xff0c;国家安全、企业安全、合规需求及业务驱动等各个方面都亟需将网络安全作为基石。在企业业务转型发展…

【C++中STL】stack和queue容器

stack和queue stack基本概念常用接口 quque基本概念常用接口 stack 基本概念 stack是一种先进后出的数据结构&#xff0c;它只有一个出口 栈中只有顶端的元素可以被外界使用&#xff0c;因此栈不允许由遍历行为 可以判断是否为空empty(),和统计个数size(); 常用接口 1、st…

服务器是什么?(四种服务器类型)

服务器 服务器定义广义: 专门给其他机器提供服务的计算机。狭义:一台高性能的计算机&#xff0c;通过网络提供外部计算机一些业务服务 个人PC内存大概8G&#xff0c;服务器内存128G起步 服务器是什么 服务器指的是 网络中能对其他机器提供某些服务的计算机系统 &#xff0c;相对…

用Yara对红队工具“打标”

前言: YARA 通常是帮助恶意软件研究人员识别和分类恶意软件样本的工具&#xff0c;它基于文本或二进制模式创建恶意样本的描述规则&#xff0c;每个规则由一组字符串和一个布尔表达式组成&#xff0c;这些表达式决定了它的逻辑。 但是这次我们尝试使用 YARA 作为一种扫描工具…

【好书推荐-第五期】《互联网大厂推荐算法实战》(异步图书出品)

&#x1f60e; 作者介绍&#xff1a;我是程序员洲洲&#xff0c;一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主、前后端开发、人工智能研究生。公粽号&#xff1a;程序员洲洲。 &#x1f388; 本文专栏&#xff1a;本文…

机器学习 | 深入探索Numpy的高性能计算能力

目录 初识numpy numpy基本操作 数组的基本操作 ndarray运算 数组间运算 矩阵 初识numpy Numpy&#xff08;Numerical Python&#xff09;是一个开源的Python科学计算库&#xff0c;用于快速处理任意维度的数组。Numpy支持常见的数组和矩阵操作。对于同样的数值计算任务&…

k8s 进阶实战笔记 | Pod 创建过程详解

Pod 创建过程详解 ​ 初始状态0 controller-manager、scheduler、kubelet组件通过 list-watch 机制与 api-server 通信并检查资源变化 第一步 用户通过 CLI 或者 WEB 端等方式向 api-server 发送创建资源的请求&#xff08;比如&#xff1a;我要创建一个replicaset资源&…