一、运行环境介绍
Flink
执行环境主要分为本地环境和集群环境,本地环境主要为了方便用户编写和调试代码使用,而集群环境则被用于正式环境中,可以借助Hadoop Yarn
、k8s
或Mesos
等不同的资源管理器部署自己的应用。
环境依赖:
【1】JDK
环境:Flink
核心模块均使用 Java开发,所以运行环境需要依赖JDK
,JDK
版本需要保证在1.8
以上。
【2】Maven
编译环境:Flink
的源代码目前仅支持通过 Maven进行编译,所以如果需要对源代码进行编译,或通过IDE
开发Flink Application
,则建议使用Maven
作为项目工程编译方式。需要注意的是,Flink
程序需要Maven
的版本在3.0.4
及以上,否则项目编译可能会出问题,建议用户根据要求进行环境的搭建。
【3】IDEA
:需要安装scala
插件以及scala
环境等;
二、Flink项目 Scala版 DataSet 有界流
需求:同进文件文件中的单词出现的次数;
【1】创建Maven
项目,pom.xml
文件中配置如下依赖
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.10.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-scala -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.10.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- 该插件用于将Scala代码编译成class文件 -->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.4.6</version>
<executions>
<execution>
<goals>
<!--声明绑定到 maven 的compile阶段-->
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
【2】resource
目录中添加需要进行统计的文件文件及内容
【3】WordCount.java
文件内容如下,需要注意隐私转换问题,需要引入scala._
import org.apache.flink.api.scala._
/**
* @Description 批处理 word count
* @Author zhengzhaoxiang
* @Date 2020/7/12 18:55
* @Param
* @Return
*/
object WordCount {
def main(args: Array[String]): Unit = {
//创建一个批处理的执行环境
val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//从文件中读取数据
var inputDateSet: DataSet[String] = env.readTextFile("E:\\Project\\flink\\src\\main\\resources\\wordcount.txt")
//基于Dataset 做转换,首先按空格打散,然后按照 word作为key做group by
val resultDataSet: DataSet[(String,Int)] = inputDateSet
.flatMap(_.split(" "))//分词得到所有 word构成的数据集
.map((_,1))//_表示当前 word 转换成一个二元组(word,count)
.groupBy(0)//以二元组中第一个元素作为key
.sum(1) //1表示聚合二元组的第二个元素的值
//打印输出
resultDataSet.print()
}
}
【4】统计结果展示:
三、Flink项目 Scala版 DataStream 无界流
【1】StreamWordCount.java
文件内容如下
package com.zzx.flink
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建一个流处理执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 接受 socket 文本流
val inputDataStream: DataStream[String] = env.socketTextStream("hadoop1",6666);
//定义转换操作 word count
val resultDataStream: DataStream[(String,Int)] = inputDataStream
.flatMap(_.split(" "))//以空格分词,得到所有的 word
.filter(_.nonEmpty)
.map((_,1))//转换成 word count 二元组
.keyBy(0)//按照第一个元素分组
.sum(1)//按照第二个元素求和
resultDataStream.print()
//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
env.execute("stream word count word")
}
}
【2】我这里在Hadoop1
中通过nc -lk xxx
打开一个socket
通信
【3】查看IDEA
输出统计内容如下:输出word
的顺序不是按照输入的顺序,是因为它有并行度(多线程)是并行执行的。最前面的数字是并行子任务的编号类似线程号。最大的数字其实跟你cpu核数是息息相关的。这个并行度也可以通过env.setParallelism
进行设置。我们也可以给每一个任务(算子)设置不同的并行度;
【4】当我们需要将Java
文件打包上传到Flink
的时候,这里的host
和port
可以从参数中进行获取,代码修改如下:
package com.zzx.flink
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
object StreamWordCount {
def main(args: Array[String]): Unit = {
// 创建一个流处理执行环境
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
// 接受 socket 文本流 hostname:prot 从程序运行参数中读取
val params: ParameterTool = ParameterTool.fromArgs(args);
val hostname: String = params.get("host");
val port: Int = params.getInt("port");
val inputDataStream: DataStream[String] = env.socketTextStream(hostname,port);
//定义转换操作 word count
val resultDataStream: DataStream[(String,Int)] = inputDataStream
.flatMap(_.split(" "))//以空格分词,得到所有的 word
.filter(_.nonEmpty)
.map((_,1))//转换成 word count 二元组
.keyBy(0)//按照第一个元素分组
.sum(1)//按照第二个元素求和
resultDataStream.print()
//上面的只是定义了处理流程,同时定义一个名称。不会让任务结束
env.execute("stream word count word")
}
}