2.生成Transformation

目录

前言

Source

FlatMap

KeyBy

sum

总结


前言

以下面的WordCount为例

package com.wlh.p1;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class HelloWorld {
    public static void main(String[] args) throws Exception {
        //
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 7777)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] split = s.split(" ");
                        for (String word : split) {
                            collector.collect(Tuple2.of(word, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                        return stringIntegerTuple2.f0;
                    }
                })
                .sum(1)
                .print();

        //
        env.execute();

    }
}

上面是一个简单的WordCount程序,理解Flink之前,需要理解好Flink中的一些核心抽象概念

如下图所示,主要为3个:

(1)Transformation

(2)StreamOperator

(3)User-Defined Function

Transformation指的是在DateStream之间转换的操作,比如上面WordCount例子中的flatMap,它其实就对应着一个Transformation,表示从某个DataStream转换为另一个DataStream对应的Transformation。

以WordCount为例,先看一下对应的transformations,上述任务对应的transformations是一个list,list包含3个元素,但是元素对应的transformation的id是2/4/5。具体这些transformation是如何产生的是本文的重点。

Source

我们根据代码一步步跟进看一下

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

当获取到流式执行环境后(如果在本地获取的是Local的执行环境),StreamExecutionEnvironment中会存在一个成员变量transformations,初始化为空集合。

env.socketTextStream("localhost", 7777)

跟进代码后,

可以看到new SocketTextStreamFunction

这就是上面说的User-Defined Function,跟进该方法,可以看到确实是继承了Function接口

跟进addSource(

        new SocketTextStreamFunction(hostname, port, delimiter, maxRetry), "Socket Stream");

这里有一个比较关键的参数Boundedness.CONTINUOUS_UNBOUNDED

表示该source是一个无界流,事实也是如此,socket流当然是无界的。

跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);

注意TypeInformation<OUT> resolvedTypeInfo =

        getTypeInfo(function, sourceName, SourceFunction.class, typeInfo);

TypeInformation是Flink中类型系统的核心类,该方法的内部逻辑是通过java提供的Type系统来提取的。debug跟进一下类型的提取

baseClass是SourceFunction;clazz是具体的实现类SocketTextStreamFunction。通过new TypeExtractor()对象提取该function具体的输出类型信息。

跟进new TypeExtractor()

        .privateCreateTypeInfo(baseClass, clazz, returnParamPos, in1Type, in2Type);查看具体的类型提取代码,通过反射获取,关于反射获取具体的类型,自行学习了解。

获取到SocketTextStreamFunction的输出类型后,继续跟进 addSource(function, sourceName, typeInfo, Boundedness.CONTINUOUS_UNBOUNDED);

刚刚已经获取输出类型之后,继续跟进后续代码

boolean isParallel = function instanceof ParallelSourceFunction; // 判断该source是不是可以并行的source,很明显这里的isParallel是false

clean(function); // 该行在做闭包清理/检查。如果不通过会报类似异常"Object " + obj + " is not serializable"

final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function); 

通过StreamSource的继承关系可以看出,StreamSource其实是开头提到的二号人物StreamOperator,至此,User-Defined Function和StreamOperator都已经出现了,并且它们的关系是StreamOperator中包含User-Defined Function,和开头图示一致。

return new DataStreamSource<>(

        this, resolvedTypeInfo, sourceOperator, isParallel, sourceName, boundedness);

最终,构建了Transformation,并且将operator传入,至此Transformation、StreamOperator、User-Defined Function都已经出现在视野中。

注意:在看DataStreamSource的继承关系时,会看到它继承自SingleOutputStreamOperator,这里是我觉得Flink命名不太好的地方,SingleOutputStreamOperator会被误认为是StreamOperator,但其实不是,SingleOutputStreamOperator是继承自DataStream的,并且在注释中明确说明SingleOutputStreamOperator是transformation。

跟进DataStream,会看到DataStream中封装了environment环境和通过env.socketTextStream("localhost", 7777)定义的第一个transformation。后面的api操作,如flatMap等,都是基于该DataStream进行操作了。

FlatMap

在上面DataStream的基础上将后续的api都介绍一下,跟进.flatMap

形参flatMapper,即为用户在编程时自定义的function,代码逻辑很清晰,依然是先获取输出的类型信息

跟进return flatMap(flatMapper, outType);

StreamFlatMap是StreamOperator的子类

跟进return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));

跟进return doTransform(operatorName, outTypeInfo, SimpleOperatorFactory.of(operator));

首先new OneInputTransformation,创建一个新的Transformation,在构建的时候,传入了this.transformation,就是上面的LegacySourceTransformation。

new SingleOutputStreamOperator(environment, resultTransform); 创建一个新的DataStream,作为这一步API操作的返回值。

跟进getExecutionEnvironment().addOperator(resultTransform);

将transformation添加进env环境的transformations集合中,这个集合在未来会遍历生成StreamGraph。

KeyBy

