大数据学习之Flink算子、了解(Transformation)转换算子(基础篇三)

Transformation转换算子(基础篇三)


目录

Transformation转换算子(基础篇三)

三、转换算子(Transformation)

1.基本转换算子

1.1 映射(Map)

1.2 过滤(filter)

1.3 扁平映射(flatmap)

1.4基本转换算子的例子

2.聚合算子(Aggregation)

2.1 按键分区(keyBy)

2.2 简单聚合

2.3 归约聚合(reduce)

3.用户自定义函数(UDF)

3.1 函数类(Function Classes)

3.2 富函数类(Rich Function Classes)

4.物理分区(Physical Partitioning)

4.1 随机分区(shuffle)

4.2 轮询分区(Round-Robin)

4.3 重缩放分区(rescale)

4.4 广播(broadcast)

4.5 全局分区(global)

4.6 自定义分区(Custom)


三、转换算子(Transformation)

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个 DataStream 转换为新的 DataStream,如图所示。一个Flink程序的核心,其实就是所有的转换操作,它们决定了处理的业务逻辑。

1.基本转换算子

1.1 映射(Map)

map算子接收一个函数作为参数,并把这个函数应用于DataStream的每个元素,最后将函数的返回结果作为结果DataStream中对应元素的值,即将DataStream的每个元素转换成新的元素。

1.2 过滤(filter)

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉。

1.3 扁平映射(flatmap)

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理

1.4基本转换算子的例子

代码如下:

import org.apache.flink.streaming.api.scala._

object Practice_of_Simple_Operators {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1) //设置并行度为1
    //常见的简单算子 有:map、flatmap、filter
    //map
    //从集合中获取不同数据类型数据
    val dataStream1 = env.fromCollection(List(1,2,3))
    //对每一个数 都乘以2
    val resultStream1 = dataStream1.map(data => data * 2)
    resultStream1.print("resultStream1")
    //flatmap
    val dataStream2 = env.fromCollection(List("hello word","hello flink","hello spark"))
    val resultStream2 = dataStream2.flatMap(_.split(" "))
    resultStream2.print("resultStream2")
    //filter
    val resultStream3 = dataStream1.filter(_%2==0)
    resultStream3.print("resultStream3")

    env.execute("Stream Transform")//启动flink作业
  }
}

运行结果:

2.聚合算子(Aggregation)

  • 直观上看,基本转换算子确实是在“转换”——因为它们都是基于当前数据,去做了处理和输出。

  • 而在实际应用中,我们往往需要对大量的数据进行统计或整合,从而提炼出更有用的信息。比如之前 word count 程序中,要对每个词出现的频次进行叠加统计。这种操作,计算的结果不仅依赖当前数据,还跟之前的数据有关,相当于要把所有数据聚在一起进行汇总合并——这就是所谓的“聚合”(Aggregation),也对应着 MapReduce 中的 reduce 操作。

2.1 按键分区(keyBy)
  • 对于 Flink 而言,DataStream是没有直接进行聚合的API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区; 这个操作就是通过keyBy来完成的。

  • keyBy 是聚合前必须要用到的一个算子。keyBy 通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)。

  • 基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot 中进行处理了。

  • keyBy算子主要作用于元素类型是元组或数组的DataStream上。使用该算子可以将DataStream中的元素按照指定的key(字段)进行分组,具有相同key的元素将进入同一个分区中(不进行聚合),并且不改变原来元素的数据结构。在逻辑上将流划分为不相交的分区,在内部是通过哈希分区实现的。

//配置执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
//fromElements --> 给一个固定的元素集合 创建一个数据流(DataStream)
//数据流是以键值对的形式存在的
val source = env.fromElements((1, 2), (2, 1),(1, 6), (1, 9), (1, 7),  (2, 2), (2, 10), (3, 1))
//keyby算子
source.keyBy(temp => temp._1).print("result")
 // 执行 Flink 作业
env.execute("Flink FromElements Example")

运行结果:

2.2 简单聚合
  • 有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们 内置实现了一些最基本、最简单的聚合 API,主要有以下几种:

    • sum():在输入流上,对指定的字段做叠加求和的操作。

    • min():在输入流上,对指定的字段求最小值。

    • max():在输入流上,对指定的字段求最大值。

    • minBy():与 min()类似,在输入流上针对指定字段求最小值。

      不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;

      而 minBy()则会返回包含字段最小值的整条数据。

    • maxBy():与 max()类似,在输入流上针对指定字段求最大值。

      不同的是,max()只计算指定字段的最大值,其他字段会保留最初第一个数据的值;

      而 maxBy()则会返回包含字段最大值的整条数据。

  • 简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数; 但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字 段的方式有两种:指定位置,和指定名称。 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字 段的名称,是以1、2、_3、…来命名的。 例如,下面就是对元组数据流进行聚合的测试:

  • 对于元组类型的数据,同样也可以使用这两种方式来指定字段。需要注意的是,元组中字 段的名称,是以1、2、_3、…来命名的。

测试:

import org.apache.flink.streaming.api.scala._

object TransTupleAggregation {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream = env
      .fromElements(
        ("a", 1), ("a", 3), ("b", 3), ("b", 4)
      )
    stream.print("原数据")
    stream.keyBy(_._1).sum(1).print() //对元组的索引 1 位置数据求和
    stream.keyBy(_._1).sum("_2").print() //对元组的第 2 个位置数据求和
    stream.keyBy(_._1).max(1).print() //对元组的索引 1 位置求最大值
    stream.keyBy(_._1).max("_2").print() //对元组的第 2 个位置数据求最大值
    stream.keyBy(_._1).min(1).print() //对元组的索引 1 位置求最小值
    stream.keyBy(_._1).min("_2").print() //对元组的第 2 个位置数据求最小值
    stream.keyBy(_._1).maxBy(1).print() //对元组的索引 1 位置求最大值
    stream.keyBy(_._1).maxBy("_2").print() //对元组的第 2 个位置数据求最大值
    stream.keyBy(_._1).minBy(1).print() //对元组的索引 1 位置求最小值
    stream.keyBy(_._1).minBy("_2").print() //对元组的第 2 个位置数据求最小值
    env.execute()
  }
}

而如果数据流的类型是样例类,那么就只能通过字段名称来指定,不能通过位置来指定了。

import org.apache.flink.streaming.api.scala._
object TransAggregationCaseClass {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val stream = env.fromElements(
      Event("Mary", "./home", 1000L),
      Event("Bob", "./cart", 2000L)
    )
    // 使用 user 作为分组的字段,并计算最大的时间戳
    stream.keyBy(_.user).max("timestamp").print()
    env.execute()
  }
}

一个聚合算子,会为每一个key保存一个聚合的值,在Flink中我们把它叫作“状态”(state)。 所以每当有一个新的数据输入,算子就会更新保存的聚合结果,并发送一个带有更新后聚合值 的事件到下游算子。对于无界流来说,这些状态是永远不会被清除的,所以我们使用聚合算子, 应该只用在含有有限个 key 的数据流上。

2.3 归约聚合(reduce)
  • 与简单聚合类似,reduce()操作也会将 KeyedStream 转换为 DataStream。它不会改变流的 元素数据类型,所以输出类型和输入类型是一样的。

  • 调用 KeyedStream 的 reduce()方法时,需要传入一个参数,实现 ReduceFunction 接口。接 口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处 理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再 将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据, 这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果” 作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

下面我们来看一个稍复杂的例子。

我们将数据流按照用户 id 进行分区,然后用一个 reduce()算子实现 sum()的功能,统计每 个用户访问的频次;进而将所有统计结果分到一组,用另一个 reduce()算子实现 maxBy()的功 能,记录所有用户中访问频次最高的那个,也就是当前访问量最大的用户是谁。

import org.apache.flink.streaming.api.scala._

object TransReduce {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env
      .addSource(new ClickSource)
      .map(r => (r.user, 1L))
      //按照用户名进行分组
      .keyBy(_._1)
      //计算每个用户的访问频次
      .reduce((r1, r2) => (r1._1, r1._2 + r2._2))
      //将所有数据都分到同一个分区
      .keyBy(_ => true)
      //通过 reduce 实现 max 功能,计算访问频次最高的用户
      .reduce((r1, r2) => if (r1._2 > r2._2) r1 else r2)
      .print()
    env.execute()
  }
}

reduce()同简单聚合算子一样,也要针对每一个 key 保存状态。因为状态不会清空,所以 我们需要将 reduce()算子作用在一个有限 key 的流上。

