Flink Flink中的分流

一、什么是分流

所谓“分流”,就是将一条数据流拆分成完全独立的两条、甚至多条流。也就是基于一个DataStream,定义一些筛选条件,将符合条件的数据拣选出来放到对应的流里。
在这里插入图片描述

二、基于filter算子的简单实现分流

其实根据条件筛选数据的需求,本身非常容易实现:只要针对同一条流多次独立调用.filter()方法进行筛选,就可以得到拆分之后的流了。
案例需求:读取一个整数数字流,将数据流划分为奇数流和偶数流。

package com.flink.DataStream.SplitStream;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkSplitStreamByFilter {
    public static void main(String[] args) throws Exception {
        //TODO 创建Flink上下文执行环境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment
                .createLocalEnvironmentWithWebUI(new Configuration().set(RestOptions.BIND_PORT, "8081"));
        //.getExecutionEnvironment();
        //TODO 设置全局并行度为2
        streamExecutionEnvironment.setParallelism(2);
        DataStreamSource<String> dataStreamSource = streamExecutionEnvironment.socketTextStream("localhost", 8888);
        //TODO 先将输入流转为Integer类型
        SingleOutputStreamOperator<Integer> mapResult = dataStreamSource.map((input) -> {
            int i = Integer.parseInt(input);
            return i;
        });
        //TODO 使用匿名函数分流偶数流
        SingleOutputStreamOperator<Integer> ds1 = mapResult.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer a) throws Exception {
                return a % 2 == 0;
            }
        });
        //TODO 使用lamda表达式分流奇数流
        SingleOutputStreamOperator<Integer> ds2 = mapResult.filter((a) -> a % 2 == 1);
        ds1.print("偶数流");
        ds2.print("奇数流");
        streamExecutionEnvironment.execute();
    }
}

执行结果

奇数流:1> 1
偶数流:2> 2
偶数流:1> 2
偶数流:2> 4
奇数流:1> 3
奇数流:2> 1

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

这种实现非常简单,但代码显得有些冗余——我们的处理逻辑对拆分出的三条流其实是一样的,却重复写了三次。而且这段代码背后的含义,是将原始数据流 stream 复制三份,然后对每一份分别做筛选;这明显是不够高效的。我们自然想到,能不能不用复制流,直接用一个算子就把它们都拆分开呢?

三、使用测输出流

关于处理函数中侧输出流的用法,我们已经在 7.5 节做了详细介绍。简单来说,只需要调用上下文 ctx 的.output()方法,就可以输出任意类型的数据了。而侧输出流的标记和提取,都离不开一个“输出标签”(OutputTag),指定了侧输出流的 id 和类型。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/187313.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

lvm 扩容根分区失败记录

lvm 扩容根分区失败记录 1、问题描述2、错误描述3、解决方法重启系统进入grub界面&#xff0c;选择kernel 2.x 启动系统。然后同样的resize2fs命令扩容成功。 1、问题描述 根分区不足。 系统有2个内核版本&#xff0c;一个是kernel 2.x&#xff0c;另一个是kernel 4.x。 这次l…

《微信小程序从入门到精通》---笔记1

小程序&#xff0c;我又来学习啦&#xff01;请多关照~ 项目驱动 小程序开发建议使用flex布局在小程序中&#xff0c;页面渲染和业务逻辑是分开的&#xff0c;分别运行在不同的线程中。Mini Program于2017年1月7号正式上线小程序的有点&#xff1a;跨平台、开发门槛低、开发周…

盘点60个Python爬虫源码Python爱好者不容错过

盘点60个Python爬虫源码Python爱好者不容错过 爬虫&#xff08;Spider&#xff09; 学习知识费力气&#xff0c;收集整理更不易。 知识付费甚欢喜&#xff0c;为咱码农谋福利。 链接&#xff1a;https://pan.baidu.com/s/1JWrDgl46_ammprQaJiKqaQ?pwd8888 提取码&#xff…

机器学习与因果推断的高级实践 | 数学建模

文章目录 因果推断因果推断的前世今生&#xff08;1&#xff09;潜在结果框架&#xff08;Potential Outcome Framework&#xff09;&#xff08;2&#xff09;结构因果模型&#xff08;Structual Causal Model&#xff0c;SCM&#xff09; 身处人工智能爆发式增长时代的机器学…

战地5无限序章(无法保存)的解决办法

启动游戏后&#xff0c;目录就会自动变成这样了&#xff0c;也不会无限循环了&#xff01;

Flash Attention:高效注意力机制的突破与应用

注意力机制彻底改变了自然语言处理和深度学习领域。它们允许模型在执行机器翻译、语言生成等任务时专注于输入数据的相关部分。 在这篇博客[1]中&#xff0c;我们将深入研究被称为“Flash Attention”的注意力机制的突破性进展。我们将探讨它是什么、它是如何工作的&#xff0c…

【matlab程序】matlab给风速添加图例大小

