[尚硅谷 flink] 基于时间的合流——双流联结(Join)

文章目录

      • 8.1 窗口联结(Window Join)
      • 8.2 **间隔联结(Interval Join)**

8.1 窗口联结(Window Join)

Flink为基于一段时间的双流合并专门提供了一个窗口联结算子,可以定义时间窗口,并将两条流中共享一个公共键(key)的数据放在窗口中进行配对处理。

package org.example.watermark;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Tuple2<String, Integer>> ds1 = env
            .fromElements(
                Tuple2.of("a", 1),
                Tuple2.of("a", 2),
                Tuple2.of("b", 3),
                Tuple2.of("c", 4)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple2<String, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
            );


        SingleOutputStreamOperator<Tuple3<String, Integer, Integer>> ds2 = env
            .fromElements(
                Tuple3.of("a", 1, 1),
                Tuple3.of("a", 11, 1),
                Tuple3.of("b", 2, 1),
                Tuple3.of("b", 12, 1),
                Tuple3.of("c", 14, 1),
                Tuple3.of("d", 15, 1)
            )
            .assignTimestampsAndWatermarks(
                WatermarkStrategy
                    .<Tuple3<String, Integer, Integer>>forMonotonousTimestamps()
                    .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
            );

        // TODO window join
        // 1. 落在同一个时间窗口范围内才能匹配
        // 2. 根据keyby的key,来进行匹配关联
        // 3. 只能拿到匹配上的数据,类似有固定时间范围的inner join
        DataStream<String> join = ds1.join(ds2)
            .where(r1 -> r1.f0)  // ds1的keyby
            .equalTo(r2 -> r2.f0) // ds2的keyby
            .window(TumblingEventTimeWindows.of(Time.seconds(10)))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                /**
                 * 关联上的数据,调用join方法
                 * @param first  ds1的数据
                 * @param second ds2的数据
                 * @return
                 * @throws Exception
                 */
                @Override
                public String join(Tuple2<String, Integer> first, Tuple3<String, Integer, Integer> second) throws Exception {
                    return first + "<----->" + second;
                }
            });

        join.print();

        env.execute();
    }
}

其实仔细观察可以发现,窗口join的调用语法和我们熟悉的SQL中表的join非常相似:
SELECT * FROM table1 t1, table2 t2 WHERE t1.id = t2.id;
这句SQL中where子句的表达,等价于inner join … on,所以本身表示的是两张表基于id的“内连接”(inner join)。而Flink中的window join,同样类似于inner join。也就是说,最后处理输出的,只有两条流中数据按key配对成功的那些;如果某个窗口中一条流的数据没有任何另一条流的数据匹配,那么就不会调用JoinFunction的.join()方法,也就没有任何输出了。

8.2 间隔联结(Interval Join)

  • 间隔联结的原理

间隔联结具体的定义方式是,我们给定两个时间点,分别叫作间隔的“上界”(upperBound)和“下界”(lowerBound);于是对于一条流(不妨叫作A)中的任意一个数据元素a,就可以开辟一段时间间隔:[a.timestamp + lowerBound, a.timestamp + upperBound],即以a的时间戳为中心,下至下界点、上至上界点的一个闭区间:我们就把这段时间作为可以匹配另一条流数据的“窗口”范围。所以对于另一条流(不妨叫B)中的数据元素b,如果它的时间戳落在了这个区间范围内,a和b就可以成功配对,进而进行计算输出结果。所以匹配的条件为:
a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound
这里需要注意,做间隔联结的两条流A和B,也必须基于相同的key;下界lowerBound应该小于等于上界upperBound,两者都可正可负;间隔联结目前只支持事件时间语义。
如下图所示,我们可以清楚地看到间隔联结的方式:
image.png
下方的流A去间隔联结上方的流B,所以基于A的每个数据元素,都可以开辟一个间隔区间。我们这里设置下界为-2毫秒,上界为1毫秒。于是对于时间戳为2的A中元素,它的可匹配区间就是[0, 3],流B中有时间戳为0、1的两个元素落在这个范围内,所以就可以得到匹配数据对(2, 0)和(2, 1)。同样地,A中时间戳为3的元素,可匹配区间为[1, 4],B中只有时间戳为1的一个数据可以匹配,于是得到匹配数据对(3, 1)。

所以我们可以看到,间隔联结同样是一种内连接(inner join)。与窗口联结不同的是,interval join做匹配的时间段是基于流中数据的,所以并不确定;而且流B中的数据可以不只在一个区间内被匹配。

