Flink双流Join

在离线 Hive 中,我们经常会使用 Join 进行多表关联。那么在实时中我们应该如何实现两条流的 Join 呢?Flink DataStream API 为我们提供了3个算子来实现双流 join,分别是:

  • join

  • coGroup

  • intervalJoin

下面我们分别详细看一下这3个算子是如何实现双流 Join 的。

1. Join

Joining | Apache Flink

Join 算子提供的语义为 “Window join”,即按照指定字段和(滚动/滑动/会话)窗口进行内连接(InnerJoin)。Join 将有相同 Key 并且位于同一窗口中的两条流的元素进行关联。

Join 可以支持处理时间和事件时间两种时间特征。

Join 通用用法如下:

stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>)

Join 语义类似与离线 Hive 的 InnnerJoin (内连接),这意味着如果一个流中的元素在另一个流中没有相对应的元素,则不会输出该元素。

下面我们看一下 Join 算子在不同类型窗口上的具体表现。

1.1 滚动窗口Join

当在滚动窗口上进行 Join 时,所有有相同 Key 并且位于同一滚动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 或 FlatJoinFunction 进行处理。

如上图所示,我们定义了一个大小为 2 秒的滚动窗口,最终产生 [0,1],[2,3],… 这种形式的数据。上图显示了每个窗口中橘色流和绿色流的所有元素成对组合。需要注意的是,在滚动窗口 [6,7] 中,由于绿色流中不存在要与橘色流中元素 6、7 相关联的元素,因此该窗口不会输出任何内容。

下面我们一起看一下如何实现上图所示的滚动窗口 Join:

:::color3 可以通过两个socket流,将数据合并为一个三元组,key,value1,value2

代码演示:

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Date;
public class _ShuangLiuJoinDemo {

    public static void main(String[] args) throws Exception {

        //1. env-准备环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        // 并行度不为1 ,效果很难出来,因为本地的并行度是16,只有16个并行度都触发才能看到效果
        env.setParallelism(1);

        //2. source-加载数据   key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> greenStream = env.socketTextStream("localhost", 8888)
                .map(new MapFunction<String, Tuple3<String, Integer, String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("绿色:"+ Arrays.toString(arr));
                        return Tuple3.of(arr[0], Integer.valueOf(arr[1]), arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("绿色的时间:"+timeStamp);
                                        System.out.println(element.f0);
                                        return timeStamp;
                                    }
                                })
                );
                ;
        // 以后这个9999少用,因为kafka占用这个端口  key,0,2021-03-26 12:09:00
        DataStream<Tuple3<String, Integer, String>> orangeStream = env.socketTextStream("localhost", 7777)
                .map(new MapFunction<String, Tuple3<String,Integer,String>>() {
                    @Override
                    public Tuple3<String, Integer, String> map(String line) throws Exception {
                        String[] arr = line.split(",");
                        System.out.println("橘色:"+ Arrays.toString(arr));

                        return Tuple3.of(arr[0],Integer.valueOf(arr[1]),arr[2]);
                    }
                })
                // 因为用到了EventTime 所以势必用到水印,否则报错
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Tuple3<String, Integer, String>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple3<String, Integer, String>>() {
                                    @Override
                                    public long extractTimestamp(Tuple3<String, Integer, String> element, long recordTimestamp) {
                                        Long timeStamp = 0L;

                                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                        Date date = null;
                                        try {
                                            date = simpleDateFormat.parse(element.f2);
                                        } catch (ParseException e) {
                                            throw new RuntimeException(e);
                                        }
                                        timeStamp = date.getTime();
                                        System.out.println("橘色的时间:"+timeStamp);

                                        return timeStamp;
                                    }
                                })
                );

        //3. transformation-数据处理转换
        DataStream resultStream = greenStream.join(orangeStream)
                .where(tup3 -> tup3.f0)
                .equalTo(tup3 -> tup3.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple3<String, Integer, String>, Tuple3<String, Integer, String>, Tuple3<String, Integer, Integer>>() {

                    @Override
                    public Tuple3<String, Integer, Integer> join(Tuple3<String, Integer, String> t1, Tuple3<String, Integer, String> t2) throws Exception {
                        System.out.println(t1.f2);
                        System.out.println(t2.f2);
                        return Tuple3.of(t1.f0, t1.f1, t2.f1);
                    }
                });

        //4. sink-数据输出
        resultStream.print();

        //5. execute-执行
        env.execute();
    }
}

总结非常重要:

1) 要想测试这个效果,需要将并行度设置为1

2)窗口中数据的打印是需要触发的,没有触发的数据,窗口内是不会进行计算的,所以记得输入触发的数据。

假如使用了EventTime 作为时间语义,不管是窗口开始和结束时间还是触发的条件,都跟系统时间没有关系,而跟输入的数据有关系,举例:

假如你的第一条数据是:key,0,2021-03-26 12:09:01 窗口的大小是5s,水印是3秒 ,窗口的开始时间为:

