Flink双流(join)

 一、介绍

Join大体分类只有两种:Window Join和Interval Join

Window Join有可以根据Window的类型细分出3种:Tumbling(滚动) Window Join、Sliding(滑动) Window Join、Session(会话) Widnow Join。

        🌸Window 类型的join都是利用window的机制,先将数据缓存在Window State中,当窗口触发计算时,执行join操作。

        🌸Interval join也是利用state存储数据再处理,区别在于state中的数据有失效机制,依靠数据触发数据清理,目前Stream join的结果是数据的卡尔积。

二、Window Join

✨Tumbling Window Join

        执行翻滚窗口联接时,具有公共键和公告翻滚窗口的所有元素将成对组合联接,并传递JoinFunction或FlatJoinFunction。因为它的行为类似于内部连接,所以一个流中的元素在其滚动窗口中没有来自另一个流的元素,因此不会被发射。

        如图所示,我们定义了一个为2毫秒的翻滚窗口,结果窗口的形式为[0,1]、[2,3]..............该图显示了每个窗口中所以元素的成对组合,这些元素将传递给JoinFunction。注意在翻滚窗口[6,7]中没有发射任何东西,因为绿色流中不存在与橙色元素⑥和⑦结合的元素。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

✨Sliding Window Join

        在执行滑动窗口联接时,具有公共键和公共滑动窗口的所以元素将作为成对组合联接,并传递JoinFunction或FlatJoinFunction。在当前滑动窗口中,一个流的元素没有来自另一个流的元素,则不会发射!请注意,某些元素可能会联接到一个滑动窗口中,但不会联接到另一个滑动窗口中!

        在本例中,我们使用大小为2毫秒的滑动窗口,并将其滑动1毫秒,从而产生滑动窗口[-1,0],[1,2],[2,3]...........x轴下方的连续元素时传递给每个滑动窗口的Join Function的元素。在这里,你还可以看到,例如在窗口[2,3]中,橙色②和绿色③连接,但在窗口[1,2]中没有与任何对象连接。

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

✨Session Window Join

        在执行会话窗口联接时,具有相同键(当“组合”满足会话条件)的所有元素以成对组合方式联接,并传递给JoinFunction或FlatJoinFunction。同样,这执行一个内部连接,所以如果有一个会话窗口只包含来自一个流的元素,则不会发出任何输出

        这里,我们定义一个会话窗口连接,其中每个会话被至少1毫秒的时间分割。有三个会话,在前两个会话中,来自两个流的连接元素被传递给JoinFunction。在第三个会话中,绿色流中没有元素,所以⑧和⑨没有连接!

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
 ...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream.join(greenStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
    .apply (new JoinFunction<Integer, Integer, String> (){
        @Override
        public String join(Integer first, Integer second) {
            return first + "," + second;
        }
    });

三、Interval Join

        前面学习的Window Join必须要在一个Window中进行Join,那如果没有Window如何处理呢?interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。

b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] or

a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且,流B的元素的时间戳 ≤ 流A的元素时间戳

 

在上面的示例中,我们将两个流“orange”和“green”连接起来,其下限为-2毫秒,上限为+1毫秒。默认情况下,这些边界是包含的,但是可以应用.lowerBoundExclusive()和.upperBoundExclusive来更改行为orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound 

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
...
DataStream<Integer> orangeStream = ...DataStream<Integer> greenStream = ...
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/400905.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

蜂鸣器实验

1.有源蜂鸣器简介 蜂鸣器常用于计算机、打印机、报警器、电子玩具等电子产品中&#xff0c;常用的蜂鸣器有两种&#xff1a; 有源蜂鸣器和无源蜂鸣器&#xff0c;这里的有“源”不是电源&#xff0c;而是震荡源&#xff0c;有源蜂鸣器内部带有震荡 源&#xff0c;所以有源蜂…

OD(8)之Mermaid流程图(flowcharts)使用详解

OD(8)之Mermaid流程图(flowcharts)使用详解 Author: Once Day Date: 2024年2月20日 漫漫长路才刚刚开始… 全系列文章可参考专栏: Linux实践记录_Once_day的博客-CSDN博客 参考文章: 关于 Mermaid | Mermaid 中文网 (nodejs.cn)Mermaid | Diagramming and charting tool‍…

Maven 私服 Nexus3

一、Maven和Nexus3 简介 Maven是一个采用纯Java编写的开源项目管理工具&#xff0c;采用一种被称之为Project Object Model(POM)概念来管理项目&#xff0c;所有的项目配置信息都被定义在一个叫做POM.xml的文件中, 通过该文件Maven可以管理项目的整个生命周期&#xff0c;包括…

maven 打包命令

Maven是基于项目对象模型(POM project object model)&#xff0c;可以通过一小段描述信息&#xff08;配置&#xff09;来管理项目的构建&#xff0c;报告和文档的软件项目管理工具。 Maven的核心功能便是合理叙述项目间的依赖关系&#xff0c;通俗点讲&#xff0c;就是通过po…

06 分频器设计

分频器简介 实现分频一般有两种方法&#xff0c;一种方法是直接使用 PLL 进行分频&#xff0c;比如在 FPGA 或者 ASIC 设计中&#xff0c;都可以直接使用 PLL 进行分频。但是这种分频有时候受限于 PLL 本身的特性&#xff0c;无法得到频率很低的时钟信号&#xff0c;比如输入 …

Predis Multi-Zone

A Data Flow Framework with High Throughput and Low Latency for Permissioned Blockchains 联盟链的吞吐瓶颈由共识层和网络层的数据分发过程共同决定。 Predis 协议利用了共识节点的空闲带宽&#xff0c;提前分发区块中的内容即bundle&#xff0c;减少了共识区块中的内容&…