在上面DataStream的基础上将后续的api都介绍一下,跟进.keyBy

直接new KeyedStream,跟进

获取key的类型信息,继续跟进构造方法

new PartitionTransformation创建了一个Transformation,将当前Datastream的Transformation作为PartitionTransformation的输入,并且将用户自定义的keySelector封装进KeyGroupStreamPartitioner。

继续跟进后,由于KeyedStream继承自DataStream,同样的,将env和当前的transformation封装进去。

至此KeyedStream构建完成,它的内容如下,

sum

在上面KeyedStream的基础上,继续跟进代码

 SumAggregator<T> extends AggregationFunction<T> 

SumAggerator就是User-Defined Function

跟进return aggregate(new SumAggregator<>(positionToSum, getType(), getExecutionConfig()));

方法的签名为AggeragationFunction

跟进return reduce(aggregate).name("Keyed Aggregation");

这里出现了transformation

getExecutionEnvironment().addOperator(reduce); 这一行代码在之前说过,将该transformation加入到env的transformations中。

这里重点来看一下ReduceTransformation

构造方法如下:包含了聚合算子必须的一些信息,reducer是聚合的函数,input是之前的transformation,keySelector是分组key的提取方式。

值得注意的一行代码updateManagedMemoryStateBackendUseCase(true);

这里是在设置状态后端,这里第一次提到了状态的概念,状态是Flink得以流行的重要原因之一,有状态的流式计算。

print

这里是WordCount例子中的最后一步了,和前面的算子都非常类似,看到这里应该是可以举一反三了。

熟悉的味道,创建Function

创建StreamOperator

在创建DateStream时,创建了transformation

总结

本文介绍了Flink是如何将用户的api转换为Transformation,这是Flink的核心抽象,DataStream是面向用户的,Transformation并不面向用户,在Flink触发执行时,transformation会被Flink转换为更贴近底层执行的各种有向无环图,即常说的DAG。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/937058.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

1. 机器学习基本知识(3)——机器学习的主要挑战

1.5 机器学习的主要挑战 1.5.1 训练数据不足 对于复杂问题而言&#xff0c;数据比算法更重要但中小型数据集仍然很普遍&#xff0c;获得额外的训练数据并不总是一件轻而易举或物美价廉的事情&#xff0c;所以暂时不要抛弃算法。 1.5.2 训练数据不具有代表性 采样偏差&#…

TypeScript学习路线图

‌ TypeScript 是由微软开发和维护的一种静态类型编程语言&#xff0c;它是 JavaScript 的超集。TypeScript 的创建是为了解决构建大规模 JavaScript 应用程序所面临的挑战&#xff0c;并向该语言添加了可选的类型注解、类、接口和其他特性。 使用 TypeScript 的主要好处包括&a…

负载均衡oj项目:编译模块

编译运行模块是一个网络服务&#xff0c;这样编译模块就可以可以快速部署到&#xff0c;其他主机上。 编译模块思路 util.hpp #pragma once #include <string> #include <vector> #include <sys/types.h> #include <sys/stat.h> #include <unistd…

绿色浪潮,VELO Angel Glide坐垫奏响环保骑行乐章

地球的环境日益恶劣&#xff0c;冰川消融、海平面上升、极端天气频繁出现&#xff0c;这一切都在不断提醒着我们&#xff0c;保护地球家园刻不容缓。而在这场关乎人类未来的环保行动中&#xff0c;各个领域都在积极探索可持续发展的道路&#xff0c;自行车坐垫领域也迎来了绿色…

【从零开始入门unity游戏开发之——C#篇09】if-else条件表达式、三元运算符、switch-case的使用

文章目录 一、if条件表达式1、if 语句基本结构示例输出&#xff1a; 2、else语句示例输出&#xff1a; 3、else if 语句示例输出&#xff1a; 4、组合逻辑运算符示例输出&#xff1a; 5、嵌套 if 语句示例输出&#xff1a;总结 二、三元运算符1、语法&#xff1a;2、示例&#…

Visual Studio 使用 GitHub Copilot 扩展

&#x1f380;&#x1f380;&#x1f380;【AI辅助编程系列】&#x1f380;&#x1f380;&#x1f380; Visual Studio 使用 GitHub Copilot 与 IntelliCode 辅助编码Visual Studio 安装和管理 GitHub CopilotVisual Studio 使用 GitHub Copilot 扩展Visual Studio 使用 GitHu…

conda学习

参考: Anaconda 官网教程 https://freelearning.anaconda.cloud/get-started-with-anaconda/18202conda配置虚拟环境/conda环境迁移/python环境迁移 https://blog.csdn.net/qq_43369406/article/details/127140839 环境&#xff1a; macOS 15.2Anaconda Navigator 2.4.2 x.1…

Nginx配置示例教程

