Flink多流处理之Broadcast(广播变量)

写过Spark批处理的应该都知道,有一个广播变量broadcast这样的一个算子,可以优化我们计算的过程,有效的提高效率;同样在Flink中也有broadcast,简单来说和Spark中的类似,但是有所区别,首先Spark中的broadcast是静态的数据,而Flink中的broadcast是动态的,也就是源源不断的数据流.在Flink中会将广播的数据存到state中.
在这里插入图片描述
在Flink中主流数据可以获取state中的所有状态数据,使用过window的应该都清楚,当两个streamData中的数据到达窗口的时间刚好错过时就会发生关联不上的情况,如window2S,sreamData1到达窗口的时间刚好卡在这个2S窗口的尾端,而streamData到达窗口时,这个窗口已经结束了,这种情况就算这两条数据有相同id也无法进行关联了.
但是broadcast会将到达的数据都存储在state中,这样主流到达的每一条数据都可以和state中的广播流数据进行关联比较.
在这里插入图片描述
流程图内容可能不够准确,只是为了看起来方便理解.

  • 数据源
    # 主流数据
    ➜  ~ nc -lk 1234
    101,浏览商品,2023-08-02
    102,浏览商品,2023-08-02
    103,查看商品价格,2023-08-04
    101,商品加入购物车,2023-08-03
    101,从购物车删除商品,2023-08-03
    102,下单,2023-08-02
    102,申请延期发货,2023-08-03
    103,点击商品详情页,2023-08-04
    104,点击收藏,2023-08-05
    104,下单,2023-08-05
    104,付款,2023-08-06
    105,浏览商品,2023-08-07
    106,浏览商品,2023-08-07
    106,加入购物车,2023-08-08
    107,浏览商品,2023-08-10
    
    # 广播流数据
    ➜  ~ nc -lk 5678
    101,小明
    102,张丽
    103,公孙飞天
    104,王二虎
    106,李四
    108,赵屋面
    
  • 代码
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    
    /**
     * @Author: J
     * @Version: 1.0
     * @CreateTime: 2023/8/11
     * @Description: 多流操作-广播流
     **/
    public class FlinkBroadcast {
        public static void main(String[] args) throws Exception {
            // 构建流环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置并行度
            env.setParallelism(3);
            // 数据集源1作为主流数据(用户行为日志[id,behavior,date])
            DataStreamSource<String> sourceStream1 = env.socketTextStream("localhost", 1234);
            // 将字符串切割处理
            SingleOutputStreamOperator<Tuple3<String, String, String>> mainSourceStream = sourceStream1.map(str -> Tuple3.of(str.split(",")[0], str.split(",")[1], str.split(",")[2])).returns(new TypeHint<Tuple3<String, String, String>>() {
            });
            // 数据源2作为广播流数据(用户信息(id,name))
            DataStreamSource<String> sourceStream2 = env.socketTextStream("localhost", 5678);
            // 将字符串切割处理
            SingleOutputStreamOperator<Tuple2<String, String>> mapStream2 = sourceStream2.map(str -> Tuple2.of(str.split(",")[0], str.split(",")[1])).returns(new TypeHint<Tuple2<String, String>>() {
            });
            // 将广播流数据源进行广播
            /**
             *参数说明
             * 这里需要我们传入一个MapStateDescriptor,其实就是一个Map结构的数据<k,v>
             * <String, Tuple2<String, String>>,第一个String类型就是广播流和主流连接的字段,在这个代码中就是id,由实际业务决定
             * <String, Tuple2<String, String>>,第二个Tuple2<String, String>就是实际广播数据流的数据,由实际业务决定
             * "userInfo"就是给一个名字,这个自定义无强制要求
             **/
            // 先构建一个状态,后面也会使用
            MapStateDescriptor<String, Tuple2<String, String>> userInfoState = new MapStateDescriptor<>("userInfo", TypeInformation.of(String.class), TypeInformation.of(new TypeHint<Tuple2<String, String>>() {
            }));
            BroadcastStream<Tuple2<String, String>> userInfoBroadStream = mapStream2.broadcast(userInfoState);
    
            // 将主流数据和广播流数据使用connect连接
            /**
             * 我们将数据转变成广播流之后,在Flink中也不知哪个数据流需要使用这个广播流(userInfoBroadStream),
             * 这个时候就需要我们自己将主流数据和该广播流数据进行连接
             **/
            BroadcastConnectedStream<Tuple3<String, String, String>, Tuple2<String, String>> connectedStream = mainSourceStream.connect(userInfoBroadStream);
    
            /**
             * 在process()中有两类函数供我们选择,KeyedBroadcastProcessFunction和BroadcastProcessFunction,
             * 这里要注意当"connectedStream"是KeyedStream时选择KeyedBroadcastProcessFunction
             * 当"connectedStream"不是KeyedStream时选择BroadcastProcessFunction就可以.
             * 使用keyBy算子返回的就是KeyedStream
             **/
            SingleOutputStreamOperator<String> resultStream = connectedStream.process(new BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>() {
    
                // 这个方法写主流数据处理逻辑
                @Override
                public void processElement(Tuple3<String, String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.ReadOnlyContext ctx, Collector<String> out) throws Exception {
                    /**
                     * 要注意,这里我们最好从ReadOnlyContext来获取广播状态数据,因为获取只读的状态数据可以保证数据的安全性,
                     * 如果是通过成员变量的方式获取可修改的状态数据,就会存在数据不安全的问题,如在代码逻辑中出现了对状态数据
                     * 修改的代码,那么共享此状态的并行算子可能看到的状态数据不一致,就会导致数据错误或者代码报错.
                     * 而使用ReadOnlyContext就可以保证processElement这个方法中我们只对状态数据进行读取.
                     **/
                    ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);
                    if (broadcastState != null) {
                        // 通过主流中的ID作为key获取广播变量中的用户信息
                        Tuple2<String, String> userInfo = broadcastState.get(value.f0);
                        // 输出数据的形式(id,behavior,date,name)
                        if (userInfo == null) {
                            out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");
                        } else {
                            out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + userInfo.f1);
                        }
                    } else {
                        out.collect(value.f0 + "," + value.f1 + "," + value.f2 + "," + "NULL");
                    }
    
                }
    
                // 这个方法写广播流数据处理逻辑
                @Override
                public void processBroadcastElement(Tuple2<String, String> value, BroadcastProcessFunction<Tuple3<String, String, String>, Tuple2<String, String>, String>.Context ctx, Collector<String> out) throws Exception {
                    // 使用Context获取状态
                    BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoState);
    
                    // 将数据存入到状态中
                    broadcastState.put(value.f0, value);
                }
            });
            // 打印结果
            resultStream.print();
    
            env.execute("Flink broadcast");
        }
    }
    
  • 结果
    3> 101,浏览商品,2023-08-02,小明
    3> 101,商品加入购物车,2023-08-03,小明
    3> 102,申请延期发货,2023-08-03,张丽
    3> 104,下单,2023-08-05,王二虎
    3> 106,浏览商品,2023-08-07,李四
    1> 102,浏览商品,2023-08-02,张丽
    1> 101,从购物车删除商品,2023-08-03,小明
    1> 103,点击商品详情页,2023-08-04,公孙飞天
    1> 104,付款,2023-08-06,王二虎
    1> 106,加入购物车,2023-08-08,李四
    2> 103,查看商品价格,2023-08-04,公孙飞天
    2> 102,下单,2023-08-02,张丽
    2> 104,点击收藏,2023-08-05,王二虎
    2> 105,浏览商品,2023-08-07,NULL
    2> 107,浏览商品,2023-08-10,NULL
    
    代码内容就不进行详细解释了,注释基本都写清楚了,如有疑问可评论提问,共同探讨.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/70176.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

