flink多流操作(connect cogroup union broadcast)

flink多流操作

  • 1 分流操作
  • 2 connect连接操作
    • 2.1 connect 连接(DataStream,DataStream→ConnectedStreams)
    • 2.2 coMap(ConnectedStreams → DataStream)
    • 2.3 coFlatMap(ConnectedStreams → DataStream)
  • 3 union操作
    • 3.1 union 合并(DataStream * → DataStream)
  • 4 coGroup 协同分组
    • 4.1 coGroup 实现 left join操作
  • 5 join
  • 6 broadcast 广播
    • 6.1 API 介绍 , 核心要点

1 分流操作

SingleOutputStreamOperator<Student> mainStream = students.process(new ProcessFunction<Student, Student>() {
    @Override
    public void processElement(Student student, ProcessFunction<Student, Student>.Context ctx, Collector<Student> collector) throws Exception {
        if (student.getGender().equals("m")) {
            // 输出到测流
            ctx.output(maleOutputTag, student);
        } else if (student.getGender().equals("f")) {
            // 输出到测流
            ctx.output(femaleOutputTag, student.toString());
        } else {
            // 在主流中输出
            collector.collect(student);
        }
    }
});

SingleOutputStreamOperator<Student> side1 = mainStream.getSideOutput(maleOutputTag);
SingleOutputStreamOperator<String> side2 = mainStream.getSideOutput(femaleOutputTag);

2 connect连接操作

2.1 connect 连接(DataStream,DataStream→ConnectedStreams)

connect 翻译成中文意为连接,可以将两个数据类型一样也可以类型不一样 DataStream 连接成一个新 的 ConnectedStreams。需要注意的是,connect 方法与 union 方法不同,虽然调用 connect 方法将两个 流连接成一个新的 ConnectedStreams,但是里面的两个流依然是相互独立的,这个方法最大的好处是 可以让两个流共享 State 状态。

// 使用 fromElements 创建两个 DataStream
DataStreamSource<String> word = env.fromElements("a", "b", "c", "d");
DataStreamSource<Integer> num = env.fromElements(1, 3, 5, 7, 9);

// 将两个 DataStream 连接到一起
ConnectedStreams<String, Integer> connected = word.connect(num);

2.2 coMap(ConnectedStreams → DataStream)

对 ConnectedStreams 调用 map 方法时需要传入 CoMapFunction 函数;
该接口需要指定 3 个泛型:

  1. 第一个输入 DataStream 的数据类型
  2. 第二个输入 DataStream 的数据类型
  3. 返回结果的数据类型。
    该接口需要重写两个方法:
  4. map1 方法,是对第 1 个流进行 map 的处理逻辑。
  5. 2 map2 方法,是对 2 个流进行 map 的处理逻辑

这两个方法必须是相同的返回值类型。

//将两个 DataStream 连接到一起

ConnectedStreams<String, Integer> wordAndNum = word.connect(num);

// 对 ConnectedStreams 中两个流分别调用个不同逻辑的 map 方法
DataStream<String> result = wordAndNum.map(new CoMapFunction<String, Integer, String>() {
    @Override
    public String map1(String value) throws Exception {
        // 第一个 map 方法是将第一个流的字符变大写
        return value.toUpperCase();
    }

    @Override
    public String map2(Integer value) throws Exception {
        // 第二个 map 方法将是第二个流的数字乘以 10 并转成 String
        return String.valueOf(value * 10);
    }
});


2.3 coFlatMap(ConnectedStreams → DataStream)

对 ConnectedStreams 调用 flatMap 方法。调用 flatMap 方法,传入的 Function 是 CoFlatMapFunction;
这个接口要重写两个方法:

  1. flatMap1 方法,是对第 1 个流进行 flatMap 的处理逻辑;
  2. flatMap2 方法,是对 2 个流进行 flatMap 的处理逻辑;

这两个方法都必须返回是相同的类型。

// 使用 fromElements 创建两个 DataStream
DataStreamSource<String> word = env.fromElements("a b c", "d e f");
DataStreamSource<String> num = env.fromElements("1,2,3", "4,5,6");

// 将两个 DataStream 连接到一起
ConnectedStreams<String, String> connected = word.connect(num);

// 对 ConnectedStreams 中两个流分别调用个不同逻辑的 flatMap 方法
DataStream<String> result = connected.flatMap(new CoFlatMapFunction<String, String, String>() {
    @Override
    public void flatMap1(String value, Collector<String> out) throws Exception {
        String[] words = value.split(" ");
        for (String w : words) {
            out.collect(w);
        }
    }

    @Override
    public void flatMap2(String value, Collector<String> out) throws Exception {
        String[] nums = value.split(",");
        for (String n : nums) {
            out.collect(n);
        }
    }
});