3.用户自定义函数(UDF)

3.1 函数类(Function Classes)
3.2 富函数类(Rich Function Classes)

4.物理分区(Physical Partitioning)

4.1 随机分区(shuffle)

4.2 轮询分区(Round-Robin)

4.3 重缩放分区(rescale)

4.4 广播(broadcast)
4.5 全局分区(global)
4.6 自定义分区(Custom)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/344194.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【优先级队列 之 堆的实现】

文章目录 前言优先级队列 PriorityQueue优先队列的模拟实现 堆堆的储存方式堆的创建建堆的时间复杂度堆的插入与删除 总结 前言 优先级队列 PriorityQueue 概念&#xff1a;对列是先进先出的的数据结构&#xff0c;但有些情况&#xff0c;数据可能带有优先级&#xff0c;一般出…

C++:使用tinyXML生成矢量图svg

先说一下tinyXML库的配置&#xff1a; 很简单&#xff0c;去下面官网下载 TinyXML download | SourceForge.net 解压后是这样 直接将红框中的几个文件放到项目中即可使用 关于svg文件&#xff0c;SVG是基于XML的可扩展矢量图形&#xff0c;svg是xml文件&#xff0c;但是xml…

TCP三握四挥(面试需要)

TCP建立连接需要三次握手过程&#xff0c;关闭连接需要四次挥手过程 三次握手 从图中可以看出&#xff0c;客户端在发起connect时&#xff0c;会发起第一次和第三次握手。服务端在接收客户端连接时&#xff0c;会发起第二次握手。 这三次握手&#xff0c;都会通过SYNACK的方式…

阿里云4核8G云服务器价格、带宽及系统盘费用

阿里云服务器4核8g配置云服务器u1价格是955.58元一年&#xff0c;4核8G配置还可以选择ECS计算型c7实例、计算型c8i实例、计算平衡增强型c6e、ECS经济型e实例、AMD计算型c8a等机型等ECS实例规格&#xff0c;规格不同性能不同&#xff0c;价格也不同&#xff0c;阿里云服务器网al…

EasyExcel入门使用

EasyExcel是一个基于Java的、快速、简洁、解决大文件内存溢出的Excel处理工具。他能让你在不用考虑性能、内存的等因素的情况下&#xff0c;快速完成Excel的读、写等功能。 EasyExcel 的主要特点如下&#xff1a; 1、高性能&#xff1a;EasyExcel 采用了异步导入导出的方式&…

数据结构:搜索二叉树 | 红黑树 | 验证是否为红黑树

文章目录 1.红黑树的概述2.红黑树的性质3.红黑树的代码实现3.1.红黑树的节点定义3.2.红黑树的插入操作3.3.红黑树是否平衡 黑红树是一颗特殊的搜索二叉树&#xff0c;本文在前文的基础上&#xff0c;图解红黑树插入&#xff1a;前文 链接&#xff0c;完整对部分关键代码展示&a…

macOS跨进程通信: Unix Domain Socket 创建实例

macOS跨进程通信: Unix Domain Socket 创建实例 一&#xff1a; 简介 Socket 是 网络传输的抽象概念。 一般我们常用的有Tcp Socket和 UDP Scoket&#xff0c; 和类Unix 系统&#xff08;包括Mac&#xff09;独有的 Unix Domain Socket&#xff08;UDX&#xff09;。 Tcp So…

避雷!仅1天撤回55篇中国学者的研究论文!这本毕业神刊需要注意!

【SciencePub学术】 这本国际期刊&#xff0c;仅仅去年12月一个月的时间&#xff0c;就撤回了近60篇国人文章&#xff01;这本期刊就是来自Hindawi出版社的APPLIED BIONICS AND BIOMECHANICS。 01 期刊信息简介 APPLIED BIONICS AND BIOMECHANICS IF(2022)&#xff1a;2.2&…

三维城市模型提升日本的智慧城市管理

MicroStation 将工作效率提高 50%&#xff0c;实现了前所未有的逼真模拟 构建三维城市模型生态系统 PLATEAU 项目由日本国土交通省牵头&#xff0c;是一项三维城市模型和数字孪生计划&#xff0c;旨在到 2027 年为日本 500 个城市构建开放的城市模型数字生态系统。作为日本最…

