1 Related Big Data Concepts
1.1 By timing
1. Real-time computing:
Data is processed as it arrives and results are written out immediately.
A continuous, low-latency, event-triggered kind of task.
2. Offline computing:
Data is processed with a delay; results follow an N+1 pattern (yesterday's data is processed and stored today).
A batch-oriented, high-latency task that is launched explicitly.
1.2 By processing style
1. Stream processing:
Processes one record (or a small number of records) at a time; keeps little state.
2. Batch processing:
Processes a large volume of data and returns the final result once it finishes.
2 Flink Architecture and How It Works
2.1 Overview
Apache Flink is a real-time computing framework and distributed processing engine for stateful computations over unbounded and bounded data streams. Flink runs in all common cluster environments and performs computations at in-memory speed and at any scale.
2.2 Features
1. High-throughput, low-latency, high-performance stream processing
2. Window operations based on event time
3. Exactly-once semantics for stateful computations
4. Highly flexible window operations: time-, count-, session-, and data-driven windows
5. A continuous-flow model with backpressure support
6. Fault tolerance based on lightweight distributed snapshots (see the sketch after this list)
7. A single runtime that supports both batch-on-streaming and pure streaming processing
8. Flink manages its own memory inside the JVM, avoiding OOM errors
9. Support for iterative computation
10. Automatic program optimization: expensive operations such as shuffles and sorts are avoided where possible, and intermediate results are cached when necessary
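Feature 6 is exposed through checkpointing. A minimal sketch of turning it on, assuming the defaults are otherwise acceptable (the 5-second interval and the class name are illustrative, not from the original notes):
package com.shujia.flink.core;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class CheckpointSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Take a lightweight distributed snapshot every 5 seconds, with exactly-once guarantees
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);
        //...build the job as in the WordCount examples below, then call env.execute();
    }
}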
2.3 WordCount code
2.3.1 Stream processing
1. Stream-processing mode:
setRuntimeMode(RuntimeExecutionMode.STREAMING) (continuous-flow model)
2. Methods for reading data
Unbounded stream: socketTextStream
Bounded stream: readTextFile
3. The output is continuous (cumulative)
package com.shujia.flink.core;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo1StreamWordCount {
public static void main(String[] args)throws Exception {
        //1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
        //2. Read the data
        //nc -lk 8888
        //DataStreamSource is a subclass of DataStream; socketTextStream actually returns a DataStreamSource,
        //but DataStream is used here to keep the code tidy
        DataStream<String> wordDS = env.socketTextStream("master", 8888);
        //Count the words
        //SingleOutputStreamOperator is a subclass of DataStream; map actually returns a SingleOutputStreamOperator,
        //but DataStream is used here to keep the code tidy
        /*
        Tuple2.of turns each word into a key/value pair.
        With a lambda, the tuple's type information must be supplied by hand afterwards;
        Types is a Flink utility class for doing that.
        */
        DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word
                -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
        //3. Count the number of occurrences per word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
        //Sum the field at index 1
        //sum also returns a SingleOutputStreamOperator, declared as DataStream for tidiness
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
        //Print the result
        countDS.print();
        //Start the Flink job
env.execute();
}
}
2.3.2 Batch processing
1. RuntimeExecutionMode.BATCH (MapReduce-style model)
2. Batch mode can only be used on bounded streams
3. The output is the final result
package com.shujia.flink.core;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo2BatchWordCount {
public static void main(String[] args)throws Exception {
        //1. Create the Flink execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Change the execution mode
        /*
         * Execution modes
         * RuntimeExecutionMode.BATCH: batch mode (MapReduce model)
         * 1. Outputs the final result
         * 2. Can only be used on bounded streams
         *
         * RuntimeExecutionMode.STREAMING: streaming mode (continuous-flow model)
         * 1. Outputs continuous results
         * 2. Can process both bounded and unbounded streams
         */
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
        //2. Read a file -- bounded stream
        //DataStreamSource is a subclass of DataStream; readTextFile actually returns a DataStreamSource,
        //but DataStream is used here to keep the code tidy
        DataStream<String> wordDS = env.readTextFile("flink/data/words.txt");
        //Count the words
        //SingleOutputStreamOperator is a subclass of DataStream; map actually returns a SingleOutputStreamOperator,
        //but DataStream is used here to keep the code tidy
        /*
        Tuple2.of turns each word into a key/value pair.
        With a lambda, the tuple's type information must be supplied by hand afterwards;
        Types is a Flink utility class for doing that.
        */
        DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word
                -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
        //3. Count the number of occurrences per word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
        //Sum the field at index 1
        //sum also returns a SingleOutputStreamOperator, declared as DataStream for tidiness
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
        //Print the result
        countDS.print();
        //Start the Flink job
env.execute();
}
}
2.4 How it works
1. Create the environment and the socket source.
2. socketTextStream reads the input and forms one task (reading from a socket only supports a single thread, so there is just one task).
3. map transforms each record into a (word, 1) tuple; with parallelism 2 it forms two tasks.
4. keyBy groups the data (records with the same key are routed to the same task) and involves a shuffle; it also runs as two tasks. Everything before this point is called the upstream, everything after it the downstream.
5. sum aggregates the elements that keyBy has routed to each task.
6. print outputs the result. The operator graph these steps produce can be inspected as sketched below.
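A hedged way to check this task breakdown without running the job: print the JSON execution plan of the same WordCount pipeline from 2.3.1 (the class name below is illustrative).
package com.shujia.flink.core;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo1ExecutionPlanSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.socketTextStream("master", 8888)
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                .sum(1)
                .print();
        //getExecutionPlan() returns a JSON description of the operator graph:
        //the single-parallelism socket source, the map tasks, the keyBy shuffle into sum, and the print sink
        System.out.println(env.getExecutionPlan());
    }
}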
Differences between Spark and Flink
Spark follows the MapReduce model: map tasks run first, then reduce tasks. The MR model can pre-aggregate on the map side, so the shuffle transfers less data.
Flink follows a continuous-flow model: upstream and downstream tasks start at the same time and wait for data; every record is computed as it arrives and produces a result.
3 Source
3.1 Collection source
1. Turn a Java collection into a source
package com.shujia.flink.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
public class Demo1ListSource {
public static void main(String[] args)throws Exception {
        //Create the Flink environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
ArrayList<String> list = new ArrayList<>();
list.add("java");
list.add("java");
list.add("java");
list.add("java");
list.add("java");
list.add("java");
        //Turn the Java collection into a DataStream
DataStream<String> listDS = env.fromCollection(list);
listDS.print();
env.execute();
}
}
3.2 FileSource
3.2.1 Bounded reading with FileSource
1. Build a FileSource object: call forRecordStreamFormat, passing a TextLineInputFormat (format and encoding) and a Path (the location to read), then finish with build().
2. In the bounded read, the Path points to a single file.
3. Use the FileSource through env.fromSource, which takes three arguments:
the FileSource object, WatermarkStrategy.noWatermarks(), and a name for the source. The result is a DataStream.
4. Code
package com.shujia.flink.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo2FileSource {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
        /*
         * 1. Old API for reading a file -- bounded stream
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();
        /*
         * 2. New API -- can read either bounded or unbounded
         */
        //Build the FileSource
FileSource<String> fileSource = FileSource.
forRecordStreamFormat(
new TextLineInputFormat("UTF-8"),
new Path("flink/data/students.csv")
)
.build();
        //Use the FileSource
DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
fileDS.print();
env.execute();
}
}
5. Result
3.2.2 Unbounded reading with FileSource
1. The only difference from the bounded read is one extra step when building the FileSource:
monitorContinuously(Duration.ofSeconds(5)) makes the source keep checking the directory for new files at a fixed interval, turning it into an unbounded stream.
2. In this mode the Path passed to forRecordStreamFormat points to a directory rather than a single file.
3. Code
package com.shujia.flink.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
public class Demo2FileSource {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
        /*
         * 1. Old API for reading a file -- bounded stream
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();
        /*
         * 2. New API -- can read either bounded or unbounded
         */
        //Build the FileSource
FileSource<String> fileSource = FileSource.
forRecordStreamFormat(
new TextLineInputFormat("UTF-8"),
new Path("flink/data/stu")
)
                //Check the directory for new files at a fixed interval, making this an unbounded stream
.monitorContinuously(Duration.ofSeconds(5))
.build();
        //Use the FileSource
DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
fileDS.print();
env.execute();
}
}
4. Result
3.2.3 Unified stream/batch processing with FileSource
1. Stream/batch unification: the same operator code can run as either a streaming job or a batch job,
and the same file source can be read in either bounded or unbounded mode.
2. Batch processing (bounded stream)
package com.shujia.flink.source;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
public class Demo2FileSource {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
        //Set the execution mode
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        /*
         * 1. Old API for reading a file -- bounded stream
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();
        /*
         * 2. New API -- can read either bounded or unbounded
         */
        //Build the FileSource
FileSource<String> fileSource = FileSource.
forRecordStreamFormat(
new TextLineInputFormat("UTF-8"),
new Path("flink/data/stu")
)
                //Check the directory for new files at a fixed interval, making this an unbounded stream
// .monitorContinuously(Duration.ofSeconds(5))
.build();
        //Use the FileSource
DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
DataStream<Tuple2<String, Integer>> kvDS = fileDS.map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
countDS.print();
// fileDS.print();
env.execute();
}
}
Run result: the final result (one output per key)
3. Stream processing (unbounded stream)
package com.shujia.flink.source;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
public class Demo2FileSource {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Set the job parallelism; one parallel instance corresponds to one task
        //If not set, it defaults to the number of CPU cores on the machine
        env.setParallelism(2);
        //How long records may be buffered before being flushed downstream (set to 200 ms here)
        env.setBufferTimeout(200);
        //Set the execution mode
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        /*
         * 1. Old API for reading a file -- bounded stream
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
        // linesDS.print();
        /*
         * 2. New API -- can read either bounded or unbounded
         */
        //Build the FileSource
FileSource<String> fileSource = FileSource.
forRecordStreamFormat(
new TextLineInputFormat("UTF-8"),
new Path("flink/data/stu")
)
                //Check the directory for new files at a fixed interval, making this an unbounded stream
.monitorContinuously(Duration.ofSeconds(5))
.build();
        //Use the FileSource
DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");
DataStream<Tuple2<String, Integer>> kvDS = fileDS.map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);
countDS.print();
// fileDS.print();
env.execute();
}
}
The output is continuous (incrementally updated).
3.3 Custom source
1. Call addSource and pass an object that implements the SourceFunction interface; the interface has one type parameter.
2. Define your own class implementing SourceFunction and override the run() method; inside run(), emit data with the collect() method.
package com.shujia.flink.source;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class Demo3MySource {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Use the custom source
        //addSource takes an object of a class that implements the SourceFunction interface
DataStream<Integer> linesDS = env.addSource(new MySource());
linesDS.print();
env.execute();
}
}
class MySource implements SourceFunction<Integer> {
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while (true) {
            //Emit data downstream
            //ctx is the SourceContext object
ctx.collect(100);
Thread.sleep(5);
}
}
    //cancel() runs when the job is cancelled; it is usually used to release resources
@Override
public void cancel() {
}
}
3. Example: connect to a MySQL database and read the contents of one of its tables
The Student class uses the Lombok plugin to cut down on boilerplate code.
package com.shujia.flink.source;
import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class Demo4MySQLSource {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Use the custom source
DataStream<Student> lineDS = env.addSource(new MySQlSource());
// lineDS.print();
DataStream<Tuple2<String, Integer>> kvDS = lineDS
.map(line ->
Tuple2.of(line.getClazz(), 1),
Types.TUPLE(Types.STRING, Types.INT)
);
DataStream<Tuple2<String, Integer>> countDS = kvDS
.keyBy(kv -> kv.f0)
.sum(1);
countDS.print();
env.execute();
}
}
class MySQlSource implements SourceFunction<Student>{
@Override
public void run(SourceContext<Student> sourceContext) throws Exception {
        //Load the MySQL driver
        Class.forName("com.mysql.jdbc.Driver");
        //Create the database connection
        Connection connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");
        //Prepare the query
        PreparedStatement sta = connection.prepareStatement("select * from students");
        //Execute the query
        ResultSet resultSet = sta.executeQuery();
        //Parse the rows
        while (resultSet.next()) {
            int id = resultSet.getInt("id");
            String name = resultSet.getString("name");
            int age = resultSet.getInt("age");
            String gender = resultSet.getString("gender");
            String clazz = resultSet.getString("clazz");
            //Send the record downstream
            sourceContext.collect(new Student(id, name, age, gender, clazz));
        }
        //Close the statement
        sta.close();
        //Close the database connection
        connection.close();
}
@Override
public void cancel() {
}
}
/*
 * The Lombok plugin generates methods at compile time (similar to a Scala case class):
 * @Getter and @Setter can be combined into @Data, which generates everything except constructors (getters, setters, toString, and so on)
 * @AllArgsConstructor generates the all-args constructor
 */
@Data
@AllArgsConstructor
class Student{
private Integer id;
private String name;
private Integer age;
private String gender;
private String clazz;
}
4 Operators
4.1 map
1. map takes an object that implements the MapFunction interface; MapFunction extends Function.
2. Override the map method.
package com.shujia.flink.tf;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo1Map {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);
        /*
         * 1. Using an anonymous inner class
         */
        //map takes a MapFunction object
//public interface MapFunction<T, O> extends Function
SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = linesDS
.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return Tuple2.of(value, 1);
}
});
// kvDS.print();
        /*
         * 2. Using a lambda expression -- the shorter form
         */
linesDS.map(word->Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).print();
env.execute();
}
}
4.2 flatMap
1. flatMap takes an object that implements the FlatMapFunction interface; FlatMapFunction extends Function.
2. Override the flatMap method. It has two parameters: the incoming record and a Collector used to emit data downstream; so with a lambda, the part before -> takes two arguments.
3. When using a lambda, the return type must be specified explicitly at the end, otherwise the job fails.
4. Whether you use a lambda or an anonymous inner class, the core pattern is a loop that emits records downstream. Before the loop you can do whatever preparation is needed: splitting, converting, and so on.
package com.shujia.flink.tf;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Demo2FlatMap {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDs = env.socketTextStream("master", 8888);
//flatMap(FlatMapFunction<T, R> flatMapper)
//public interface FlatMapFunction<T, O> extends Function
DataStream<String> flapMapDS = linesDs
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String line, Collector<String> out) throws Exception {
for (String word : line.split(",")) {
out.collect(word);
}
}
});
// flapMapDS.print();
linesDs.flatMap( (line, out) -> {
for (String word : line.split(",")) {
out.collect(word);
}
}, Types.STRING).print();
env.execute();
}
}
4.3 filter
1. filter takes an object that implements the FilterFunction interface; FilterFunction extends Function.
2. With a lambda there is no need to specify a return type, because filter returns a boolean.
package com.shujia.flink.tf;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo3Filter {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//filter(FilterFunction<T> filter)
//public interface FilterFunction<T> extends Function
// boolean filter(T value)
DataStream<String> filterDS = linesDS
.filter(new FilterFunction<String>() {
@Override
public boolean filter(String value) throws Exception {
String clazz = value.split(",")[4];
return "文科一班".equals(clazz);
}
});
// filterDS.print();
linesDS.filter(line->"文科一班".equals(line.split(",")[4])).print();
env.execute();
}
}
4.4 keyBy
1. keyBy takes an object that implements the KeySelector interface; KeySelector extends Function.
2. Override the getKey method; here it simply returns its argument.
3. Its job is to send records with the same key to the same task.
4. The return value is a keyed (key/value style) stream, a KeyedStream.
package com.shujia.flink.tf;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo4KeyBy {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
//keyBy(KeySelector<T, K> key)
//KeySelector<IN, KEY> extends Function
KeyedStream<String, String> keyByDS = linesDS
.keyBy(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
});
        //Send records with the same key to the same task
linesDS.keyBy(word->word).print();
env.execute();
}
}
4.5 reduce
1. It can only be applied to a keyed (key/value) stream.
//reduce(ReduceFunction<T> reducer) //public interface ReduceFunction<T> extends Function
2. The result is returned as a two-element tuple, whether you use an anonymous inner class or a lambda.
3. The two tuples passed into reduce always share the same key; their values are combined, and the function returns that common key together with the aggregated value.
4. The final aggregation could also be done with the sum operator, passing it the index 1; it likewise works on a keyed stream (see the short sketch after the code below).
5. kv1 is the result of the previous computation (the state), and kv2 is the newly arrived value.
package com.shujia.flink.tf;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class Demo5Reduce {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
DataStream<Tuple2<String, Integer>> kvDS = linesDS
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
for (String word : value.split(",")) {
out.collect(Tuple2.of(word, 1));
}
}
}, Types.TUPLE(Types.STRING, Types.INT));
KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
//reduce(ReduceFunction<T> reducer)
//public interface ReduceFunction<T> extends Function
DataStream<Tuple2<String, Integer>> reduceDS = keyByDS
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(
Tuple2<String, Integer> value1,
Tuple2<String, Integer> value2) throws Exception {
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
});
// reduceDS.print();
        //Using a lambda expression
DataStream<Tuple2<String, Integer>> reduceDS1 = keyByDS.reduce((kv1, kv2) -> {
String word = kv1.f0;
int count = kv1.f1 + kv2.f1;
return Tuple2.of(word, count);
});
reduceDS1.print();
env.execute();
}
}
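As point 4 above notes, the same aggregation can be done with the built-in sum operator instead of a hand-written reduce. A brief sketch on the same kind of keyed stream (the class name is illustrative; only the aggregation call differs from the demo above):
package com.shujia.flink.tf;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo5SumSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("master", 8888)
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                //sum(1) aggregates the field at index 1 per key, replacing the hand-written reduce
                .sum(1)
                .print();
        env.execute();
    }
}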
4.6 window
1. A brief look at windows here; they are covered in detail later. A tumbling-window variant is also sketched after the demo below.
2. Windows can likewise only be applied to a keyed (key/value) stream.
package com.shujia.flink.tf;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
public class Demo6Window {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> lineDS = env.socketTextStream("master", 8888);
        /*
         * Every 5 seconds, count each word over the last 15 seconds --- sliding window
         */
        //Convert to (word, 1) tuples
        DataStream<Tuple2<String, Integer>> kvDS = lineDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));
        //Group by word
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);
        //Assign windows
        //SlidingProcessingTimeWindows: a sliding window over processing time
        //WindowedStream<T, KEY, W> window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(SlidingProcessingTimeWindows.of(Time.seconds(15), Time.seconds(5)));
        //Every 5 seconds the window fires and processes the data of the last 15 seconds;
        //once the first 15 seconds have passed, only data from the most recent 15 seconds is counted
        /*
         * For example, with firings at 5, 10, 15, 20, 25 seconds:
         * if hjx, hjx, java arrive in seconds 0-5, the result is (hjx,2), (java,1);
         * if kkk, lll, hjx then arrive in seconds 5-15, the result becomes (hjx,3), (java,1), (lll,1), (kkk,1);
         * but once the window has slid to cover seconds 5-20, it only contains kkk, lll, hjx, so the earlier records drop out.
         */
windowDS.sum(1).print();
env.execute();
}
}
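Besides the sliding window above, other window types exist (see feature 4 in section 2.2). A hedged sketch of a tumbling processing-time window with fixed, non-overlapping intervals (the 10-second size and the class name are illustrative, not from the original notes):
package com.shujia.flink.tf;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
public class Demo6TumblingWindowSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //Count each word over fixed, non-overlapping 10-second intervals; only the window size is given
        env.socketTextStream("master", 8888)
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1)
                .print();
        env.execute();
    }
}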
4.7 union
1. union only merges the two streams at the code (logical) level; the data itself is not physically merged.
package com.shujia.flink.tf;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class Demo7Union {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> ds1 = env.socketTextStream("master", 8888);
DataStreamSource<String> ds2 = env.socketTextStream("master", 9999);
        /*
         * union: merges two DataStreams
         * The data is not merged physically; the merge only happens at the logical level
         */
DataStream<String> unionDS = ds1.union(ds2);
unionDS.print();
env.execute();
}
}
4.8 process
1. process is one of Flink's low-level operators; it can stand in for map, flatMap, and filter. A filter-style sketch follows the demo below.
package com.shujia.flink.tf;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class Demo8ProcessFunction {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
//public abstract class ProcessFunction<I, O> extends AbstractRichFunction
// process(ProcessFunction<T, R> processFunction)
DataStream<Tuple2<String, Integer>> kvDS = linesDS.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
@Override
public void processElement(String value,
ProcessFunction<String, Tuple2<String, Integer>>.Context ctx,
Collector<Tuple2<String, Integer>> out) throws Exception {
                /*
                 * value: one input record (a line of data)
                 * ctx: the context object
                 * out: emits data downstream
                 */
for (String word : value.split(",")) {
out.collect(Tuple2.of(word, 1));
}
}
});
kvDS.print();
env.execute();
}
}
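As noted above, the same processElement hook can also behave like filter by only forwarding records that match a condition. A minimal hedged sketch (the class name and the condition are illustrative):
package com.shujia.flink.tf;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
public class Demo8ProcessAsFilterSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.socketTextStream("master", 8888)
                .process(new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String value,
                                               ProcessFunction<String, String>.Context ctx,
                                               Collector<String> out) throws Exception {
                        //Acts like filter: emit the record only when it matches the condition
                        if (value.contains("java")) {
                            out.collect(value);
                        }
                    }
                })
                .print();
        env.execute();
    }
}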
5 Sink
5.1 FileSink
1. The key builder parameters to remember:
forRowFormat: where to write the data and how to encode it
withRollingPolicy: the rolling policy
package com.shujia.flink.sink;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.time.Duration;
public class Demo1FileSInk {
public static void main(String[] args) throws Exception{
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
FileSink<String> fileSink = FileSink
//final Path basePath, final Encoder<IN> encoder
                //Specify the data encoding and the output directory
                .<String>forRowFormat(new Path("flink/data/words"),
                        new SimpleStringEncoder<>("UTF-8"))
                //Specify the rolling policy
                //withRollingPolicy(final RollingPolicy<IN, String> policy)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                //Roll the current file once it contains at least 10 seconds of data
                                .withRolloverInterval(Duration.ofSeconds(10))
                                //Roll the current file if no new records have arrived for 10 seconds
                                .withInactivityInterval(Duration.ofSeconds(10))
                                //Roll the current file once it reaches 1 MB (checked after each record is written)
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build()
                ).build();
        //Use the FileSink
linesDS.sinkTo(fileSink);
env.execute();
}
}
5.2 Custom sink
1. A custom sink implements the SinkFunction interface and overrides the invoke method.
2. addSink takes an object of a class that implements SinkFunction.
3. SinkFunction's type parameter must match the type of the data being written.
package com.shujia.flink.sink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
public class Demo2MySInk {
public static void main(String[] args)throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);
linesDS.addSink(new MySink());
env.execute();
}
}
class MySink implements SinkFunction<String>{
    //A custom sink implements SinkFunction and overrides the invoke method;
    //invoke can write the data to wherever it should be stored, or simply print it
    @Override
    public void invoke(String value, Context context) throws Exception {
        System.out.println("custom sink " + value); //prints "custom sink" followed by whatever arrives from the socket
}
}
5.3 Example
1. If the sink only implements SinkFunction, invoke would have to open a database connection for every single record, which wastes resources. So when writing to a database, extend the RichSinkFunction abstract class instead: its open and close methods each run only once, which is exactly right for opening and closing the database connection.
package com.shujia.flink.sink;
import java.sql.Connection;
import java.sql.PreparedStatement;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.DriverManager;
public class Demo3MySqlSInk {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> linesDS = env.socketTextStream("master", 8888);
DataStream<Tuple2<String, Integer>> countDS = linesDS
.map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
.keyBy(kv -> kv.f0)
.sum(1);
countDS.addSink(new MySqlSInk());
env.execute();
}
}
/**
 * A custom sink that saves data to MySQL.
 * RichSinkFunction adds open and close methods (compared with SinkFunction), used here to open and close the connection.
 */
class MySqlSInk extends RichSinkFunction<Tuple2<String, Integer>> {
Connection con;
PreparedStatement stat;
@Override
public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
stat.setString(1, value.f0);
stat.setInt(2, value.f1);
stat.execute();
}
@Override
    public void open(Configuration parameters) throws Exception {
        //Load the database driver
        Class.forName("com.mysql.jdbc.Driver");
        con = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");
        //replace into: inserts the row if it does not exist, otherwise updates it; the table needs a primary key
        stat = con.prepareStatement("replace into word_count values(?,?)");
}
@Override
public void close() throws Exception {
stat.close();
con.close();
}
}