2024龙年特别篇 -- 魔法指针 之 assert断言 传址调用 传值调用

你是否为 assert断言&#xff0c;传址调用&#xff0c;传值调用而进一步加深印象&#xff0c;接下来就让白子寰同学为你详细讲解&#xff01;&#xff01;&#xff01; 目录 assert断言 概念 assert介绍 #define NDEBUG的使用 注意事项 传值调用 VS 传址调用 传值调用…

C语言推荐书籍

本书详细讲解了C语言的基本概念和编程技巧。全书共17章。第1章、第2章介绍了C语言编程的预备知识。第3章&#xff5e;第15章详细讲解了C语言的相关知识&#xff0c;包括数据类型、格式化输入/输出、运算符、表达式、语句、循环、字符输入和输出、函数、数组和指针、字符和字符串…

CS_Smb_Beacon上线不出网机器

当我们想上线不出网的机器的时候&#xff0c;我们可以通过上传工具来实现&#xff0c;但是有没有不用上传工具的方法呢&#xff1f;&#xff1f;&#xff1f; 有&#xff01;&#xff01;&#xff01; 而且cs会自带&#xff01;&#xff01;&#xff01; 1.基础的网络拓扑 以下…

MYSQL数据库详解

一、数据库的基本概念 数据&#xff08;data&#xff09;&#xff1a;指对客观事物进行描述并可以鉴别的符号。这些符号是可识别的&#xff0c;抽象的。 比如数字、图片、音频等。 数据库管理系统&#xff08;DBMS&#xff09;&#xff1a;数据库极其管理它的软件组成。 数据库…

RocketMQ-架构与设计

RocketMQ架构与设计 一、简介二、框架概述1.设计特点 三、架构图1.Producer2.Consumer3.NameServer4.BrokerServer 四、基本特性1.消息顺序性1.1 全局顺序1.2 分区顺序 2.消息回溯3.消息重投4.消息重试5.延迟队列&#xff08;定时消息&#xff09;6.重试队列7.死信队列8.消息语…

2.21C语言学习

Floyd算法原理 Floyd算法是一个经典的动态规划算法&#xff0c;它又被称为插点法。该算法名称以创始人之一、1978年图灵奖获得者、斯坦福大学计算机科学系教授罗伯特弗洛伊德命名。Floyd算法是一种利用动态规划的思想寻找给定的加权图中多源点之间最短路径的算法,算法目标是寻找…

工厂方法模式Factory Method

1.模式定义 定义一个用于创建对象的接口&#xff0c;让子类决定实例化哪一个类。Factory Method 使得一个类的实例化延迟到子类 2.使用场景 1.当你不知道改使用对象的确切类型的时候 2.当你希望为库或框架提供扩展其内部组件的方法时 主要优点&#xff1a; 1.将具体产品和创建…

解决NPE的三种方式

解决NPE的三种方式 NullPointerException&#xff08;空指针异常&#xff0c;NPE&#xff09;是Java编程中常见的错误。解决NPE的方法可以从以下三个方面考虑&#xff1a; 明确处理空引用情况&#xff1a; 在某些情况下&#xff0c;无法避免使用可能为空的引用对象。此时&…

【漏洞复现】H3C SecParh堡垒机任意用户登录漏洞

Nx01 产品简介 H3C SecParh堡垒机是一款专业用于安全管理的堡垒机产品&#xff0c;它通过强大的访问控制功能和安全审计功能&#xff0c;实现对网络服务器的远程安全管理和监控。 Nx02 漏洞描述 H3C SecParh堡垒机的get_detail_view.php中存在任意用户登录漏洞。攻击者可以构建…

计算机体系架构初步入门

&#x1f3ac;个人简介&#xff1a;一个全栈工程师的升级之路&#xff01; &#x1f4cb;个人专栏&#xff1a;高性能&#xff08;HPC&#xff09;开发基础教程 &#x1f380;CSDN主页 发狂的小花 &#x1f304;人生秘诀&#xff1a;学习的本质就是极致重复! 目录 1 计算机五大…

备战蓝桥杯---动态规划(应用2(一些十分巧妙的优化dp的手段))

好久不见&#xff0c;甚是想念&#xff0c;最近一直在看过河这道题&#xff08;感觉最近脑子有点宕机QAQ&#xff09;&#xff0c;现在算是有点懂了&#xff0c;打算记录下这道又爱又恨的题。&#xff08;如有错误欢迎大佬帮忙指出&#xff09; 话不多说&#xff0c;直接看题&…

2024三掌柜赠书活动第十一期:精通区块链开发技术(第2版)

目录 前言关于区块链开发技术关于《精通区块链开发技术(第2版)》编辑推荐内容简介作者简介图书目录书中前言/序言《精通区块链开发技术(第2版)》全书速览结束语 前言 作为开发者经常在技术圈活动&#xff0c;会接触各种前沿技术&#xff0c;比如区块链技术的崛起引发了全球范…

MySQL初识——安装配置

文章目录 1. MySQL卸载2. 获取MySQL官方yum源安装包3. 安装4. 启动MySQL5. 登录6. 配置配置文件 Tips&#xff1a; 本章是Centos 7安装配置myql&#xff0c;配置操作用的是root权限 1. MySQL卸载 首先我们先查看一下系统中是否有mysql服务 ps axj | grep mysql如果有&#xf…

部署安装有道QanyThing

前提条件&#xff1a; 1、win10系统更新到最新的版本&#xff0c;系统版本最好为专业版本 winver 查看系统版本&#xff0c;内部版本要大于19045 2、CPU开启虚拟化 3、开启虚拟化功能&#xff0c;1、2、3每步完成后均需要重启电脑&#xff1b; 注&#xff1a;windows 虚拟…