3 union操作

3.1 union 合并(DataStream * → DataStream)

该方法可以将两个或者多个数据类型一致的 DataStream 合并成一个 DataStream。DataStream union(DataStream… streams)可以看出 DataStream 的 union 方法的参数为可变参数,即可以合并两 个或多个数据类型一致的 DataStream,connect 不要求两个流的类型一致,但union必须一致。

下面的例子是使用 fromElements 生成两个 DataStream,一个是基数的,一个是偶数的,然后将两个 DataStream 合并成一个 DataStream。

// 使用 fromElements 创建两个 DataStream
DataStreamSource<Integer> odd = env.fromElements(1, 3, 5, 7, 9);
DataStreamSource<Integer> even = env.fromElements(2, 4, 6, 8, 10);

// 将两个 DataStream 合并到一起
DataStream<Integer> result = odd.union(even);

4 coGroup 协同分组

coGroup 本质上是join 算子的底层算子;功能类似;可以用cogroup来实现join left join full join的功能。 代码结构如下:

DataStreamSource<String> stream1 = env.fromElements("1,aa,m,18", "2,bb,m,28", "3,cc,f,38");
DataStreamSource<String> stream2 = env.fromElements("1:aa:m:18", "2:bb:m:28", "3:cc:f:38");

DataStream<String> res = stream1
    .coGroup(stream2)
    .where(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    })
    .equalTo(new KeySelector<String, String>() {
        @Override
        public String getKey(String value) throws Exception {
            return value;
        }
    })
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .apply(new CoGroupFunction<String, String, String>() {
        @Override
        public void coGroup(Iterable<String> first, Iterable<String> second, Collector<String> out) throws Exception {
            // 这里添加具体的 coGroup 处理逻辑
           // 这两个迭代器,是这5s的数据中的某一组,id = 1
        }
    });

4.1 coGroup 实现 left join操作

package batch;

import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;


public class coGrouptest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//        id name
        DataStreamSource<String> stream1 = env.socketTextStream("localhost", 9998);
//        id age
        DataStreamSource<String> stream2 = env.socketTextStream("localhost", 9999);
// nc -lp 9999
// nc -lp 9998
        SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
            String[] arr = s.split(",");
            return Tuple2.of(arr[0], arr[1]);
        }).returns(new TypeHint<Tuple2<String, String>>() {
        });

        SingleOutputStreamOperator<Tuple2<String, String>> s2 = stream2.map(s -> {
            String[] arr = s.split(",");
            return Tuple2.of(arr[0], arr[1]);
        }).returns(new TypeHint<Tuple2<String, String>>() {
        });


        DataStream<Tuple3<String, String, String>> out = s1.coGroup(s2).where(tp -> tp.f0)  //左的f0 id 字段
                .equalTo(tp -> tp.f0)  //又的f0 id 字段
                .window(TumblingProcessingTimeWindows.of(Time.seconds(2)))
                .apply(new CoGroupFunction<Tuple2<String, String>, Tuple2<String, String>, Tuple3<String, String, String>>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<String, String>> iterable, Iterable<Tuple2<String, String>> iterable1, Collector<Tuple3<String, String, String>> out) throws Exception {
                        for (Tuple2<String, String> t1 : iterable) {
                            boolean t2isnull = false;
                            for (Tuple2<String, String> t2 : iterable1) {
                                out.collect(new Tuple3<String, String, String>(t1.f0,t1.f1,t2.f1));
                                t2isnull = true;
                            }
                            if(!t2isnull){
                                out.collect(new Tuple3<String, String, String>(t1.f0,t1.f1,null));
                            }
                        }

                    }
                });
        out.print();

        env.execute();


}
}

5 join

用于关联两个流(类似于 sql 中 join),需要指定 join,需要在窗口中进行关联后的逻辑计算。
只能支持inner join 不支持 左右和全连接

stream.join(otherStream)
      .where(<KeySelector>)
      .equalTo(<KeySelector>)
      .window(<WindowAssigner>)
      .apply(<JoinFunction>);

实例:

SingleOutputStreamOperator<Student> s1;
SingleOutputStreamOperator<StuInfo> s2;

// join 两个流,此时并没有具体的计算逻辑
JoinedStreams<Student, StuInfo> joined = s1.join(s2);