笔记本电脑如何把sd卡数据恢复

在使用笔记本电脑过程中&#xff0c;如果不小心将SD卡里面的重要数据弄丢怎么办呢&#xff1f;别着急&#xff0c;本文将向您介绍SD卡数据丢失常见原因和恢复方法。 ▌一、SD卡数据丢失常见原因 - 意外删除&#xff1a;误操作或不小心将文件或文件夹删除。 - 误格式化&#…

【资讯速递】AI与人类思维的融合;OpenAI在中国申请注册“GPT-5”商标;移动大模型主要面向to B 智能算力是未来方向

2023年8月11日 星期五 癸卯年六月廿五 第000001号 欢迎来到爱书不爱输的程序猿的博客, 本博客致力于知识分享&#xff0c;与更多的人进行学习交流 本文收录于IT资讯速递专栏,本专栏主要用于发布各种IT资讯&#xff0c;为大家可以省时省力的就能阅读和了解到行业的一些新资讯 资…

C++初阶之一篇文章教会你list(理解和使用)

list&#xff08;理解和使用&#xff09; 什么是list特点和优势基本操作示例用法与其他序列式容器&#xff08;如 std::vector 和 std::deque&#xff09;相比&#xff0c;std::list 显著的区别和优势成员类型 list构造函数1. default (1)2. fill (2)3.range (3)4. copy (4) li…

