1.flink快速入门

前言

下图表示的是一个简单的flink-job的计算图,这种图被称为DAG(有向无环图),表示的这个任务的计算逻辑,无论是spark、hive、还是flink都会把用户的计算逻辑转换为这样的DAG,数据的计算按照DAG触发,理论上只要构建出这样一个DAG图,就可以描述清楚用户的计算逻辑,在DAG的基础上,将Node并行化就可以将整个job并行化。

在Flink之前的上一代流式计算框架Apache Storm的hello world如下(节选了一部分):从storm的helloworld代码可以很清楚的看到storm构建dag是依赖用户自己构建,用户将自己脑中的dag图使用代码画出来,line2创建了一个DAG的builder,line4新增了一个节点,line6也新增了一个节点,dag画完了后在line16将DAG生成出来提交到集群执行。从这里可以看出storm构建DAG的逻辑是用户心中有图,自己画出来。

// 实例化TopologyBuilder类。
TopologyBuilder topologyBuilder = new TopologyBuilder();
// 设置喷发节点并分配并发数,该并发数将会控制该对象在集群中的线程数。
topologyBuilder.setSpout("SimpleSpout", new SimpleSpout(), 1);
// 设置数据处理节点并分配并发数。指定该节点接收喷发节点的策略为随机方式。
topologyBuilder.setBolt("SimpleBolt", new SimpleBolt(), 3).shuffleGrouping("SimpleSpout");
Config config = new Config();
config.setDebug(true);
if (args != null && args.length > 0) {
    config.setNumWorkers(1);
    StormSubmitter.submitTopology(args[0], config, topologyBuilder.createTopology());
} else {
    // 这里是本地模式下运行的启动代码。
    config.setMaxTaskParallelism(1);
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("simple", config, topologyBuilder.createTopology());
}

再看一下flink的helloworld,代码如下,该代码对应的DAG就是文章开头的图片,下面代码中line3获取一个执行的环境,line6从9999端口读入数据,line7做flatmap,ling15做分组操作,line20对分组的数据做sum聚合,line22执行任务;通过和storm的helloworld的对比,可以很明显的看出flink代码中很难看出DAG的样子,flink专注的并不是用户去画DAG,而是用户表达清楚自己的业务,由flink将DAG画出并执行,这也是flink会将storm慢慢淘汰的原因之一

public class Demo01_hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("localhost", 9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            out.collect(Tuple2.of(s, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).sum(1).print();

        env.execute();
    }
}

总结一下:flink目前提供了多种api,包裹flink-stream-api,table/sql-api,python-api,这些api的表象不同,但是底层都是将用户表达的逻辑翻译为DAG部署到集群上

那就从Hello-world开始吧

大数据的hello-word都是从wordcount开始的,这是mapreduce时代的传承,让我们再看一下flink的wordcount

public class Demo01_hello {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        env.socketTextStream("localhost", 9999)
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String s : value.split(" ")) {
                            out.collect(Tuple2.of(s, 1));
                        }
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }).sum(1).print();

        env.execute();
    }
}

line3从StreamExecutionEnvironment获取了一个执行环境,这个环境在本地就是local的,在yarn上就是yarn的,在k8s上就是k8s的

line4设置任务的并行度,这里遇到了第一个概念:并行度,并行度表示任务的并行个数,比如数据源kafka有2个分区,那么最佳的并行度就是2,因为一个分区只能被一个消费者消费,并行度大于2则多余的消费者消费不到数据

line6设置了数据源为socket,监听9999端口

line7对数据源的数据做flatmap操作,输入是string,输出是tuple2<string,integer>

line15对tuple2<string,integer>做了分组操作,按照string分组,这里涉及了另一个概念shuffle,shuffle就是打乱的意思

line20对分组后的数据tuple2<string,integer>做了sum操作,计算出每一个string的数量

ling22执行任务

下图展示了该任务如何从代码变成可以运行的执行图运行在分布式环境中

可以看到上图中有四张图,编写的代码会经历

StreamGraph -> JobGraph -> ExecutionGraph -> 物理执行图,最终提交到集群执行

(1)StreamGraph

a)StreamNode:表示每一个operator,并且携带了这个operator的若干信息

b)StreamEdge:表示streamnode之间的边,边上还携带了标识:rebalance、hash、forward,表示streamnode之间的数据传输方式

c)StreamGraph其实已经很像前言中的dag图了,但是还有些不同 

(2)JobGraph