// 对 join 流进行计算处理
DataStream<String> stream = joined
        // where 流 1 的某字段 equalTo 流 2 的某字段
        .where(s -> s.getId()).equalTo(s -> s.getId())
        // join 实质上只能在窗口中进行
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        // 对窗口中满足关联条件的数据进行计算
        .apply(new JoinFunction<Student, StuInfo, String>() {
            // 这边传入的两个流的两条数据,是能够满足关联条件的
            @Override
            public String join(Student first, StuInfo second) throws Exception {
                // first: 左流数据 ; second: 右流数据
                // 计算逻辑
                // 返回结果
                return null;
            }
        });

// 对 join 流进行计算处理
joined.where(s -> s.getId()).equalTo(s -> s.getId())
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        .apply(new FlatJoinFunction<Student, StuInfo, String>() {
            @Override
            public void join(Student first, StuInfo second, Collector<String> out) throws Exception {
                out.collect();
            }
        });

6 broadcast 广播

Broadcast State 是 Flink 1.5 引入的新特性。 在开发过程中,如果遇到需要下发/广播配置、规则等低吞吐事件流到下游所有 task 时,就可以使用Broadcast State 特性。下游的 task 接收这些配置、规则并保存为 BroadcastState, 将这些配置应用到 另一个数据流的计算中 。
在这里插入图片描述

6.1 API 介绍 , 核心要点

  • 将需要广播出去的流,调用 broadcast 方法进行广播转换,得到广播流 BroadCastStream
  • 然后在主流上调用 connect 算子,来连接广播流(以实现广播状态的共享处理)
  • 在连接流上调用 process 算子,就会在同一个 ProcessFunciton 中提供两个方法分别对两个流进行 处理,并在这个 ProcessFunction 内实现“广播状态”的共享
public class _16_BroadCast_Demo {
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger("rest.port", 8822);
        
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        env.setParallelism(1);
        
        // id,eventId
        DataStreamSource<String> stream1 = env.socketTextStream("localhost", 9998);
        SingleOutputStreamOperator<Tuple2<String, String>> s1 = stream1.map(s -> {
            String[] arr = s.split(",");
            return Tuple2.of(arr[0], arr[1]);
        }).returns(new TypeHint<Tuple2<String, String>>() { });
        
        // id,age,city
        DataStreamSource<String> stream2 = env.socketTextStream("localhost", 9999);
        SingleOutputStreamOperator<Tuple3<String, String, String>> s2 = stream2.map(s -> {
            String[] arr = s.split(",");
            return Tuple3.of(arr[0], arr[1], arr[2]);
        }).returns(new TypeHint<Tuple3<String, String, String>>() { });
        
        /**
         * 案例背景:
         * 流 1: 用户行为事件流(持续不断,同一个人也会反复出现,出现次数不定
         * 流 2: 用户维度信息(年龄,城市),同一个人的数据只会来一次,来的时间也不定 (作为广播流)
         * 需要加工流 1,把用户的维度信息填充好,利用广播流来实现
         */
        
        // 将字典数据所在流: s2 , 转成 广播流
        MapStateDescriptor<String, Tuple2<String, String>> userInfoStateDesc =
                new MapStateDescriptor<>("userInfoStateDesc", TypeInformation.of(String.class),
                        TypeInformation.of(new TypeHint<Tuple2<String, String>>() {}));
        BroadcastStream<Tuple3<String, String, String>> s2BroadcastStream = s2.broadcast(userInfoStateDesc);
        
        // 哪个流处理中需要用到广播状态数据,就要 去 连接 connect 这个广播流
        SingleOutputStreamOperator<String> connected = s1.connect(s2BroadcastStream)
                .process(new BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>() {
                    /**BroadcastState<String, Tuple2<String, String>> broadcastState;*/
                    
                    /**
                     * 本方法,是用来处理 主流中的数据(每来一条,调用一次)
                     * @param element 左流(主流)中的一条数据
                     * @param ctx 上下文
                     * @param out 输出器
                     * @throws Exception
                     */
                    @Override
                    public void processElement(Tuple2<String, String> element,
                                               BroadcastProcessFunction<Tuple2<String, String>,
                                                       Tuple3<String, String, String>, String>.ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // 通过 ReadOnlyContext ctx 取到的广播状态对象,是一个 “只读 ” 的对象;
                        ReadOnlyBroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoStateDesc);
                        
                        if (broadcastState != null) {
                            Tuple2<String, String> userInfo = broadcastState.get(element.f0);
						out.collect(element.f0 + "," + element.f1 + "," + (userInfo == null ? null : userInfo.f0) + "," + (userInfo == null ? null : userInfo.f1));
						} else { out.collect(element.f0 + "," + element.f1 + "," + null + "," + null);
						 }
				 }
				 /**** 
				 * @param element 广播流中的一条数据 
				 * @param ctx 上下文 
				 * @param out 输出器 
				 * @throws Exception 
				 */ 
				@Override 
				public void processBroadcastElement(Tuple3<String, String, String> element, 
				                                    BroadcastProcessFunction<Tuple2<String, String>, Tuple3<String, String, String>, String>.Context ctx, 
				                                    Collector<String> out) throws Exception { 
				    // 从上下文中,获取广播状态对象(可读可写的状态对象) 
				    BroadcastState<String, Tuple2<String, String>> broadcastState = ctx.getBroadcastState(userInfoStateDesc); 
				    // 然后将获得的这条广播流数据,拆分后,装入广播状态 
				    broadcastState.put(element.f0, Tuple2.of(element.f1, element.f2)); 
				}
				
				resultStream.print(); 
				env.execute(); 
				}
				}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/397258.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一文搞懂LDO !