ubuntu20.04磁盘满了 /dev/mapper/ubuntu--vg-ubuntu--lv 占用 100%

问题 执行 mysql 大文件导入任务&#xff0c;最后快完成了&#xff0c;查看结果发现错了&#xff01;悲催&#xff01;都执行了 两天了 The table ‘XXXXXX’ is full &#xff1f; 磁盘满了&#xff1f; 刚好之前另一个 centos 服务器上也出现过磁盘满了&#xff0c;因此&a…

什么是Selenium?使用Selenium进行自动化测试

什么是 Selenium&#xff1f;   Selenium 是一种开源工具&#xff0c;用于在 Web 浏览器上执行自动化测试&#xff08;使用任何 Web 浏览器进行 Web 应用程序测试&#xff09;。   等等&#xff0c;先别激动&#xff0c;让我再次重申一下&#xff0c;Selenium 仅可以测试We…

大连交通大学813软件工程考研习题

1.什么是软件生存周期模型?有哪些主要模型? 生存周期模型&#xff1a;描述软件开发过程中各种活动如何执行的模型。对软件开发提供强有力的支持&#xff0c;为开发过程中的活动提供统一的政策保证&#xff0c;为参与开发的人员提供帮助和指导&#xff0c;是软件生存周期模型…

云计算——常见存储类型

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​ 目录 前言 一.存储类型 1.本地磁盘 2.DAS 3.NAS 4.SAN &#xff08;1&#xff09;FC SA…

锁定Mac的内置键盘,防止外接键盘时的误触

场景&#xff1a;把你的外接键盘放在mac上&#xff0c;然后打字时&#xff0c;发现外接键盘误触mac键盘&#xff0c;导致使用体验极差 解决方案&#xff1a;下载Karabiner-Elements这款软件&#xff0c;并给它开启相关权限。 地址&#xff1a;https://github.com/pqrs-org/Ka…

ModaHub魔搭社区——Milvus 、Qdrant、Waeviate、Pinecone、ElasticSearch矢量数据库对比

资本市场上,2022年也是风起云涌的一年的,各大向量数据库公司纷纷完成了千万美元级别新一轮的融资。可以预见,2023年将会是向量数据库继续快速发展的一年,也会是这一新兴技术由发展走向成熟的一年。这里针对Milvus 、Qdrant、Waeviate、Pinecone、ElasticSearch这五个流行的…

编写简单的.gitlab-ci.yml打包部署项目

服务器说明&#xff1a; 192.168.192.120&#xff1a;项目服务器 192.168.192.121&#xff1a;GitLab 为了可以使用gitlab的cicd功能&#xff0c;我们需要先安装GitLab Runner 安装GitLab Runner参考&#xff1a; GitLab实现CICD自动化部署_gitlab cidi_程序员xiaoQ的博客-CS…

