目录
一、简介
1.1 flink是什么
1.2 flink主要特点
核心特性:
分层API:
1.3 flink vs spark
1.3.1 数据处理框架
1.3.2 数据模型
1.3.3 运行时架构
二、wordcount实例
2.1 项目依赖
2.2 添加框架支持
2.3 批处理 - DataSet API
2.4 有界流处理wordcount
2.5 无界流处理wordcount
执行API
主机192.168.136.20
配置端口
三、Flink部署
3.1 flink集群中的主要组件
3.2 本地启动flink
1. 启动
2. 访问Web UI
3. 关闭flink
3.3 集群启动
1. flink-conf.yaml,修改jobManager
2. masters和worker
3. 分发安装目录
4. 启动集群
3.4 向集群提交作业
1. webUI提交作业
2. 命令行提交作业
3.5 部署模式
3.6 独立模式
3.7 yarn模式
启动集群
一、简介
1.1 flink是什么
apache flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行状态计算。
1.2 flink主要特点
类似于一个管道,数据处理完了之后能够及时输出,flink主要应用场景就是处理大规模的数据流。
核心特性:
1、高吞吐、低延迟,每秒处理百万个事件,毫秒级延迟
2、 结果的准确性。flink提供了事件时间(event-time)和处理时间(processing-time)。对于乱序事件流,事件事件语义仍然能提供一致且准确的结果。
3、 精确一次(ecatly-once)的状态一致性保证
4、 可以与常用存储系统连接。
5、 高可用,支持动态扩展
分层API:
约顶层越抽象,表达含义越简明,使用越方便
约底层越具体,表达含义越丰富,使用越灵活
1.3 flink vs spark
1.3.1 数据处理框架
1.3.2 数据模型
Spark采用RDD模型,spark streaming的DStream实际上也是一组组小批数据RDD的集合
flink基本数据模型是数据流,以及事件event序列
1.3.3 运行时架构
spark是批计算,将DAG划分为不同的stage,一个完成后才可以计算下一个
flink是标准的流执行模式,一个事件在一个节点处理完后可以直接发往下一个节点进行处理
二、wordcount实例
2.1 项目依赖
<properties>
<flink.version>1.13.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
</properties>
<dependencies>
<!-- 引入 Flink 相关依赖-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2.2 添加框架支持
2.3 批处理 - DataSet API
package org.example.cp2
import org.apache.flink.api.scala._
object BatchWordCount {
def main(args: Array[String]): Unit = {
// 1. 创建一个执行环境
val env = ExecutionEnvironment.getExecutionEnvironment
// 2. 读取文本文件数据
val lineDataSet: DataSet[String] = env.readTextFile("input/word.txt")
// 3. 对数据集进行转换处理
val wordAndOne: DataSet[(String, Int)] = lineDataSet.flatMap(_.split(" ")).map(x=>(x,1))
// 4. 按照单词进行转换处理
val wordAndOneGroup: GroupedDataSet[(String, Int)] = wordAndOne.groupBy(0)
// 5. 对分组数据进行sum聚合统计
val sum: AggregateDataSet[(String, Int)] = wordAndOneGroup.sum(1)
// 6. 打印输出
sum.print()
}
}
这里需要导入 import org.apache.flink.api.scala._,否则会报下面的异常。
需要注意的是,这种代码的实现方式是基于 DataSet API 的。也就是说,是把对数据的处理转换看作数据集来进行操作的。flink是一种流、批一体的处理架构,对于数据批量处理的时候底层也是数据流,所以没有必要使用 DataSet API 去进行特别的处理。
所以官方推荐的用法是直接使用 DataStream API ,在提交任务的时候通过将执行模式设为 BATCH 来进行批处理。
2.4 有界流处理wordcount
和批处理不一样的地方在于,流处理的执行环境使用StreamExecutionEnvironment,批处理使用ExecutionEnvironment。
package org.example.cp2
import org.apache.flink.api.scala.{AggregateDataSet, DataSet, GroupedDataSet}
import org.apache.flink.streaming.api.scala._
object BoundedStreamWordCount {
def main(args: Array[String]): Unit = {
// 1. 创建一个流式执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 读取文本文件数据
val lineDataStream: DataStream[String] = env.readTextFile("input/word.txt")
// 3. 对数据集进行转换处理
val wordAndOne = lineDataStream.flatMap(_.split(" ")).map(x=>(x,1))
// 4. 按照单词进行转换处理
val wordAndOneGroup = wordAndOne.keyBy(_._1)
// 5. 对分组数据进行sum聚合统计
val sum = wordAndOneGroup.sum(1)
// 6. 打印输出
sum.print()
// 执行任务
env.execute()
}
}
2.5 无界流处理wordcount
执行API
package org.example.cp2
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 1. 创建一个执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 2. 读取文本文件数据
val lineDataSet = env.socketTextStream("192.168.136.20",7777)
// 3. 对数据集进行转换处理
val wordAndOne = lineDataSet.flatMap(_.split(" ")).map(x=>(x,1))
// 4. 按照单词进行转换处理
val wordAndOneGroup = wordAndOne.keyBy(_._1)
// 5. 对分组数据进行sum聚合统计
val sum = wordAndOneGroup.sum(1)
// 6. 打印输出
sum.print()
// 执行任务
env.execute()
}
}
程序启动之后没有任何输出, 也不会退出。这是由于flink的流处理是事件驱动的,当前程序会一直处于监听状态。只有接收到数据才会执行任务、输出统计结果。
主机192.168.136.20
[root@Hadoop20 hadoop]# nc -lk 7777
输出的结果和读取文件的流处理十分相似。每输入一条数据,就有一次对应的输出。这里的数字表示的是线程数,默认为CPU的核数。当输入的数据足够多的时候,从1-12所有的核数都会占据。
监听的端口不会写死, 可以将主机名和端口号配置在外面。在flink代码中有 parameterTool 工具用来读取。
配置端口
--host 192.168.136.20 --port 7777
// 固定端口
val lineDataSet = env.socketTextStream("192.168.136.20",7777)
// 配置端口
val parameterTool = ParameterTool.fromArgs(args)
val hostname = parameterTool.get("host")
val port = parameterTool.getInt("port")
val lineDataStream = env.socketTextStream(hostname,port)
三、Flink部署
3.1 flink集群中的主要组件
包括客户端(Client)、作业管理器(JobManager)和任务管理器(TaskManager)。代码编写完之后,由客户端获取并做转换,发送给JobManager,也就是Flink集群的管理者,对作业进行中央调度管理。对作业进行转换后,将任务分发给众多的TaskManager,由TaskManager对数据进行实际的处理。
详细流程:
3.2 本地启动flink
1. 启动
[root@Hadoop20 flink113]# ./bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host Hadoop20.
Starting taskexecutor daemon on host Hadoop20.
[root@Hadoop20 flink113]# jps
18096 Jps
2948 NodeManager
17700 StandaloneSessionClusterEntrypoint
2117 NameNode
2773 ResourceManager
3445 RunJar
3319 RunJar
17975 TaskManagerRunner
2284 DataNode
2. 访问Web UI
启动成功后,访问192.168.136.20:8081,可以对flink集群和任务进行监控管理。
3. 关闭flink
[root@Hadoop20 flink113]# ./bin/stop-cluster.sh
Stopping taskexecutor daemon (pid: 17975) on host Hadoop20.
Stopping standalonesession daemon (pid: 17700) on host Hadoop20.
3.3 集群启动
启动的命令和配置没有变化,需要对主从关系 masters 和 workers 进行配置。
1. flink-conf.yaml,修改jobManager
[root@Hadoop20 flink113]# cd ./conf/
[root@Hadoop20 conf]# vim flink-conf.yaml
// 修改本机地址
jobmanager.rpc.address: 192.168.136.20
2. masters和worker
[root@Hadoop20 conf]# vim masters
192.168.136.20:8081
[root@Hadoop20 conf]# vim workers
192.168.136.21
192.168.136.22
3. 分发安装目录
$ scp -r ./flink-1.13.0 root@xsqone21:/opt/module
$ scp -r ./flink-1.13.0 root@xsqone22:/opt/module
4. 启动集群
[root@Hadoop20 flink113]# ./bin/start-cluster.sh
3.4 向集群提交作业
IDEA中的默认的打包对scala代码打包效果不是很好,所以会引入打包插件。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
1. webUI提交作业
2. 命令行提交作业
上传jar包
[root@Hadoop20 flink113]# ./bin/flink run -m 192.168.136.20:8081 -c org.example.cp2.StreamWordCount -p 2 ./FlinkTutorial-1.0-SNAPSHOT.jar --host 192.168.136.20 --port 7777
1. 查看当前运行的作业
[root@Hadoop20 flink113]# ./bin/flink list
Waiting for response...
No running jobs.
No scheduled jobs.
2. 查看所有的运行作业
[root@Hadoop20 flink113]# ./bin/flink list -a
Waiting for response...
No running jobs.
No scheduled jobs.
3. 取消当前作业
[root@Hadoop20 flink113]# ./bin/flink cancel [jobID]
3.5 部署模式
flink主要有三种部署模式:会话模式、单作业模式、应用模式。
会话模式:
先启动一个集群,保持一个会话。在这个会话中通过客户端提交作业。集群启动时所有资源都已经确定,所有提交的作业会竞争集群中的资源。
会话模式适合单个规模小、执行时间短的大量作业。
单作业模式:
会话模式因为资源共享会导致很多问题。为了隔离资源,为每个提交的作业启动一个集群,就是单作业模式。
单作业模式无法直接启动,需要借助一些资源管理平台来启动集群,如yarn。
应用模式:
不管是会话模式还是单作业模式,应用代码都是在客户端执行,由客户端提交给 jobManager 。这种方式客户端需要占用大量网络带宽,用来下载依赖和向jobManager上发送数据。而往往我们提交作业用的是同一个客户端,就会加重客户端所在节点的资源消耗。
于是应用模式直接由JobManager执行应用程序,而不是通过客户端。这意味着,我们需要为每一个提交的单独应用启动一个JobManager,也就是创建一个集群。这个JobManager只为执行这一个应用而存在,执行结束之后JobManager就会关闭。
3.6 独立模式
独立运行,不依赖任何外部的资源管理平台,是部署flink最基本的方式。但是出现资源不足或者出现故障时,没有自动扩展或者重分配资源的保证,所以只能在开发测试等非常少的场景。
3.7 yarn模式
yarn上的部署过程就是:客户端把flink应用提交给yarn的ResourceManager,Yarn的ResourceManager会向NodeManager申请容器。在容器上部署flink的JobManager和TaskManager实例,从而启动集群。
启动集群
$ bin/yarn-session.sh -nm test
可用参数解读:
-d:分离模式。即使关掉当前对话窗口,YARN session也可以在后台运行
-jm(--jobManagerMemory):配置jobManager所需内存,默认单位为MB
-nm(--name):配置在YARN UI界面上显示的任务名
-qu(--queue):指定YARN队列名
-tm(--taskManager):配置每个TaskManager所使用内存