7.LDO 1.原理 通过运放调节P-MOS的输出 低压差&#xff1a; 输出压降比较低&#xff0c;例如输入3.3V&#xff0c;输出可以达到3.2V。 线性&#xff1a; LDO内部的MOS管工作于线性状态。&#xff08;可变电阻区&#xff09; 稳压器&#xff1a; 说明了LDO的用途是用来给电…

LeetCode.590. N 叉树的后序遍历

题目 590. N 叉树的后序遍历 分析 我们之前有做过LeetCode的 145. 二叉树的后序遍历&#xff0c;其实对于 N 叉树来说和二叉树的思路是一模一样的。 二叉树的后序遍历是【左 右 根】 N叉树的后序遍历顺序是【孩子 根】&#xff0c;你可以把二叉树的【左 右 根】想象成【孩子…

Win11专业版安装集成了谷歌框架的安卓子系统,包含谷歌商店

1.摘要 上一篇博客讲述了使用微软商店安装安卓子系统的教程 https://blog.csdn.net/RudeTomatoes/article/details/135958882 上述方法的优点是安装过程简单&#xff0c;但是&#xff0c;由于Windows安卓子系统是微软与亚马逊联合开发&#xff0c;默认没有安装谷歌框架。我尝试…

[SwiftUI]自定义下划线

系统有一个下划线修饰&#xff0c;但最低只支持到iOS16。 extension View {available(iOS 16.0, macOS 13.0, tvOS 16.0, watchOS 9.0, *)public func underline(_ isActive: Bool true, pattern: Text.LineStyle.Pattern .solid, color: Color? nil) -> some View} 下…

金融帝国实验室(CapLab)官方更新_V9.1.62版本(2024年第10次)

〖金融帝国实验室〗&#xff08;Capitalism Lab&#xff09;游戏更新记录&#xff08;2024年度&#xff09; ————————————— ◎游戏开发&#xff1a;Enlight Software Ltd.&#xff08;微启软件有限公司&#xff09; ◎官方网站&#xff1a;https://www.capitalism…

IBM Spectrum LSF Process Manager 在共享分布式计算环境中运行和管理业务关键工作流程

IBM Spectrum LSF Process Manager 设计、记录和运行复杂的计算工作流 亮点 ● 快速创建复杂的分布式工作流 ● 开发可重复的最佳实践 ● 自信地运行关键工作流程 ● 提高流程可靠性 IBM Spectrum LSF Process Manager 使您能够设计和自动化计算或分析流程&#xff0c; 捕获…

http相关概念以及apache的功能

概念 互联网&#xff1a;是网络的网络&#xff0c;是所有类型网络的母集 因特网&#xff1a;世界上最大的互联网网络 万维网&#xff1a;www &#xff08;不是网络&#xff0c;而是数据库&#xff09;是网页与网页之间的跳转关系 URL:万维网使用统一资源定位符&#xff0c;…

Spring Cloud微服务网关Zuul灰度发布入门实战

一、灰度发布 灰度发布是指在系统迭代的时候一种平滑过度上线发布方式。灰度发布是在原有的系统的基础上面&#xff0c;额外增加一个新版本&#xff0c;这个新版本包含新上线的需要验证的功能&#xff0c;通过负载均衡引入部分流量到新版本的应用上&#xff0c;如果在这个过程…

Mysql数据库主从集群从库Slave因为RelayLog过多过大引起服务器硬盘爆满生产事故实战解决

