大数据-玩转数据-Flink-Transform

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_map {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);
        s_num.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer values) throws Exception {
                return values*values;
            }
        }).print();

    }
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_filter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();
        elements.filter(value -> value % 2 != 0).print();
    }
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.concurrent.ExecutionException;

public class flatMap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);
        dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
                collector.collect(integer*integer);
                collector.collect(integer*integer*integer);
            }
        }).print();
    }
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .sum(1)
                .print();

        env.execute("execute");
    }
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyByReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L))
                .keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {
                        return new Tuple2<>(values1.f0,values1.f1+values2.f1);
                    }
                })
                .print();
        env.execute();
    }
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Process_s {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

四、合流算子

4.1、connect(连接)

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。
Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

package com.lyh.flink05;

import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.common.protocol.types.Field;

public class connect_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3, 4, 5);
        DataStreamSource<String> data2 = env.fromElements("a", "b", "c");
        ConnectedStreams<Integer, String> data3 = data1.connect(data2);
        data3.getFirstInput().print("data1");
        data3.getSecondInput().print("data2");
        env.execute();
    }
}

4.2、union(合并)

package com.lyh.flink05;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class union_s {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> data1 = env.fromElements(1, 2, 3);
        DataStreamSource<Integer> data2 = env.fromElements(555,666);
        DataStreamSource<Integer> data3 = env.fromElements(999);
        DataStream<Integer> data = data1.union(data2).union(data3);
        data.print();
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/70094.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

阻塞队列_线程安全版本_生产消费者模型

前言 在前面PriorityQueue优先级队列_Y君的进化史的博客-CSDN博客&#xff0c;我们学习了优先级队列&#xff0c;但是发现&#xff0c;当一个线程将优先级队列使用完之后&#xff0c;会自动退出程序&#xff0c;如果此时我们想使其一直等待到下一个任务的录入&#xff0c;就需…

面试热题(LRU缓存)

请你设计并实现一个满足 LRU (最近最少使用) 缓存 约束的数据结构。 实现 LRUCache 类&#xff1a; LRUCache(int capacity) 以 正整数 作为容量 capacity 初始化 LRU 缓存int get(int key) 如果关键字 key 存在于缓存中&#xff0c;则返回关键字的值&#xff0c;否则返回 -1 …

信息论基础知识

注意&#xff1a;本文只针对离散随机变量做出探讨&#xff0c;连续随机变量的情况不适用于本文探讨的内容&#xff01; &#xff08;一&#xff09;自信息 1. 自信息 I ( x ) − l o g n P ( x ) \color{blue}I(x) - log_{n}{P(x)} I(x)−logn​P(x) 注意&#xff1a; 若n …

亚马逊 EC2服务器下部署java环境

1. jdk 1.8 安装 1.1 下载jdk包 官网 Java Downloads | Oracle tar.gz 包 下载下来 1.2 本地连接 服务器 我用的是亚马逊的ec2 系统是 ubuntu 的 ssh工具是 Mobaxterm , 公有dns 创建实例时的秘钥 链接 Mobaxterm 因为使用的 ubuntu 所以登录的 名称 就是 ubuntu 然后 …

Linux centos 常用命令 【持续更新】

一、查看文件信息 indoe和目录项 # df命令查看每个硬盘分区的inode总数和已经使用的数量 df -i# 查看inode的大学 xfs_growfs /dev/sda1|grep "isize"# 查看文件的indoe号码 ls -istat查看文件信息 # 文件的详细信息 stat anaconda-ks.cfg # -t参数是在一行内输出…

Linux 的基本指令(3)

指令1&#xff1a;date 作用&#xff1a;用来获取时间的指令。 1. 获取当下的时间&#xff1a; date %Y-%m-%d_%H:%M:%S 其中&#xff1a;%Y 表示年&#xff0c;%m 表示月&#xff0c;%d 表示日&#xff0c;%H 表示 小时&#xff0c;%M 表示分&#xff0c;%S 表示秒。 上面代…

用 oneAPI 实现 AI 欺诈检测:一款智能图像识别工具

简介 虚假图像和视频日益成为社交媒体、新闻报道以及在线内容中的一大隐患。在这个信息爆炸的时代&#xff0c;如何准确地识别和应对这些虚假内容已经成为一个迫切的问题。为了帮助用户更好地辨别虚假内容&#xff0c;我开发了一款基于 oneAPI、TensorFlow 和 Neural Compress…

springBoot集成caffeine,自定义缓存配置 CacheManager

目录 springboot集成caffeine Maven依赖 配置信息&#xff1a;properties文件 config配置 使用案例 Caffeine定制化配置多个cachemanager springboot集成redis并且定制化配置cachemanager springboot集成caffeine Caffeine是一种基于服务器内存的缓存库。它将数据存储在…

进销存管理系统(小杨国贸)springboot采购仓库财务java jsp源代码mysql

本项目为前几天收费帮学妹做的一个项目&#xff0c;Java EE JSP项目&#xff0c;在工作环境中基本使用不到&#xff0c;但是很多学校把这个当作编程入门的项目来做&#xff0c;故分享出本项目供初学者参考。 一、项目描述 进销存管理系统&#xff08;小杨国贸&#xff09;spri…

k8s之StorageClass(NFS)

一、前言 1、环境 k8s v1.23.5 &#xff0c;服务器是centos7.9 192.168.164.20 k8s-master1 192.168.164.30 k8s-node1 192.168.164.40 k8s-node2 2、貌似storageClass在kubernetes v1.20就被砍了。 因为它比较慢&#xff0c;而且耗资源&#xff0c;但可以通过不同的实现镜…

玩机搞机--【开机出现您的设备内部出现了问题,请联系你的制造商了解详情】故障解决思路

很多友友在玩机过程中经常会遇到下图所示故障。大多数都是刷了第三方系统或者内核或者面具导致的。正常来说。这个提示可以无视的&#xff0c;不影响正常的手机使用。但强迫症例外。究其原因。一般是内核校验原因。解决方法也分为多种。今天就为大家解析下这个提示的解决思路 &…

基于docker部署的Selenium Grid分布式自动化测试

01、什么是Selenium Grid Selenium Grid是Selenium套件的一部分&#xff0c;它专门用于并行运行多个测试用例在不同的浏览器、操作系统和机器上。 Selenium Grid有两个版本——老版本Grid 1和新版本Grid 2。我们只对新版本做介绍&#xff0c;因为Selenium团队已经逐渐遗弃老版…

yum 安装本地包 rpm

有时直接yum install 有几个包死活下不下来 根据网址&#xff0c;手动下载&#xff0c;下载后上传至 centos 然后运行 sudo yum localinstall xxx.rpm 即可安装 参考 https://blog.csdn.net/weiguang1017/article/details/52293244

微服务01-SpringCloud

1、简介 SpringCloud集成了各种微服务功能组件&#xff0c;并基于SpringBoot实现了这些组件的自动装配&#xff0c;从而提供了良好的开箱即用体验。 其中常见的组件包括&#xff1a; 2、服务拆分和远程调用 2.1 服务拆分 这里总结了微服务拆分时的几个原则&#xff1a; …

JAVA Android 正则表达式

正则表达式 正则表达式是对字符串执行模式匹配的技术。 正则表达式匹配流程 private void RegTheory() {// 正则表达式String content "1998年12月8日&#xff0c;第二代Java平台的企业版J2EE发布。1999年6月&#xff0c;Sun公司发布了第二代Java平台(简称为Java2) &qu…

HTML+JavaScript构建一个将C/C++定义的ANSI字符串转换为MASM32定义的DWUniCode字符串的工具

公文一键排版系统基本完成&#xff0c;准备继续完善SysInfo&#xff0c;增加用户帐户信息&#xff0c;其中涉及到Win32_Account结构&#xff0c;其C定义如下&#xff1a; [Dynamic, Provider("CIMWin32"), UUID("{8502C4CC-5FBB-11D2-AAC1-006008C78BC7}"…

【Linux】进程间通信——System V信号量

目录 写在前面的话 一些概念的理解 信号量的引入 信号量的概念及使用 写在前面的话 System V信号量是一种较低级的IPC机制&#xff0c;使用的时候需要手动进行操作和同步。在现代操作系统中&#xff0c;更常用的是POSIX信号量&#xff08;通过sem_*系列的函数进行操作&…

【雕爷学编程】Arduino动手做(24)---水位传感器模块3

37款传感器与模块的提法&#xff0c;在网络上广泛流传&#xff0c;其实Arduino能够兼容的传感器模块肯定是不止37种的。鉴于本人手头积累了一些传感器和执行器模块&#xff0c;依照实践出真知&#xff08;一定要动手做&#xff09;的理念&#xff0c;以学习和交流为目的&#x…

激活函数总结(五):Shrink系列激活函数补充(HardShrink、SoftShrink、TanhShrink)

激活函数总结&#xff08;五&#xff09;&#xff1a;Shrink系列激活函数补充 1 引言2 激活函数2.1 HardShrink激活函数2.2 SoftShrink激活函数2.3 TanhShrink激活函数 3. 总结 1 引言 在前面的文章中已经介绍了一系列激活函数 (Sigmoid、Tanh、ReLU、Leaky ReLU、PReLU、Swis…

基本动态规划问题的扩展

基本动态规划问题的扩展 应用动态规划可以有效的解决许多问题&#xff0c;其中有许多问题的数学模型&#xff0c;尤其对一些自从57年就开始研究的基本问题所应用的数学模型&#xff0c;都十分精巧。有关这些问题的解法&#xff0c;我们甚至可以视为标准——也就是最优的解法。…