java获取linux和window序列号

前言 获取系统序列号在Java中并不是一个直接支持的功能&#xff0c;因为Java语言本身并不提供直接访问硬件级别的信息&#xff0c;如CPU序列号。但是&#xff0c;我们可以使用一些平台特定的工具或命令来实现这一功能。下面我将展示如何使用Java获取Windows和Linux系统上的CPU…

通过代理服务器的方式解决跨域问题

学习源码可以看我的个人前端学习笔记 (github.com):qdxzw/frontlearningNotes 觉得有帮助的同学&#xff0c;可以点心心支持一下哈 这里以本地访问https://heimahr.itheima.net/api/sys/permission接口为列子 Node.js 代理服务器 (server.js) 本次考虑使用JSONP或CORS代理来…

PHP“引用”漏洞

今日例题&#xff1a; <?php highlight_file(__FILE__); error_reporting(0); include("flag.php"); class just4fun { var $enter; var $secret; } if (isset($_GET[pass])) { $pass $_GET[pass]; $passstr_replace(*,\*,$pass); } $o unser…

【操作系统】实验三 编译 Linux 内核

&#x1f57a;作者&#xff1a; 主页 我的专栏C语言从0到1探秘C数据结构从0到1探秘Linux &#x1f618;欢迎关注&#xff1a;&#x1f44d;点赞&#x1f64c;收藏✍️留言 &#x1f3c7;码字不易&#xff0c;你的&#x1f44d;点赞&#x1f64c;收藏❤️关注对我真的很重要&…

[Linux]HTTP状态响应码和示例

1xx&#xff1a;信息响应类&#xff0c;表示接收到请求并且继续处理 2xx&#xff1a;处理成功响应类&#xff0c;表示动作被成功接收、理解和接受 3xx&#xff1a;重定向响应类&#xff0c;为了完成指定的动作&#xff0c;必须接受进一步处理 4xx&#xff1a;客户端错误&#x…

如何使用Docker本地部署Jupyter Notebook并结合内网穿透实现远程访问

&#x1f4d1;前言 本文主要是Linux下通过使用Docker本地部署Jupyter Notebook并结合内网穿透实现远程访问的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️ &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;…

单调栈笔记

单调栈 1.每日温度2.下一个更大元素 I3.下一个更大的元素4.接雨水5.柱状图中最大的矩形 单调栈正如其名字&#xff0c;用一个栈&#xff08;能够实现栈性质的数据结构就行&#xff09;来存储元素&#xff0c;存储在栈中的元素保持单调性&#xff08;单调递增或者是单调递减&…

Allegro如何设置飞线的显示方式?

预拉线最短化。 设置方法:选择菜单栏Setup→Design Parameters...(参数设置) 跳出下面对话框,根据需求设置。 Jogged:拼合的。 Straight:直的。 Closest endpoint:最靠近端点。 Pin to pin:引脚到引脚。 Jogged的显示效果。

MyBatis的逆向工程的创建,generator插件的使用和可能出现的一些问题,生成的实体类多出.java 1 .java 2这种拓展文件的处理方案

目录 创建逆向工程的步骤 ①添加依赖和插件 ②创建MyBatis的核心配置文件 ③创建逆向工程的配置文件 ④执行MBG插件的generate目标 数据库版本8有可能出现的问题&#xff1a; 1、生成的实体类多了.java 1 .java 2的拓展文件... 2、生成的属性与表中字段不匹配&#xff…

【IEEE会议征稿】2024年第九届智能计算与信号处理国际学术会议(ICSP 2024)

2024年第九届智能计算与信号处理国际学术会议&#xff08;ICSP 2024&#xff09; 2024年第八届智能计算与信号处理国际学术会议&#xff08;ICSP 2024&#xff09;将在西安举行&#xff0c; 会期是2024年4月19-21日&#xff0c; 为期三天, 会议由西安科技大学主办。 欢迎参会&…

Linux 一键部署influxd2-telegraf

influxd2前言 influxd2 是 InfluxDB 2.x 版本的后台进程,是一个开源的时序数据库平台,用于存储、查询和可视化时间序列数据。它提供了一个强大的查询语言和 API,可以快速而轻松地处理大量的高性能时序数据。 telegraf 是一个开源的代理程序,它可以收集、处理和传输各种不…