a)JobVertex:streamgraph中的streamnode如果存在可以优化的情况,比如operator-chain,那么多个streamnode就可以合并为一个jobvertex,operator-chain的条件是streamedge=forward且前后两个streamnode并行度相同

b) IntermediateDataset:jobvertex的产出数据,即若干个operator处理后的结果集

c)JobEdge:数据传输通道,从intermediatedataset传输数据到下游jobvertex

(3)ExecutionGraph

a)ExecutionVertex: jobvertex的并行化节点

b)ExecutionJobVertex:jobvertex对应的节点,一一对应

c)IntermediateResultPartition: 表示ExecutionVertex的输出结果,一个ExecutionVertex对应一个IntermediateResultPartition

d)IntermediateResult:和IntermediateDataset一一对应

e)ExecutionEdge:连接IntermediateResultPartition和ExecutionVertex一一对应

(4)物理执行图

a)Task:具体的调度task,封装了operator的操作,包括用户的逻辑

b)ResultPartition:对应IntermediateResultPartition,一一对应

c)ResultSubPartition:是Resultpartition的子分区,他的数量和下游的task有关,如果source算子就一个,所以他的ResultPartition就一个,但是下游有两个flatmap算子,所以这个ResultPartition会分成2个ResultSubPartition,分别给下游两个flatmap算子消费

d)InputChannel:连接ResultSubPartition和下游task算子的数据通道

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/81545.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

计算机竞赛 卷积神经网络手写字符识别 - 深度学习

文章目录 0 前言1 简介2 LeNet-5 模型的介绍2.1 结构解析2.2 C1层2.3 S2层S2层和C3层连接 2.4 F6与C5层 3 写数字识别算法模型的构建3.1 输入层设计3.2 激活函数的选取3.3 卷积层设计3.4 降采样层3.5 输出层设计 4 网络模型的总体结构5 部分实现代码6 在线手写识别7 最后 0 前言…

Android Studio Giraffe控制台乱码

这几天在使用Android Studio Giraffe进行一个App的开发&#xff0c;在项目构建的时候&#xff0c;控制台输出中文都是乱码&#xff0c;看着很不爽&#xff0c;进行了两项配置&#xff0c;中文就可以正常输出了&#xff0c;看起来就爽多了。 第一个配置&#xff1a;点击Help菜单…

系统架构设计专业技能 · 信息系统基础

系列文章目录 系统架构设计专业技能 网络技术&#xff08;三&#xff09; 系统架构设计专业技能 系统安全分析与设计&#xff08;四&#xff09;【系统架构设计师】 系统架构设计高级技能 软件架构设计&#xff08;一&#xff09;【系统架构设计师】 系统架构设计高级技能 …

这些Linux基础命令你总得掌握吧

B站|公众号&#xff1a;啥都会一点的研究生 写在前面 很多深度学习/机器学习/数据分析等领域&#xff08;或者说大多数在Python环境下进行操作的领域&#xff09;的初学者入门时是在Windows上进行学习&#xff0c;也得益于如Anaconda等工具把环境管理做的如此友善 但如果想在…

【Unity每日一记】SceneManager场景资源动态加载

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;元宇宙-秩沅 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 秩沅 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a;uni…

【CAM】CAM(Class Activation Mapping)——可视化CNN的特征定位

文章目录 一、CAM(Class Activation Mapping)二、CAM技术实现2.1 网络修改2.2 微调2.2 特征提取 三、总结Reference 完整代码见Github &#xff1a;https://github.com/capsule2077/CAM-Visualization &#xff0c;如果有用可以点个Star&#xff0c;谢谢&#xff01; 一、CAM(C…

视频转云存的痛点

现在运营商体系里面&#xff0c;有大量的视频转云存储的需求&#xff0c;但是视频云存储有一个比较大的痛点&#xff0c;就是成本&#xff01; 成本一&#xff1a;存储成本&#xff1b; 我们以1000路2M视频转云存&#xff0c;存储时间为90天为例&#xff08;B端存储时间有时候…

Java | IDEA中 jconsole 不是内部或外部命令,也不是可运行的程序

解决办法&#xff1a; 1.先将Terminal的Shell path 修改为C:\WINDOWS\system32\cmd.exe 2.在检查环境变量中的ComSpec的值 3.找到自己电脑下载的jdk的bin的地址 4.将jdk的bin地址加入到系统变量path中

仪表板展示 | DataEase看中国:2023年中国电影市场分析