HCIA---TCP/UDP协议

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 目录 前言 一.UDP协议简介 UDP协议的特点&#xff1a; 二.TCP协议简介 TCP协议特点 三.TCP和UDP的区别 四.TCP/IP结构详解 五.TCP运输连接的阶段 ​编辑 …

43..利用fsolve函数解对应lambda下的方程组(matlab程序)

1.简述 fsolve的基本用法 : x fsolve(fun,x0) 其中fun应为函数句柄&#xff0c;x0为搜索的种子&#xff0c;即预估的fun0的解的大致位置。 函数句柄的定义方式主要有两种&#xff1a; 1.定义函数文件&#xff0c;使用操作符 定义function文件root2d.m, 如下&#xff1a; …

怎么把图片表格转换成word表格?几个步骤达成

在处理文档时&#xff0c;图片表格的转换是一个常见的需求。而手动输入表格是非常耗时的&#xff0c;因此&#xff0c;使用文本识别软件来自动转换图片表格可以大大提高工作效率。在本文中&#xff0c;我们将介绍如何使用OCR文字识别技术来将图片表格转换为Word表格。 OCR文字识…

yolov2检测网数据集标注_labelme使用_json2txt格式转换

yolov2检测网数据集标注_labelme使用_json2txt格式转换 一、安装Anaconda二、创建labelme虚拟环境三、使用labelme标注健康非健康猫狗数据3.1 打开数据集所在文件夹3.2 进行标注数据集3.3 json2txt3.4 按文件目录和训练测试数据集重分配 四、数据喂给服务器网络参考链接 一、安…

Paddle OCR V4 测试Demo

效果 项目 VS2022.net4.8OCRV4 代码 using OpenCvSharp; using Sdcb.PaddleInference; using Sdcb.PaddleOCR; using Sdcb.PaddleOCR.Models; using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Drawing; usin…

[保研/考研机试] KY135 又一版 A+B 浙江大学复试上机题 C++实现

题目链接&#xff1a; KY135 又一版 AB https://www.nowcoder.com/share/jump/437195121691736185698 描述 输入两个不超过整型定义的非负10进制整数A和B(<231-1)&#xff0c;输出AB的m (1 < m <10)进制数。 输入描述&#xff1a; 输入格式&#xff1a;测试输入包…

C语言的简单基础知识

C语言的基础知识包括变量、数据类型、运算符、控制流语句、函数等。下面会对每个部分进行详细解释&#xff0c;并给出相应的案例。 变量和数据类型&#xff1a; 变量&#xff1a;C语言中的变量用于存储数据&#xff0c;并且需要先声明后使用。声明变量时需要指定其数据类型。例…

包管理工具详解npm 、 yarn 、 cnpm 、 npx 、 pnpm(2023)

1、包管理工具npm &#xff08;1&#xff09;包管理工具npm&#xff1a; Node Package Manager&#xff0c;也就是Node包管理器&#xff1b;但是目前已经不仅仅是Node包管理器了&#xff0c;在前端项目中我们也在使用它来管理依赖的包&#xff1b;比如vue、vue-router、vuex、…

怎么用PS的魔术棒抠图?PS魔术棒抠图的操作方法

使用PS的魔术棒抠图教程&#xff1a; 1、首先&#xff0c;在ps界面上方点击“文件”选项&#xff0c;再在其弹出的选项栏中选择“打开”选项。然后&#xff0c;打开你所需要的图片。 2、然后&#xff0c;单击左侧的“魔术棒”工具。 3、然后&#xff0c;用鼠标点击图片的背景&…

根据渲染数据长度动态渲染后缀图标

在动态获取数据时&#xff0c;想要渲染后面的图标是根据数据的长度渲染图标位置&#xff0c;效果如下&#xff1a; 代码如下&#xff1a; <el-row :gutter"60"><el-col :span"24"><el-form-item><el-input v-model.trim"form…