【matlab程序】matlab给风速添加图例大小 clear;clc;close all; % load 加载风速数据。 load(matlab.mat) % 加载颜色包信息 gray load(D:\matlab_work\函数名为colormore的颜色索引表制作\R_color_txt\R_color_single\gray89.txt); brown load(D:\matlab_work\函数名为color…

解决在Windows10或Windows11下无权限修改hosts文件

解决在Windows10或Windows11下无权限修改hosts文件&#xff0c;无法写入内容 1、首先在开始菜单中找到这个 2、接着输入&#xff1a; C:\Windows\System32\drivers\etc3、再次输入以下命令行&#xff1a;notepad hosts &#xff0c;并回车&#xff1a; notepad hosts 4、然后…

2023 年 认证杯 小美赛 国际大学生数学建模挑战赛 |数学建模完整代码+建模过程全解全析

当大家面临着复杂的数学建模问题时&#xff0c;你是否曾经感到茫然无措&#xff1f;作为2022年美国大学生数学建模比赛的O奖得主&#xff0c;我为大家提供了一套优秀的解题思路&#xff0c;让你轻松应对各种难题。 cs数模团队在认证杯 小美赛前为大家提供了许多资料的内容呀&am…

01_MySQL介绍及安装

#任务背景 一、真实案例 某公司现在有几套不同版本的MySQL数据库&#xff0c;现在大部分的生产和测试环境都已经切换到5.7版本&#xff0c;由于历史原因&#xff0c;有一套测试环境版本为MySQL-5.5。现为了将测试环境版本统一&#xff0c;需要将原来测试环境数据库MySQL-5.5版…

基于ThinkPHP8 + Vue3 + element-ui-plus + 微信小程序(原生) + Vant2 的 BBS论坛系统设计【PHP课设】

一、BBS论坛功能描述 我做的是一个论坛类的网页项目&#xff0c;每个用户可以登录注册查看并发布文章&#xff0c;以及对文章的点赞和评论&#xff0c;还有文件上传和个人签名发布和基础信息修改&#xff0c;管理员对网站的数据进行统计&#xff0c;对文章和文件的上传以及评论…

AtomicReference原子引用类-线程安全

简介与作用&#xff1a; AtomicReference是Java中的一个原子类&#xff0c;它的主要作用是提供了一种原子操作的方式来更新对象的引用。它通常用于多线程环境下&#xff0c;用来解决并发访问共享对象时可能出现的竞态条件问题。 &#xff08;实际开发中用于某个数据模型更新&a…

小程序姓名:ssm+vue基本微信小程序的个人健康管理系统

项目介绍 首先,论文一开始便是清楚的论述了小程序的研究内容。其次,剖析系统需求分析,弄明白“做什么”,分析包括业务分析和业务流程的分析以及用例分析,更进一步明确系统的需求。然后在明白了小程序的需求基础上需要进一步地设计系统,主要包罗软件架构模式、整体功能模块、数…

83基于matlab 的时钟时间识别GUI

基于matlab 的时钟时间识别GUI。图像去除背景-转化为二值化图像-找出对应的直线边缘-找到秒针、分针、时针对应的直线&#xff0c;并算出斜率、角度-判断时间&#xff0c;分针与时针 &#xff08;度数&#xff09;。数据可更换自己的&#xff0c;程序已调通&#xff0c;可直接运…

代码随想录算法训练营第30天|回溯总结 332. 重新安排行程

回溯是递归的副产品&#xff0c;只要有递归就会有回溯&#xff0c;所以回溯法也经常和二叉树遍历&#xff0c;深度优先搜索混在一起&#xff0c;因为这两种方式都是用了递归。 回溯法就是暴力搜索&#xff0c;并不是什么高效的算法&#xff0c;最多再剪枝一下。 回溯算法能解…

C语言—一维数组在内存中的存放

1、先看代码&#xff1a; #define _CRT_SECURE_NO_WARNINGS 1#include<stdio.h> int main() {int arr[]{1,2,3,4,5,6,7,8,9,10}; int szsizeof(arr)/sizeof(arr[0]);int i0;for(i0;i<sz;i){printf("&arr[%d] %p\n",i,&arr[i]);}return 0; } 2、定…

JAVA毕业设计112—基于Java+Springboot+Vue的宠物领养社区小程序(源码+数据库)

基于JavaSpringbootVue的宠物领养社区小程序(源码数据库)112 一、系统介绍 本系统前后端分离带小程序 小程序&#xff08;用户端&#xff09;&#xff0c;后台管理系统&#xff08;管理员&#xff09; 小程序&#xff1a; 登录、注册、宠物领养、发布寻宠、发布领养、宠物社…

单文件组件MVVM

单文件组件&MVVM 所谓组件化开发&#xff0c;就是创建一个个组件。 Vue是一个大类&#xff0c;渲染一切从new Vue开始。 指定视图&#xff1a;el template render:jsx语法 $mount[数学公式] 编译App.vue&#xff0c;作为视图入口 单个组件&#xff1a;结构 样式 data compu…

Vatee万腾的科技探险:vatee数字化力量的前瞻征途

在Vatee万腾的科技探险中&#xff0c;我们领略到了一场数字化力量的前瞻征途&#xff0c;这是一次引领未来的创新之旅。Vatee万腾以其独特的科技理念和数字化力量&#xff0c;开启了一次引领行业的前瞻性征途&#xff0c;为数字化未来描绘出了崭新的篇章。 Vatee万腾的数字化力…

IIC驱动OLED(SSD1306) HAL库+CubeMX

一.IIC传输数据的格式 1.写操作 2.读操作 3.IIC信号 二. IIC底层驱动 1.重新初始化配置延时单元 //软件延时 void I2C_Delay(uint32_t t) {volatile uint32_t tmp t;while(tmp--); }void I2C_GPIO_ReInit(void) {/* 1. 使用结构体定义硬件GPIO对象 */GPIO_InitTypeDef GPIO…