大数据-玩转数据-Flink-Transform(上)

一、Transform

转换算子可以把一个或多个DataStream转成一个新的DataStream.程序可以把多个复杂的转换组合成复杂的数据流拓扑.
在这里插入图片描述

二、基本转换算子

2.1、map(映射)

将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_map {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> s_num = svn.fromElements(1, 2, 3, 4, 5);
        s_num.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer values) throws Exception {
                return values*values;
            }
        }).print();

    }
}

2.2、filter(过滤)

根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;

public class Transform_filter {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> elements = svn.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9);
//        elements.filter(new FilterFunction<Integer>() {
//            @Override
//            public boolean filter(Integer integer) throws Exception {
//                if (integer % 2 == 0 )
//                        return false;
//                        else {
//                            return true;
//                }
//            }
//        }).print();
        elements.filter(value -> value % 2 != 0).print();
    }
}

2.3、flatMap(扁平映射)

消费一个元素并产生零个或多个元素

package com.lyh.flink05;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.util.Collector;

import java.util.concurrent.ExecutionException;

public class flatMap {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment svn = ExecutionEnvironment.getExecutionEnvironment();
        DataSource<Integer> dataSource = svn.fromElements(1, 2, 3);
        dataSource.flatMap(new FlatMapFunction<Integer, Integer>() {
            @Override
            public void flatMap(Integer integer, Collector<Integer> collector) throws Exception {
                collector.collect(integer*integer);
                collector.collect(integer*integer*integer);
            }
        }).print();
    }
}

三、聚合算子

3.1、keyBy(按键分区)

把流中的数据分到不同的分区(并行度)中.具有相同key的元素会分到同一个分区中

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .map((MapFunction<Tuple2<Long, Long>, String>) longLongTuple2 -> "key:" + longLongTuple2.f0 + ",value:" + longLongTuple2.f1)
                .print();

        env.execute("execute");
    }
}

3.2、sum,min,max,minBy,maxBy(简单聚合)

KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

package com.lyh.flink05;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyBy_s {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // this can be used in a streaming program like this (assuming we have a StreamExecutionEnvironment env)
        env.fromElements(Tuple2.of(2L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(2L, 4L), Tuple2.of(1L, 2L))
                .keyBy(0) // 以数组的第一个元素作为key
                .sum(1)
                .print();

        env.execute("execute");
    }
}

3.3、reduce(归约聚合)

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。
在这里插入图片描述

package com.lyh.flink05;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class keyByReduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(Tuple2.of(1L,2L),Tuple2.of(2L,4L),Tuple2.of(2L,9L),Tuple2.of(1L,9L),Tuple2.of(1L,2L),Tuple2.of(2L,3L))
                .keyBy(0)
                .reduce(new ReduceFunction<Tuple2<Long, Long>>() {
                    @Override
                    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> values1, Tuple2<Long, Long> values2) throws Exception {
                        return new Tuple2<>(values1.f0,values1.f1+values2.f1);
                    }
                })
                .print();
        env.execute();
    }
}

3.4、process(底层处理)

process算子在Flink算是一个比较底层的算子, 很多类型的流上都可以调用, 可以从流中获取更多的信息(不仅仅数据本身),process 用法比较灵活,后面再做专门研究。

package com.lyh.flink05;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class Process_s {

    public static void main(String[] args) throws Exception {

        // 获取环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5);
        SingleOutputStreamOperator<Integer> processed = streamSource.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
                if (value % 3 == 0) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%0", TypeInformation.of(Integer.class)), value);
                }
                if (value % 3 == 1) {
                    //测流数据
                    ctx.output(new OutputTag<Integer>("3%1", TypeInformation.of(Integer.class)), value);
                }
                //主流 ,数据
                out.collect(value);
            }
        });

        DataStream<Integer> output0 = processed.getSideOutput(new OutputTag<>("3%0",TypeInformation.of(Integer.class)));
        DataStream<Integer> output1 = processed.getSideOutput(new OutputTag<>("3%1",TypeInformation.of(Integer.class)));
        output1.print();

        env.execute();
    }
}