2021-03-26 12:09:00 结束时间是 2021-03-26 12:09:05 ,触发时间是2021-03-26 12:09:08

为什么呢? 水印时间 >= 结束时间

水印时间是:2021-03-26 12:09:08 - 3 = 2021-03-26 12:09:05 >=2021-03-26 12:09:05

:::

如上代码所示为绿色流和橘色流指定 BoundedOutOfOrdernessWatermarks Watermark 策略,设置100毫秒的最大可容忍的延迟时间,同时也会为流分配事件时间戳。假设输入流为 格式,两条流输入元素如下所示:

绿色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,4,2021-03-26 12:09:04
key,5,2021-03-26 12:09:05
key,8,2021-03-26 12:09:08
key,9,2021-03-26 12:09:09
key,11,2021-03-26 12:09:11
​
橘色流:
key,0,2021-03-26 12:09:00
key,1,2021-03-26 12:09:01
key,2,2021-03-26 12:09:02
key,3,2021-03-26 12:09:03
key,4,2021-03-26 12:09:04
key,6,2021-03-26 12:09:06
key,7,2021-03-26 12:09:07
key,11,2021-03-26 12:09:11
1.2 滑动窗口Join [解释一下即 ]

当在滑动窗口上进行 Join 时,所有有相同 Key 并且位于同一滑动窗口中的两条流的元素两两组合进行关联,并最终传递到 JoinFunction 进行处理。

如上图所示,我们定义了一个窗口大小为 2 秒、滑动步长为 1 秒的滑动窗口。需要注意的是,一个元素可能会落在不同的窗口中,因此会在不同窗口中发生关联,例如,绿色流中的0元素。当滑动窗口中一个流的元素在另一个流中没有相对应的元素,则不会输出该元素。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/926215.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

WEB攻防-通用漏洞XSS跨站绕过修复http_onlyCSP标签符号

修复&#xff1a; 1、过滤一些危险字符&#xff1b; 2、HTTP-only Cookie; 3、设置CSP&#xff08;Content Security Policy&#xff09;; 4、输入内容长度限制&#xff0c;转义等&#xff1b; XSS绕过-CTFSHOW-316到331 关卡绕过WP XSS修复-过滤函数&http_only&C…

python 生成tts语音

之前一直使用微软、或者国内大厂的接口&#xff0c;网页操作比较麻烦&#xff0c;最近发现一个python库可以完美解决&#xff0c;在这里分享给大家 在这里 GitHub - rany2/edge-tts: Use Microsoft Edges online text-to-speech service from Python WITHOUT needing Microsof…

嵌入式硬件实战提升篇(三)商用量产电源设计方案 三路电源输入设计 电源管理 多输入供电自动管理 DCDC降压

引言&#xff1a;本文你能实际的了解到实战量产产品中电源架构设计的要求和过程&#xff0c;并且从实际实践出发搞懂电源架构系统&#xff0c;你也可以模仿此架构抄板到你自己的项目&#xff0c;并结合硬件篇之前的项目以及理论形成正真的三路电源输入设计与开发板电源架构块供…

SAP SD学习笔记16 - 请求书的取消 - VF11

上一章讲了 返品处理流程中的 参照请求传票&#xff08;发票&#xff09;来生成返品传票。 SAP SD学习笔记15 - 返品处理流程2 - 参照请求传票&#xff08;发票&#xff09;来生成返品传票-CSDN博客 本章讲 请求传票的取消。 目录 1&#xff0c;请求书取消的概要 2&#xf…

CLIP-MMA: Multi-Modal Adapter for Vision-Language Models

当前的问题 CLIP-Adapter仅单独调整图像和文本嵌入&#xff0c;忽略了不同模态之间的交互作用。此外&#xff0c;适应性参数容易过拟合训练数据&#xff0c;导致新任务泛化能力的损失。 动机 图1所示。多模态适配器说明。 通过一种基于注意力的 Adapter &#xff0c;作者称之…

51单片机快速入门之中断的应用 2024/11/23 串口中断

51单片机快速入门之中断的应用 基本函数: void T0(void) interrupt 1 using 1 { 这里放入中断后需要做的操作 } void T0(void)&#xff1a; 这是一个函数声明&#xff0c;表明函数 T0 不接受任何参数&#xff0c;并且不返回任何值。 interrupt 1&#xff1a; 这是关键字和参…

【Spring】聊聊@EventListener注解原理

1.一个Demo出发 在平时的开发中&#xff0c;其实编写同步线程代码是比较容易的&#xff0c;但是如何将一些操作和另外一些操作进行解除耦合&#xff0c;而事件方式 是一种很好的解耦合方式&#xff0c;比如当一个用户注销一个APP之后&#xff0c;需要发送一些短信 让他引流回来…

【和春笋一起学C++】使用new创建动态数组

目录 1. 什么是动态数组 2. 怎么使用动态数组 1. 什么是动态数组 char name[20]; 上面这种方式创建的数组在程序编译时将为它分配内存空间&#xff0c;不管程序最终是否使用数组&#xff0c;数组都在那里&#xff0c;它占用了内存空间。在编译时给数组分配内存被称为静态联编…