背景介绍 随着《消失的她》、《变形金刚&#xff1a;超能勇士崛起》、《蜘蛛侠&#xff1a;纵横宇宙》、《我爱你》等国内外影片的上映&#xff0c;2023年上半年的电影市场也接近尾声。据国家电影专资办初步统计&#xff0c;上半年全国城市院线票房达262亿元&#xff0c;已经超…

Mariadb高可用MHA (四十二)

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 目录 前言 一、概述 1.1 概念 1.2 组成 1.3 特点 1.4 工作原理 二、构建MHA 2.1 ssh免密登录 2.2 主从复制 2.3 MHA安装 2.3.1所有节点安装perl环境 2.3..2 node 2.3.…

SpringBoot + Vue 微人事权限组管理模块 (十四)

权限组前端页面制作 权限组管理角色和菜单之间关系&#xff0c;操作员管理着用户和角色之间的关系。 英文的输入框要有个前缀&#xff0c;SpringSecurity里角色英文名需要加一个ROLE_的前缀 上代码 <div><div class"permissManaTool"><el-input pla…

完全备份、增量备份、差异备份、binlog日志

Top NSD DBA DAY06 案例1&#xff1a;完全备份与恢复案例2&#xff1a;增量备份与恢复案例3&#xff1a;差异备份与恢复案例4&#xff1a;binlog日志 1 案例1&#xff1a;完全备份与恢复 1.1 问题 练习物理备份与恢复练习mysqldump备份与恢复 1.2 方案 在数据库服务器192…

【图论】Floyd算法

一.简介 Floyd算法&#xff0c;也称为Floyd-Warshall算法&#xff0c;是一种用于解决所有节点对最短路径问题的动态规划算法。它可以在有向图或带权图中找到任意两个节点之间的最短路径。 Floyd算法的基本思想是通过中间节点逐步优化路径长度。它使用一个二维数组来存储任意两…

java面试基础 -- ArrayList 和 LinkedList有什么区别, ArrayList和Vector呢?

目录 基本介绍 有什么不同?? ArrayList的扩容机制 ArrayLIst的基本使用 ArrayList和Vector 基本介绍 还记得我们的java集合框架吗, 我们来复习一下, 如图: 可以看出来 ArrayList和LinkedList 都是具体类, 他们都是接口List的实现类. 但是他们底层的逻辑是不同的, 相信…

RabbitMq交换机类型介绍

RabbitMq交换机类型介绍 在RabbitMq中&#xff0c;生产者的消息都是通过交换器来接收&#xff0c;然后再从交换器分发到不同的队列&#xff0c;再由消费者从队列获取消息。这种模式也被成为“发布/订阅”。 分发的过程中交换器类型会影响分发的逻辑。 直连交换机&#xff1a…

【Go】Go 文本匹配 - 正则表达式基础与编程中的应用 (8000+字)

正则表达式&#xff08;Regular Expression, 缩写常用regex, regexp表示&#xff09;是计算机科学中的一个概念&#xff0c;很多高级语言都支持正则表达式。 目录 何为正则表达式 语法规则 普通字符 字符转义 限定符 定位符 分组构造 模式匹配 regexp包 MatchString…

什么是Eureka?以及Eureka注册服务的搭建

导包 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0"xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org/POM/4.0.0 htt…

LeetCode_Java_2236. 判断根结点是否等于子结点之和

2236. 判断根结点是否等于子结点之和 给你一个 二叉树 的根结点 root&#xff0c;该二叉树由恰好 3 个结点组成&#xff1a;根结点、左子结点和右子结点。 如果根结点值等于两个子结点值之和&#xff0c;返回 true &#xff0c;否则返回 false 。 示例1 输入&#xff1a;roo…

java八股文面试[JVM]——JVM内存结构

参考&#xff1a; JVM学习笔记&#xff08;一&#xff09;_卷心菜不卷Iris的博客-CSDN博客 JVM是运行在操作系统之上的&#xff0c;它与硬件没有直接的交互 JVM内存结构&#xff1a; 方法区&#xff1a;存储已被虚拟机加载的类元数据信息(元空间) 堆&#xff1a;存放对象实…

SpringBoot项目集成ElasticSearch服务

本文已收录于专栏 《中间件合集》 目录 版本介绍背景介绍优势说明集成过程1.引入依赖2.添加配置文件3.初始化 示例说明代码结果 总结提升 版本介绍 Spring boot的版本是&#xff1a; 2.3.12   ElasticSearch的版本是&#xff1a;7.6.2 背景介绍 在我们的项目中经常会遇到对于…