四、下节合流算子、用户自定义函数、分区算子

待续。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/64063.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 130. 被围绕的区域

题目链接&#xff1a;130. 被围绕的区域 题目描述 给你一个 m x n 的矩阵 board &#xff0c;由若干字符 ‘X’ 和 ‘O’ &#xff0c;找到所有被 ‘X’ 围绕的区域&#xff0c;并将这些区域里所有的 ‘O’ 用 ‘X’ 填充。 示例1&#xff1a; 输入&#xff1a;board [[“…

人大金仓数据库Docker部署

docker 搭建 yum -y install yum-utilsyum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.reposystemctl start docker.servicesystemctl enable docker.servicesystemctl status docker.service 配置Docker cd /etc/docker/ vi da…

搭建简易syslog日志中转服务器

在某种场景下&#xff0c;无法接入日志审计设备&#xff0c;本文提供一种方式&#xff0c;可通过搭建简易日志中转服务器&#xff0c;收集到该环境下的日志后&#xff0c;再将其导入日志审计设备中。 0x1 开启服务 rsyslog守护进程来自于当前的linux发布版本的预装模块&#x…

初识网络(JavaEE初阶系列9)

目录 前言&#xff1a; 1.网络的发展史 1.1独立模式 1.2网络互联 1.3局域网LAN 1.4广域网WAN 2.网络通信基础 2.1IP地址 2.2端口号 3.认识协议 3.1协议分层 3.2分层的作用 3.3TCP/IP五层&#xff08;或四层&#xff09;模型 3.4OSI七层模型 3.5网络设备所在分层 …

通向架构师的道路之weblogic与apache的整合与调优

一、BEAWeblogic的历史 BEA WebLogic是用于开发、集成、部署和管理大型分布式Web应用、 网络应用和数据库应 用的Java应用服务器。将Java的动态功能和Java Enterprise标准的安全性引入大型网络应用的 开发、集成、部署和管理之中。 BEA WebLogic Server拥有处理关键Web应…

迭代器模式(C++)

定义 提供一种方法顺序访问一个聚合对象中的各个元素&#xff0c;而又不暴露(稳定)该对象的内部表示。 应用场景 在软件构建过程中&#xff0c;集合对象内部结构常常变化各异。但对于这些集合对象&#xff0c;我们希望在不暴露其内部结构的同时&#xff0c;可以让外部客户代…

github 无语的问题,Host does not existfatal: Could not read from remote repository.

Unable to open connection: Host does not existfatal: Could not read from remote repository. image.png image.png image.png Please make sure you have the correct access rights and the repository exists. 如果github desktop和git pull 和git clone全部都出问题了&…

Elastic:linux设置elasticsearch、kibana开机自启

0. 引言 每次启动服务器都要手动启动es服务&#xff0c;相当之不方便&#xff0c;为此&#xff0c;书写一个脚本&#xff0c;实现es、kibana的开机自启 1. 原理 首先任何服务要实现开机自启&#xff0c;都可分为如下三步&#xff1a; 1、在/etc/init.d目录下创建启动、关闭服…

Namecheap 便宜域名注册使用,直接购买

FREENOM免费域名不能注册了&#xff0c;现在只能自己动手注册便宜的域名&#xff0c;前面我们也记录了不能注册FREENOM免费域名不能注册怎么办&#xff0c;不能注册FREENOM免费域名&#xff0c;怎么办&#xff0c;这里是解决方案&#xff01; 注册6元域名。 现在我们又多了一个…

tomcat上部署jpress

一.确保有jdk&#xff0c;tomcat和mysql环境 二.新建jpress数据库&#xff0c;新建jpress用户并赋予所有权限 三.将jpress的war上传到tomcat/apache-tomcat-8.5.70/webapps&#xff0c;具体根据你的实际tomcat安装路径为准&#xff0c;上传完成后他会自己解包 四.到浏览器完…