2-2-18-9 QNX系统架构之文件系统(一)

阅读前言 本文以QNX系统官方的文档英文原版资料为参考&#xff0c;翻译和逐句校对后&#xff0c;对QNX操作系统的相关概念进行了深度整理&#xff0c;旨在帮助想要了解QNX的读者及开发者可以快速阅读&#xff0c;而不必查看晦涩难懂的英文原文&#xff0c;这些文章将会作为一个…

ECharts柱状图-极坐标系下的堆叠柱状图,附视频讲解与代码下载

引言&#xff1a; 在数据可视化的世界里&#xff0c;ECharts凭借其丰富的图表类型和强大的配置能力&#xff0c;成为了众多开发者的首选。今天&#xff0c;我将带大家一起实现一个柱状图图表&#xff0c;通过该图表我们可以直观地展示和分析数据。此外&#xff0c;我还将提供…

Android复习简答题

一、基础入门 Android程序架构 &#xff08;1&#xff09;app:用于存放程序的代码和资源等内容。包含很多子目录 libs:存放第三方jar包 src/androidTest&#xff1a;存放调试的代码文件 src/main/androidMainfest.xml 整个程序的配置文件&#xff0c;可配置程序所需要的权…

PaddleOCR:一款高性能的OCR工具介绍

一、引言 随着人工智能技术的不断发展&#xff0c;光学字符识别&#xff08;OCR&#xff09;技术在各行各业得到了广泛应用。OCR技术能够将图片、扫描件等非结构化数据中的文字信息提取出来&#xff0c;转换为可编辑的文本格式。在我国&#xff0c;百度开源了一款优秀的OCR工具…

HTML5好看的音乐播放器多种风格(附源码)

文章目录 1.设计来源1.1 音乐播放器风格1效果1.2 音乐播放器风格2效果1.3 音乐播放器风格3效果1.4 音乐播放器风格4效果1.5 音乐播放器风格5效果 2.效果和源码2.1 动态效果2.2 源代码 源码下载万套模板&#xff0c;程序开发&#xff0c;在线开发&#xff0c;在线沟通 作者&…

11、数组

1、数组概念 数组就是存储多个相同数据类型的数据。 比如&#xff1a;存储26个字母&#xff0c;存储一个班级的学生成绩。 2、数组使用 数组要遵循先定义再使用 2.1、数组定义的格式 存储数据---空间 ---- 数据类型 多少个 --- 数据个数 >> 数据类型 数…

C底层 函数栈帧

文章目录 一&#xff0c;什么是寄存器 二&#xff0c;栈和帧 前言 我们在学习c语言程序的时候&#xff0c;是不是有很多的疑问&#xff0c;如 1&#xff0c;为什么形参不可以改变实参 2&#xff0c;为什么我们编写程序的时候会出现烫烫烫......这个乱码 3&#xff0c;那些局…

MATLAB期末复习笔记(二)

三、MATLAB函数和程序结构 1.MATLAB文件 两种类型的M文件&#xff1a; • 脚本 &#xff0c;不接受输入参数或返回输出参数。它们处理工作区中的数据。 • 函数 &#xff0c;可接受输入参数&#xff0c;并返回输出参数。内部变量是函数的局部变量。 ① 函数文件是另一类 m 文…

redis的应用----缓存

redis的应用----缓存 一、缓存的概念二、使用redis作为缓存2.1使用redis作为缓存的原因2.2缓存机制的访问步骤 三、缓存的更新策略3.1定期更新3.2实时更新3.3淘汰策略 四、缓存常见的问题4.1缓存预热(Cache preheating)4.2缓存穿透(Cache penetration)4.3缓存雪崩(Cache avalan…

2025年Java面试八股文大全

很多人会问Java面试八股文有必要背吗&#xff1f; 我的回答是&#xff1a;很有必要。你可以讨厌这模式&#xff0c;但你一定要去背&#xff0c;因为不背你就进不了大厂。 国内的互联网面试&#xff0c;恐怕是现存的、最接近科举考试的制度。 而且&#xff0c;我国的八股文确…

JiaJia-CP-1,2,3的WP(2)

一.JiaJia-CP-2 一看题目&#xff0c;聊天软件&#xff0c;用的什么聊天软件直接userassist看运行过什么程序 vol -f JiaJia_Co.raw --profileWin7SP1x64 userassist 发现Telegram.exe(小飞机) 可能性很大啊(真是个摸鱼大神) 除此之外&#xff0c;filescan也能看到&#xff0…

小F的矩阵值调整

问题描述 小F得到了一个矩阵。如果矩阵中某一个格子的值是偶数&#xff0c;则该值变为它的三倍&#xff1b;如果是奇数&#xff0c;则保持不变。小F想知道调整后的矩阵是什么样子的。 测试样例 样例1&#xff1a; 输入&#xff1a;a [[1, 2, 3], [4, 5, 6]] 输出&#xff1a…