stream1
    .keyBy(<KeySelector>)
    .intervalJoin(stream2.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
  • 处理迟到数据
 //2. 调用 interval join
OutputTag<Tuple2<String, Integer>> ks1LateTag = new OutputTag<>("ks1-late", Types.TUPLE(Types.STRING, Types.INT));
OutputTag<Tuple3<String, Integer, Integer>> ks2LateTag = new OutputTag<>("ks2-late", Types.TUPLE(Types.STRING, Types.INT, Types.INT));

SingleOutputStreamOperator<String> process = ks1.intervalJoin(ks2)
        .between(Time.seconds(-2), Time.seconds(2))
        .sideOutputLeftLateData(ks1LateTag)  // 将 ks1的迟到数据,放入侧输出流
        .sideOutputRightLateData(ks2LateTag) // 将 ks2的迟到数据,放入侧输出流
        .process(
                new ProcessJoinFunction<Tuple2<String, Integer>, Tuple3<String, Integer, Integer>, String>() {
                    /**
                     * 两条流的数据匹配上,才会调用这个方法
                     * @param left  ks1的数据
                     * @param right ks2的数据
                     * @param ctx   上下文
                     * @param out   采集器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, Integer> left, Tuple3<String, Integer, Integer> right, Context ctx, Collector<String> out) throws Exception {
                        // 进入这个方法,是关联上的数据
                        out.collect(left + "<------>" + right);
                    }
                });

process.print("主流");
process.getSideOutput(ks1LateTag).printToErr("ks1迟到数据");
process.getSideOutput(ks2LateTag).printToErr("ks2迟到数据");



env.execute();

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/523692.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

鲸鱼优化算法(Whale Optimization Algorithm)

注意&#xff1a;本文引用自专业人工智能社区Venus AI 更多AI知识请参考原站 &#xff08;[www.aideeplearning.cn]&#xff09; 算法背景 鲸鱼优化算法&#xff08;Whale Optimization Algorithm, WOA&#xff09;是一种模拟鲸鱼捕食行为的优化算法。想象一下&#xff0c;你…

一文学会Semaphore(信号量)

// 空出来椅子 semaphore.release(count); } } catch (Exception e){ } } }; t.setName("Thread --> " i); t.start(); } } 程序将一直执行下去&#xff0c;不会漏单&#xff0c;也不会出现椅子占用数量大于20的情况。 AQS基础 Semaphore是一种共享锁&#xf…

若依框架mysql 搜索中文等于不生效

背景&#xff0c;字段存储的是中文 不生效代码如下 <if test"constellation ! null and constellation ! ">AND u.constellation #{constellation}</if> 正确生效的代码如下 <if test"constellation ! null and constellation ! ">A…

kubernetes集群添加到jumpserver堡垒机里管理

第一步、在kubernetes集群中获取一个永久的token。 jumpserver堡垒机用api的来管理kubernetes&#xff0c;所以需要kubernetes的token&#xff0c;这个token还需要是一个永久的token&#xff0c;版本变更&#xff1a;Kubernetes 1.24基于安全方面的考虑&#xff08;特性门控Le…

C语言自定义类型变量——结构体(二)

前言&#xff1a;上一篇内容中我们概述了结构体的基本内容&#xff0c;包括结构体的定义&#xff0c;声明&#xff0c;创建和初始化&#xff0c;内存对齐等问题&#xff0c;本篇将进一步深入结构体相关内容&#xff0c;包括为什么会存在内存对齐&#xff0c;结构体如何传参&…

【JavaWeb】Day35.MySQL概述——数据库设计-DDL(二)

表操作 关于表结构的操作也是包含四个部分&#xff1a;创建表、查询表、修改表、删除表。 1.创建 语法 create table 表名( 字段1 字段1类型 [约束] [comment 字段1注释 ], 字段2 字段2类型 [约束] [comment 字段2注释 ], ...... 字段n 字段n类型 [约束] [comment …

Matlab应用层生成简述

基础软件层 目前接触到的几款控制器&#xff0c;其厂商并没有提供simulink的基础软件库一般为底层文件被封装为lib&#xff0c;留有供调用API接口虽然能根据API接口开发基础软件库&#xff0c;但耗费时间过长得不偿失 应用层 所以可以将应用层封装为一个子系统&#xff0c;其…

二. CUDA编程入门-双线性插值计算

目录 前言0. 简述1. 执行一下我们的第十个CUDA程序2. Bilinear interpolation3. 代码分析总结参考 前言 自动驾驶之心推出的 《CUDA与TensorRT部署实战课程》&#xff0c;链接。记录下个人学习笔记&#xff0c;仅供自己参考 Note&#xff1a;关于 CUDA 加速双线程插值的内容博主…

王权与自由国际服加速器推荐 一键下载王权与自由加速器分享

《王权与自由》&#xff0c;还记得第一天进游戏各个服务器爆满锁服的盛况&#xff0c;好不容易挤进去&#xff0c;地图上人山人海&#xff0c;NPC埋没在玩家的人海里&#xff0c;差点找不到。迎来了12月20日的大型更新后&#xff0c;再进入游戏&#xff0c;服务器每一个都空闲无…

【C语言】函数递归编程题

目录 题目一&#xff1a; 题目二&#xff1a; 题目三&#xff1a; 题目四&#xff1a; 总结 题目一&#xff1a; 题目&#xff1a;接受一个整型值&#xff08;无符号&#xff09;&#xff0c;按照顺序打印它的每一位。&#xff08;递归完成&#xff09; 列如&#xff1a; …

0104练习与思考题-算法基础-算法导论第三版

2.3-1 归并示意图 问题&#xff1a;使用图2-4作为模型&#xff0c;说明归并排序再数组 A ( 3 , 41 , 52 , 26 , 38 , 57 , 9 , 49 ) A(3,41,52,26,38,57,9,49) A(3,41,52,26,38,57,9,49)上的操作。图示&#xff1a; tips:&#xff1a;有不少在线算法可视化工具&#xff08;软…

C#基础:类,对象,类成员简介(第四节课)

本节内容&#xff1a; 类与对象的关系 什么时候叫“对象”&#xff0c;什么时候叫实例引用变量与实例的关系 类的三大成员 属性方法事件 类的静态成员与实例成员 关于“绑定” 1.什么是类&#xff1a;&#xff08;再详细一点&#xff09; 类是对现实世界事物进行抽象所…

网络基础三——初识IP协议

网络基础三 ​ 数据通过应用层、传输层将数据传输到了网络层&#xff1b; ​ 传输层协议&#xff0c;如&#xff1a;TCP协议提供可靠性策略或者高效性策略&#xff0c;UDP提供实时性策略&#xff0c;保证向下层交付的数据是符合要求的的&#xff1b;而网络层&#xff0c;如&a…

59 使用 uqrcodejs 生成二维码

前言 这是一个最近的一个来自于朋友的需求, 然后做了一个 基于 uqrcodejs 来生成 二维码的一个 demo package.json 中增加以依赖 "uqrcodejs": "^4.0.7", 测试用例 <template><div class"hello"><canvas id"qrcode&qu…

2023天梯赛

2023天梯赛 L1-089 最好的文档 分数 5 作者 陈越 单位 浙江大学 有一位软件工程师说过一句很有道理的话&#xff1a;“Good code is its own best documentation.”&#xff08;好代码本身就是最好的文档&#xff09;。本题就请你直接在屏幕上输出这句话。 输入格式&…

豆瓣9.7,这部Java神作第3版重磅上市!

Java 程序员们开年就有重磅好消息&#xff0c;《Effective Java 中文版&#xff08;原书第 3 版&#xff09;》要上市啦&#xff01; 该书的第1版出版于 2001 年&#xff0c;当时就在业界流传开来&#xff0c;受到广泛赞誉。时至今日&#xff0c;已热销近20年&#xff0c;本书…

使用GDAL进行简单的坐标系转换

使用GDAL进行简单的坐标系转换 使用python GDAL进行简单的坐标系转换&#xff0c;暂时不考虑不同基准坐标系转换的精度问题。 安装环境 使用UbuntuAnaconda python 环境 conda install gdal 定义坐标系 from osgeo import gdal from osgeo import osrsrs_wgs84 osr.Spati…

【边缘智能】00_边缘计算发展背景

本系列是个人学习《边缘就算基础知识入门》的笔记&#xff0c;仅为个人学习记录&#xff0c;欢迎交流&#xff0c;感谢批评指正 移动物联设备产生海量数据&#xff0c;数据密集型移动智能应用&#xff0c;计算密集、动态性高&#xff0c;实时性强 传统云计算架构 基于广域互联…

文件上传【1】

1.文件上传更改上传类型 上传文件时存在上传类型固定&#xff08;jpg、png、gif&#xff09;如果是前端确定&#xff08;弹窗&#xff0c;后端未出现请求确定是前端&#xff09;只需要在设置中禁用js代码或抓包更改文件后缀名就可以上传其他类型的文件&#xff08;亦可用于复制…

我关注的测试仪表厂商之Sifos,PoE测试

#最近看看行业各个厂商的网站&#xff0c;看看他们都在做什么# 先从Sifos开始&#xff0c;一直觉得这是家很特别的公司&#xff0c;在PoE测试这块是个无敌的存在。之前在上一家台资测试仪表公司的时候&#xff0c;也有推出过类似的基于产线验证的解决方案&#xff0c;最后因为…