LangChain手记 Overview

整理并翻译自DeepLearning.AILangChain的官方课程&#xff1a;Overview 综述&#xff08;Overview&#xff09; LangChain是为大模型应用开发设计的开源框架 LangChain目前提供Python和JavaScript&#xff08;TypeScript&#xff09;两种语言的包 LangChain的主攻方向是聚合和…

探讨|使用或不使用机器学习

动动发财的小手&#xff0c;点个赞吧&#xff01; 机器学习擅长解决某些复杂问题&#xff0c;通常涉及特征和结果之间的困难关系&#xff0c;这些关系不能轻易地硬编码为启发式或 if-else 语句。然而&#xff0c;在决定 ML 是否是当前给定问题的良好解决方案时&#xff0c;有一…

【Renpy】设置选项不满足条件禁止选择

【要求】如果某个属性不满足某个要求&#xff0c;则无法选择这个选项。 【版本】Renpy 8.1.1 【实现】 1.在options.rpy文件中添加 define config.menu_include_disabled True 2.在选项中增加if条件。 menu:"Yes" if money > 20: ##如果money小于20这个选项…

Docker desktop使用配置

1. 下载安装 https://www.docker.com/ 官网下载并安装doker desktop 2. 配置镜像 &#xff08;1&#xff09;首先去阿里云网站上进行注册&#xff1a;https://cr.console.aliyun.com/cn-hangzhou/instances/mirrors &#xff08;2&#xff09;注册完成后搜索&#xff1a;容…

大数据技术之Hadoop(二)

目录 一、Hadoop的诞生 二、大数据概述 三、大数据软件生态 3.1 数据存储相关技术 3.2 数据计算相关技术 3.3 数据传输相关技术 四、什么是Hadoop 本篇主要讲解大数据的核心概念以及Hadoop的基本介绍。 一、Hadoop的诞生 大数据的发展与日益庞大的数据量是密不可分的。从…

Android 网络协议与网络编程

一、TCP/IP协议 Transmission Control Protocol/Internet Protocol的简写&#xff0c;中译名为传输控制协议/因特网互联 协议&#xff0c;是Internet最基本的协议、Internet国际互联网络的基础&#xff0c;由网络层的IP协议和传输层的TCP 协议组成。协议采用了4层的层级结构。…

(15)Qt绘图(two)

目录 坐标变换 平移坐标轴 缩放坐标轴 旋转坐标轴 定时器加坐标轴旋转实现动画旋转 transform旋转&#xff08;可设置旋转轴&#xff09; 绕X轴旋转 绕Y轴旋转 绕Z轴旋转 错切 Y轴错切 X轴错切 画家的保存与坐标复原 基本图形绘制 绘制点 绘制线 绘制矩形 普…

carla中lka实现(一)

前言&#xff1a; 对于之前项目中工作内容进行总结&#xff0c;使用Carla中的车辆进行lka算法调试&#xff0c;整体技术路线&#xff1a; ①在Carla中生成车辆&#xff0c;并在车辆上搭载camera&#xff0c;通过camera采集图像数据&#xff1b; ②使用图像处理lka算法&#…

Python入门自学进阶-Web框架——38、redis、rabbitmq、git

缓存数据库redis&#xff1a; NoSQL&#xff08;Not only SQL&#xff09;泛指非关系型的数据库。为了解决大规模数据集合多重数据类的挑战。 NoSQL数据库的四大分类&#xff1a; 键值&#xff08;Key-Value&#xff09;存储数据库列存储数据库文档型数据库图形&#xff08;…

STM32基础入门学习笔记:核心板 电路原理与驱动编程

文章目录&#xff1a; 一&#xff1a;LED灯操作 1.LED灯的点亮和熄灭 延迟闪烁 main.c led.c led.h BitAction枚举 2.LED呼吸灯&#xff08;灯的强弱交替变化&#xff09; main.c delay.c 3.按键控制LED灯 key.h key.c main.c 二&#xff1a;FLASH读写程序(有…