Mysql数据库主从集群从库slave因为RelayLog过多过大引起从库服务器硬盘爆满生产事故实战解决 一、MySQL数据库主从集群概念 MySQL数据库主从集群是一种高可用性和读写分离的数据库架构&#xff0c;它基于MySQL的复制&#xff08;Replication&#xff09;技术来同步数据。在主…

力扣题目训练(17)

2024年2月10日力扣题目训练 2024年2月10日力扣题目训练551. 学生出勤记录 I557. 反转字符串中的单词 III559. N 叉树的最大深度241. 为运算表达式设计优先级260. 只出现一次的数字 III126. 单词接龙 II 2024年2月10日力扣题目训练 2024年2月10日第十七天编程训练&#xff0c;今…

无人机数据链技术,无人机数据链路系统技术详解,无人机数传技术

早期的无人机更多的为军事应用服务&#xff0c;如军事任务侦查等&#xff0c;随着技术和社会的发展&#xff0c;工业级无人机和民用无人机得到快速的发展&#xff0c;工业级无人机用于农业植保、地理测绘、电力巡检、救灾援助等&#xff1b;民用无人机用于航拍、物流等等领域。…

Codeforces Round 928 (Div. 4)(A、B、C、D、E、G)

文章目录 ABCDEG A 统计A、B输出 #include <bits/stdc.h> #define int long long #define rep(i,a,b) for(int i (a); i < (b); i) #define fep(i,a,b) for(int i (a); i > (b); --i) #define pii pair<int, int> #define ll long long #define db doubl…

springboot+flowable 使用方式

创建flowble制定流程图 登录flowalbe 制定流程图 进入建模器应用程序 创建流程图 分配用户 下载流程图 使用springboot 调用flowable /*** 导入流程图老师流程*/Testvoid startTeacherApprover(){Deployment deploy repositoryService.createDeployment().addClasspathRes…

2024,“热辣滚烫”的春节热点!

2024春节&#xff0c;都发生了哪些热点事件&#xff1f; 搜肠刮肚比较难&#xff0c;于是百度了一下&#xff0c;但结果难以令人满意&#xff0c;不同博主的眼中都有不同的热点。 这才想起&#xff0c;我们早已生活在自己的“信息茧房”中&#xff0c;每个人都有自己关注的热…

GZ036 区块链技术应用赛项赛题第9套

2023年全国职业院校技能大赛 高职组 “区块链技术应用” 赛项赛卷&#xff08;9卷&#xff09; 任 务 书 参赛队编号&#xff1a; 背景描述 随着异地务工人员的增多&#xff0c;房屋租赁成为一个广阔是市场&#xff1b;目前&#xff0c;现有技术中的房屋租赁是由…

天拓四方:如何通过工业智能网关进行设备数据采集,以及其带来的优势

工业智能网关是一种嵌入式设备&#xff0c;设计用于连接和管理各种工业设备和系统。它充当设备间的通信中介&#xff0c;实现数据采集、转换和传输。与传统的网关相比&#xff0c;工业智能网关具有更强的数据处理能力和更广泛的连接性&#xff0c;可以支持多种通信协议。在当今…

Unity MVC开发模式与开发流程详解

在Unity游戏开发中&#xff0c;采用MVC&#xff08;Model-View-Controller&#xff09;模式是一种非常常见的设计模式。MVC模式将应用程序分为三个部分&#xff1a;模型&#xff08;Model&#xff09;、视图&#xff08;View&#xff09;和控制器&#xff08;Controller&#x…

数论 - 容斥原理

文章目录 一、题目描述输入格式输出格式数据范围输入样例&#xff1a;输出样例&#xff1a; 二、算法思路三、代码 在计数时&#xff0c;必须注意没有重复&#xff0c;没有遗漏。为了使重叠部分不被重复计算&#xff0c;人们研究出一种新的计数方法&#xff0c;这种方法的基本思…

VSCODE使用Django

https://code.visualstudio.com/docs/python/tutorial-django#_use-a-template-to-render-a-page 通过模板渲染页面 HTML文件 实现步骤 1&#xff0c; 修改代码&#xff0c;hello的App名字增加到installed_apps表中。 2&#xff0c; hello子目录下&#xff0c;创建 .\templat…

【无刷电机学习】基础概念及原理入门介绍

目录 0 参考出处 1 定义 2 各种电机优势比较 2.1 有刷与无刷比较 2.2 交流与直流比较 2.3 内转子与外转子比较 2.4 低压BLDC的一些优点 3 基本原理 3.1 单相无刷电机 3.2 三相无刷电机 4 驱动方法 4.1 六步换相控制 4.2 正弦波控制 5 转子位置信息的获取 5…