最近对Nginx做了一些初步研究&#xff0c;Nginx是lgor Sysoev为俄罗斯访问量第二的rambler.ru站点设计开发。主要根据工作中各类应用服务部署访问的需求&#xff0c;围绕HTTP服务、负载均衡、正反向代理、子路由、静态资源发布访问等&#xff0c;以及结合minio管理的图片文件资…

git使用教程(超详细)-透彻理解git

一.核心基础 核心概念有六个 首先请把与svn有关的一切概念暂时从你的脑海中移除掉&#xff0c;我们要重新认识本文所讲述的所有概念。 1.worktree worktree是一个目录&#xff0c;你在这里对文件进行增加、删除、修改。也就是我们常说的工作区。在git中worktree必须要与一个…

Django结合websocket实现分组的多人聊天

其他地方和上一篇大致相同&#xff0c;上一篇地址点击进入, 改动点1&#xff1a;在setting.py中最后再添加如下配置&#xff1a; # 多人聊天 CHANNEL_LAYERS {"default":{"BACKEND": "channels.layers.InMemoryChannelLayer"} }因此完整的se…

Keil-MDK开发环境编译后axf自动转换bin格式文件

编译选项添加如下&#xff0c;调用fromelf工具自动完成转换&#xff1a; fromelf --bin -o "$LL.bin" "#L"

如何快速搭建若依管理系统?

1、下载若依管理系统前后端分离版代码至本地&#xff08;当前版本为RuoYi v3.8.8&#xff09;&#xff1a; RuoYi-Vue: &#x1f389; 基于SpringBoot&#xff0c;Spring Security&#xff0c;JWT&#xff0c;Vue & Element 的前后端分离权限管理系统&#xff0c;同时提供…

【JavaEE】网络(1)

&#x1f435;本篇文章开始讲解计算机网络相关的知识 一、基础概念 1.1 局域网和广域网 局域网→Local Area Network→简称LAN&#xff0c;局域网是局部组建的一种私有网络&#xff0c;局域网内的主机之间可以进行网络通信&#xff0c;局域网和局域网之间在没有连接的情况不能…

网络应用技术 实验八:防火墙实现访问控制(华为ensp)

目录 一、实验简介 二、实验目的 三、实验需求 四、实验拓扑 五、实验步骤 1、设计全网 IP 地址 2、设计防火墙安全策略 3、在 eNSP 中部署园区网 4、配置用户主机地址 5、配置网络设备 配置交换机SW-1~SW-5 配置路由交换机RS-1~RS-5 配置路由器R-1~R-3 6、配置仿…

day11 性能测试(4)——Jmeter使用(黑马的完结,课程不全)直连数据库+逻辑控制器+定时器

【没有所谓的运气&#x1f36c;&#xff0c;只有绝对的努力✊】 目录 1、复习 1.1 断言&#xff08;3种&#xff09; 1.2 关联&#xff08;3种&#xff09; 1.3 录制脚本 2、Jmeter直连数据库 2.1 直连数据库——使用场景 2.2 直连数据库——操作步骤 2.2.1 案例1&…

Modelscope AgentFabric: 开放可定制的AI智能体构建框架

目录 git clone https://github.com/modelscope/modelscope-agent.git cd modelscope-agent && pip install -r requirements.txt && pip install -r apps/agentfabric/requirements.txtexport PYTHONPATH$PYTHONPATH:/home/ubuntu/users/lilingfei/modelscop…

CSS|08 浮动清除浮动

浮动 需求: 能够实现让多个元素排在同一行&#xff0c;并且给这些元素设置宽度与高度! 让多个元素排在同一行:行内元素的特性 给这些元素设置宽高:块级元素的特性 在标准文档流中的元素只有两种:块级元素和行内元素。如果想让一些元素既要有块级元素的特点也要有行内元素的特…

[Pro Git#3] 远程仓库 | ssh key | .gitignore配置

目录 1. 分布式版本控制系统的概念 2. 实际使用中的“中央服务器” 3. 远程仓库的理解 4. 新建远程仓库 5. 克隆远程仓库 6. 设置SSH Key 实验 一、多用户协作与公钥管理 二、克隆后的本地与远程分支对应 三、向远程仓库推送 四、拉取远程仓库更新 五、配置Git忽略…

【uni-app】App与webview双向实时通信

【uni-app】App与webview双向实时通信 在 Uniapp 中&#xff0c;App 与 里面嵌入的 webview 进行双向的实时通信 vue2 &#xff0c; 模拟器 主要分为两部分 webview 向 app 发送信息 app 向 webview 发送信息 以下是实现方式&#xff0c;用一个例子来说明 &#xff08;文…

微信小程序处理交易投诉管理,支持多小程序

大家好&#xff0c;我是小悟 1、问题背景 玩过微信小程序生态的&#xff0c;或许就有这种感受&#xff0c;如果收到投诉单&#xff0c;不会及时通知到手机端&#xff0c;而是每天早上10:00向小程序的管理员及运营者推送通知。通知内容为截至前一天24时该小程序账号内待处理的交…