Fiink的简单学习一

一 相关大数据概念

1.1 根据时间

1.实时计算:

数据实时处理,结果实时存储

是一种持续、低时延、事件触发的任务

2.离线计算:

数据延迟处理,结果N+1模式(昨天的数据今天存储)

是一种批量、高时延、主动发起的计算任务

1.2 处理方式

1.流式处理:

一次处理一条或者少量;状态小

2.批量处理:

处理大量数据;处理完返回最终结果

二 Flink的架构以及工作原理

2.1 相关概念

Apache Flink 是一个实时计算框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。

2.2 特性

1.支持高吞吐、低延迟、高性能的流处理

2.支持带有事件时间的窗口(Window)操作

3.支持有状态计算的Exactly-once语义

4.支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作

5.支持具有反压功能的持续流模型

6.支持基于轻量级分布式快照(Snapshot)实现的容错

7.一个运行时同时支持Batch on Streaming处理和Streaming处理

8.Flink在JVM内部实现了自己的内存管理,避免了出现oom

9.支持迭代计算

10.支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

2.3 wordConut 代码

2.3.1 流处理

1.流处理的模式

setRuntimeMode(RuntimeExecutionMode.STREAMING)(持续流模型)

2.读取文件的方法

无界流:socketTextStream

有界流:readTextFile

3.输出结果是:连续型的(累加的)

package com.shujia.flink.core;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo1StreamWordCount {
    public static void main(String[] args)throws Exception {
        //1、创建flink的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);

        //2、读取数据
        //nc -lk 8888
        //DataStreamSource是DataStream的子类,本来读文件返回的类型是DataStreamSource
        //但是为了整洁使用DataStream
        DataStream<String> wordDS = env.socketTextStream("master", 8888);

        //统计单词数量
        //SingleOutputStreamOperator是DataStream的子类,map返回的类型是SingleOutputStreamOperator
        //但是为了整洁使用DataStream
        /*
          使用的是Tuple2类中的of方法,转化成kv形式的二元组
          但是这样后面需要加上二元组的类型,需要手动加入
          Types:是flink中的类
         */
        DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word
                -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));


        //3、统计单词的数量
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        //对下标为1的列求和
        //SingleOutputStreamOperator是DataStream的子类,sum返回的类型是SingleOutputStreamOperator
        //但是为了整洁使用DataStream
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);


        //打印数据
        countDS.print();

        //启动flink
        env.execute();


    }
}

2.3.2 批处理

1.RuntimeExecutionMode.BATCH(MR模型)

2.批处理模式只能用于处理有界流

3.输出结果是最终结果

package com.shujia.flink.core;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2BatchWordCount {
    public static void main(String[] args)throws Exception {
        //1、创建flink的执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //修改处理模式
        /*
         *处理模式
         * RuntimeExecutionMode.BATCH:批处理模式(MapReduce模型)
         * 1、输出最终结果
         * 2、批处理模式只能用于处理有界流
         *
         * RuntimeExecutionMode.STREAMING:流处理模式(持续流模型)
         * 1、输出连续结果
         * 2、流处理模式,有界流核无界流都可以处理
         */
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);

        //2、读取文件--有界流
        //DataStreamSource是DataStream的子类,本来读文件返回的类型是DataStreamSource
        //但是为了整洁使用DataStream
        DataStream<String> wordDS = env.readTextFile("filnk/data/words.txt");

        //统计单词数量
        //SingleOutputStreamOperator是DataStream的子类,map返回的类型是SingleOutputStreamOperator
        //但是为了整洁使用DataStream
        /*
          使用的是Tuple2类中的of方法,转化成kv形式的二元组
          但是这样后面需要加上二元组的类型,需要手动加入
          Types:是flink中的类
         */
        DataStream<Tuple2<String, Integer>> kvDS = wordDS.map(word
                -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));


        //3、统计单词的数量
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        //对下标为1的列求和
        //SingleOutputStreamOperator是DataStream的子类,sum返回的类型是SingleOutputStreamOperator
        //但是为了整洁使用DataStream
        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);


        //打印数据
        countDS.print();

        //启动flink
        env.execute();


    }
}

2.4 原理

1.创建环境 socket

2.使用socketTextStream读取文件,形成一个task任务(因为读取文件只支持单线程,所以只有一个task)

3.map是分组,形成两个task任务

4.keyBy,分组(相同的key被分到一个task中),里面有一个shuffle,也是两个task任务。这里之前被称为上游,之后称为下游。

5.sum。求和。将keyBy里面已经分好的task任务里面的元素求和

6.打印。

 spark与flink的区别

spark是MR模型,先执行map task 再执行reduce task。MR模型可以再map端进行预聚合,shuffle减少了数据的传输

flink是持续流模型,上游task与下游tsak同时启动,等待数据到达,每一条数据都会进行计算,出来结果。

三 source

3.1 集合source

1.将java中的集合变成一个source

package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;

public class Demo1ListSource {
    public static void main(String[] args)throws Exception {
        //创建flink环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);

        ArrayList<String> list = new ArrayList<>();
        list.add("java");
        list.add("java");
        list.add("java");
        list.add("java");
        list.add("java");
        list.add("java");
        //
        DataStream<String> listDS = env.fromCollection(list);
        listDS.print();
        env.execute();

    }
}

3.2 fileSource

3.2.1.fileSource的有界流读法

1.创建一个FileSource对象。使用forRecordStreamFormat方法,里面传的参数是格式与编码的对象TextLineInputFormat,与读取文件路径的对象Path。然后整体使用built创建

2.无界流读取的是文件

3.使用fileSource。使用flink环境对象env中的fromSource方法,里面传的三个参数是:

fileSource对象,WatermarkStrategy.noWatermarks(),与数据source的名字。这里创建的对象是DS对象

4.代码

package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2FileSource {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);

        /*
         * 1、老版本读取文件  -- 有界流
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();

        /*
         *2、型版本  --可有界读取也可以无界读取
         */
        //构建fileSource
        FileSource<String> fileSource = FileSource.
                forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/students.csv")
                )
                .build();
        //使用fileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        fileDS.print();
        env.execute();


    }
}

5.结果

 3.2.2 fileSource的无界流读法

1.与有界流读法的差别就是在创建FileSource对象时,多设置了一步,每隔一段时间读取目录下新的文件,构建无界流。方法是:

monitorContinuously(Duration.ofSeconds(5)),

2.这个读取是文件夹,所以设置forRecordStreamFormat里面传的参数Path读取的是文件夹

3.代码

package com.shujia.flink.source;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo2FileSource {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);

        /*
         * 1、老版本读取文件  -- 有界流
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();

        /*
         *2、型版本  --可有界读取也可以无界读取
         */
        //构建fileSource
        FileSource<String> fileSource = FileSource.
                forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/stu")
                )
                //每隔一段时间读取目录下新的文件,构建无界流
                .monitorContinuously(Duration.ofSeconds(5))
                .build();
        //使用fileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        fileDS.print();
        env.execute();


    }
}

4.结果

3.2.3 fileSource批流处理

1. 流批统一:同一套算子代码既能做流处理也能做批处理

                      同一个file数据源,既能有界读取也能无界读取 

2.批处理(有界流)

package com.shujia.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo2FileSource {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);
        //设置处理模式
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        /*
         * 1、老版本读取文件  -- 有界流
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();

        /*
         *2、型版本  --可有界读取也可以无界读取
         */
        //构建fileSource
        FileSource<String> fileSource = FileSource.
                forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/stu")
                )
                //每隔一段时间读取目录下新的文件,构建无界流
//                .monitorContinuously(Duration.ofSeconds(5))
                .build();
        //使用fileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        DataStream<Tuple2<String, Integer>> kvDS = fileDS.map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        countDS.print();
//        fileDS.print();
        env.execute();


    }
}

 运行结果:是最终结果

3.流处理(无界流)

package com.shujia.flink.source;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;

public class Demo2FileSource {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //设置任务的并行度;一个并行度相当于一个task
        //这个不给值默认值是电脑的核数
        env.setParallelism(2);
        //数据从上游发送到下游的延迟时间,默认200毫秒
        env.setBufferTimeout(200);
        //设置处理模式
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);

        /*
         * 1、老版本读取文件  -- 有界流
         */
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");
//        linesDS.print();

        /*
         *2、型版本  --可有界读取也可以无界读取
         */
        //构建fileSource
        FileSource<String> fileSource = FileSource.
                forRecordStreamFormat(
                        new TextLineInputFormat("UTF-8"),
                        new Path("flink/data/stu")
                )
                //每隔一段时间读取目录下新的文件,构建无界流
                .monitorContinuously(Duration.ofSeconds(5))
                .build();
        //使用fileSource
        DataStream<String> fileDS = env.fromSource(fileSource, WatermarkStrategy.noWatermarks(), "fileSource");

        DataStream<Tuple2<String, Integer>> kvDS = fileDS.map(stu -> Tuple2.of(stu.split(",")[4], 1), Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        DataStream<Tuple2<String, Integer>> countDS = keyByDS.sum(1);

        countDS.print();
//        fileDS.print();
        env.execute();


    }
}

结果是 连续型结果 

 3.3 自定义source

1.使用addSource方法,里面传入的是实现SourceFunction接口的对象,该接口有一个泛型

2.自己定义类实现SourceFunction,重写run()方法,run方法里面使用collect方法提交数据

package com.shujia.flink.source;

import org.apache.flink.streaming.api.datastream.DataStream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;


public class Demo3MySource {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //使用自定义
        //addSource里面需要传一个实现了SourceFunction接口类的对象
        DataStream<Integer> linesDS = env.addSource(new MySource());

        linesDS.print();

        env.execute();


    }
}

class MySource implements SourceFunction<Integer> {

    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (true) {
            //发送数据
            //ctx是SourceContext对象
            ctx.collect(100);
            Thread.sleep(5);
        }
    }
    //cancel方法再任务取消的时候执行,一般用于回收资源
    @Override
    public void cancel() {

    }
}

3.实例:连接Mysql数据库,读取数据库其中一个表的内容

其实Student类中使用了小辣椒的插件,简化的代码的冗余度

package com.shujia.flink.source;

import lombok.AllArgsConstructor;
import lombok.Data;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class Demo4MySQLSource {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使用自定义Source
        DataStream<Student> lineDS = env.addSource(new MySQlSource());

//        lineDS.print();
        DataStream<Tuple2<String, Integer>> kvDS = lineDS
                .map(line ->
                        Tuple2.of(line.getClazz(), 1),
                        Types.TUPLE(Types.STRING, Types.INT)
                );

        DataStream<Tuple2<String, Integer>> countDS = kvDS
                .keyBy(kv -> kv.f0)
                .sum(1);

        countDS.print();

        env.execute();



    }
}

class MySQlSource implements SourceFunction<Student>{
    @Override
    public void run(SourceContext<Student> sourceContext) throws Exception {
        //加载mysql驱动
        Class.forName("com.mysql.jdbc.Driver");
        //创建数据库连接对象
        Connection connection = DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false","root","123456");
        //编写查询语句
        PreparedStatement sta = connection.prepareStatement("select * from students");
        //执行查询
        ResultSet resultSet = sta.executeQuery();
        //解析数据
        while (resultSet.next()){
            int id = resultSet.getInt("id");
            String name = resultSet.getString("name");
            int age = resultSet.getInt("age");
            String gender = resultSet.getString("gender");
            String clazz = resultSet.getString("clazz");

            //将数据发送到下游
            sourceContext.collect(new Student(id,name,age,gender,clazz));
        }

        //关闭查询连接
        sta.close();
        //关闭数据库连接
        connection.close();
    }

    @Override
    public void cancel() {
    }
}

/**
 * lombok插件作用:可以再代码编译时动态给代码增加方法(相当于scala中的case class)
 * /@Getter
 * /@Setter
 * 上面两个可以和起来成为一个@Data:这个是除了构造方法,其他方法都有,toString等等
 * / @AllArgsConstructor:这个是构造方法
 */
@Data
@AllArgsConstructor
class Student{
    private Integer id;
    private String name;
    private Integer age;
    private String gender;
    private String clazz;
}

四 算子

4.1 map

1.map里面需要传入一个实现MapFunction的接口的对象,而MapFunction继承与Function。

2.重写map方法

package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo1Map {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);

        /*
         * 1、使用匿名内部类的方式
         */
        //map里面需要传入一个MapFunction的对象
        //public interface MapFunction<T, O> extends Function
        SingleOutputStreamOperator<Tuple2<String, Integer>> kvDS = linesDS
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                 });

//        kvDS.print();

        /*
         * 2、使用lambda表达式的方式  -- 简化的写法
         */
        linesDS.map(word->Tuple2.of(word,1), Types.TUPLE(Types.STRING,Types.INT)).print();


        env.execute();



    }
}

4.2 flatMap

1.flapMap里面传入的是实现FlatMapFunction的接口的对象,而FlatMapFunction继承与Function。

2.重写flatMap方法,里面需要传两个参数,一个是传进来的数据,另一个是发送数据的接口类Collector。故使用lambda表达式时,->前面需要传2个值。

3.使用lambda表达式,最后要指定返回的类型,要不然会报错。

4.无论是lambda表达式还是用匿名内部类,里面的主要步骤还是循环发送数据到下游。这里的数据要循环,但是循环之前怎么做都可以,切分啊 转化啊等等。

package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo2FlatMap {
    public static void main(String[] args)throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDs = env.socketTextStream("master", 8888);
        
        //flatMap(FlatMapFunction<T, R> flatMapper)
        //public interface FlatMapFunction<T, O> extends Function
        DataStream<String> flapMapDS = linesDs
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        for (String word : line.split(",")) {
                            out.collect(word);
                        }
                    }
                });

//        flapMapDS.print();
        linesDs.flatMap( (line, out) -> {
                    for (String word : line.split(",")) {
                        out.collect(word);
                    }
                }, Types.STRING).print();





        env.execute();





    }
}

4.3 filter

1.filter里面传入的是实现FilterFunction的接口的对象,而FilterFunction继承与Function。

2.lambda表达式中不需要传入返回值类型,因为他是boolean的值。

package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo3Filter {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> linesDS = env.readTextFile("flink/data/students.csv");

        //filter(FilterFunction<T> filter)
        //public interface FilterFunction<T> extends Function
        // boolean filter(T value)
        DataStream<String> filterDS = linesDS
                .filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        String clazz = value.split(",")[4];
                        return "文科一班".equals(clazz);
                    }
                });
//        filterDS.print();
        linesDS.filter(line->"文科一班".equals(line.split(",")[4])).print();



        env.execute();



    }
}

 4.4 keyBy

1.keyBy里面传入的是实现KeySelector接口的对象,KeySelector继承于Function

2.重写getKey方法,直接返回参数即可

3.他的作用就是将相同key发送到同一个task任务里面

4.他的返回值是kv形式的DS

package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo4KeyBy {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        //keyBy(KeySelector<T, K> key)
        //KeySelector<IN, KEY> extends Function

        KeyedStream<String, String> keyByDS = linesDS
                .keyBy(new KeySelector<String, String>() {
                    @Override
                    public String getKey(String value) throws Exception {
                        return value;
                    }
                });
        //将相同的key发送到同一个task中
        linesDS.keyBy(word->word).print();


        env.execute();

    }
}

 4.5 reduce

1.他只能作用在kv形式的DS上

//reduce(ReduceFunction<T> reducer)
//public interface ReduceFunction<T> extends Function

2.最后返回一个二元组的形式,无论是组合式还是lambda表达式中

3.reduce中传进来的两个kv形式的DS中,key都是一样的,然后将他们俩的values求值,最后返回他们共同的key,以及value的和

4.最后求和其实可以用sum的算子,里面传入的是1。他也是作用在kv形式的DS上

5.其中kv1是上一次计算的结果(状态),kv2是这次传入的值

package com.shujia.flink.tf;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class Demo5Reduce {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> kvDS = linesDS
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : value.split(",")) {
                            out.collect(Tuple2.of(word, 1));
                        }
                    }
                }, Types.TUPLE(Types.STRING, Types.INT));

        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        //reduce(ReduceFunction<T> reducer)
        //public interface ReduceFunction<T> extends Function

        DataStream<Tuple2<String, Integer>> reduceDS = keyByDS
                .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> reduce(
                            Tuple2<String, Integer> value1,
                            Tuple2<String, Integer> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });

//        reduceDS.print();

        //使用lambda表达式
        DataStream<Tuple2<String, Integer>> reduceDS1 = keyByDS.reduce((kv1, kv2) -> {
            String word = kv1.f0;
            int count = kv1.f1 + kv2.f1;
            return Tuple2.of(word, count);
        });
        reduceDS1.print();

        env.execute();


    }
}

4.6 window

1.这里简单的说一下窗口,后面会细说。

2.也只能作用在kv形式的DS上

package com.shujia.flink.tf;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

public class Demo6Window {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> lineDS = env.socketTextStream("master", 8888);

        /*
         * 每隔5秒统计最近15秒每个单词的数量 --- 滑动窗口
         */
        //转换成kv
        DataStream<Tuple2<String, Integer>> kvDS = lineDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT));

        //按照单词分组
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = kvDS.keyBy(kv -> kv.f0);

        //划分窗口
        //SlidingProcessingTimeWindows:滑动的处理时间窗口
        //WindowedStream<T, KEY, W> window
        WindowedStream<Tuple2<String, Integer>, String, TimeWindow> windowDS = keyByDS
                .window(SlidingProcessingTimeWindows.of(Time.seconds(16), Time.seconds(5)));
        //这个时间是每隔5秒处理一次15秒里窗口的数据,如果前15秒过去了,只会计算后面的15秒出现的数据
        /*
         * 比如 0-5-10-15-20-25,在0-5里面输入了hjx,hjx,java,那么结果是(hjx,2),(java,1)
         * 然后在5-15里面输入了kkk,lll,hjx,结果是(hjx,3),(java,1)(lll,1),(kkk,1)
         * 但是这个时候执行了第二个窗口,5-20秒的窗口,里面数据只有kkk,lll,hjx
         * 
         */

        windowDS.sum(1).print();

        env.execute();


    }
}

4.7 union

1.只是代码层面上的合并,其实数据没有合并

package com.shujia.flink.tf;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo7Union {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> ds1 = env.socketTextStream("master", 8888);
        DataStreamSource<String> ds2 = env.socketTextStream("master", 9999);

        /*
         * union:合并两个DS
         * 在数据层面并没有合并,只是在逻辑层面合并了
         */
        DataStream<String> unionDS = ds1.union(ds2);

        unionDS.print();

        env.execute();

    }
}

4.8 process

1.是flink的底层算子,可以代替map,flatMap,filter。

package com.shujia.flink.tf;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class Demo8ProcessFunction {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        //public abstract class ProcessFunction<I, O> extends AbstractRichFunction
        // process(ProcessFunction<T, R> processFunction)
        DataStream<Tuple2<String, Integer>> kvDS = linesDS.process(new ProcessFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void processElement(String value,
                                       ProcessFunction<String, Tuple2<String, Integer>>.Context ctx,
                                       Collector<Tuple2<String, Integer>> out) throws Exception {

                /*
                 *value:一行数据
                 * ctx:上下文对象
                 * out:将数据发送到下游
                 */
                for (String word : value.split(",")) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        });

        kvDS.print();


        env.execute();

    }
}

五 Sink

5.1 fileSink

1.记住核心参数就可以

        写入的位置,编码:forRowFormat

        指定策略:withRollingPolicy

package com.shujia.flink.sink;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;

import java.time.Duration;

public class Demo1FileSInk {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        FileSink<String> fileSink = FileSink
                //final Path basePath, final Encoder<IN> encoder
                指定数据的格式以及存放的位置
                .<String>forRowFormat(new Path("flink/data/words"),
                        new SimpleStringEncoder<>("UTF-8"))
                //指定策略
                //withRollingPolicy(final RollingPolicy<IN, String> policy)
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                包含了至少10秒的数据量
                                .withRolloverInterval(Duration.ofSeconds(10))
                                从没接收延时10秒之外的新纪录
                                .withInactivityInterval(Duration.ofSeconds(10))
                                //文件大小已经达到 1MB(写入最后一条记录之后)
                                .withMaxPartSize(MemorySize.ofMebiBytes(1))
                                .build()
                ).build();


        //使用fileSink
        linesDS.sinkTo(fileSink);

        env.execute();

    }
}

5.2 自定义sink

1.自定义sink需要实现SinkFunction接口,重写invoke方法

2.addSink里面传入的是一个实现了SinkFunction接口的类的对象

3.SinkFunction的泛型要与写入数据的类型一样

package com.shujia.flink.sink;


import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class Demo2MySInk {
    public static void main(String[] args)throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> linesDS = env.socketTextStream("master", 8888);

        linesDS.addSink(new MySink());

        env.execute();

    }
}
class MySink implements SinkFunction<String>{
    
    //自定义sink需要实现SinkFunction,重写invoke方法,
    // invoke里面可以写数据保存的位置,也可以直接输出
    @Override
    public void invoke(String value, Context context) throws Exception {

        System.out.println("自定义sink"+value);//自定义sink+端口里面输出的
    }
}

5.3 实例

1.如果是实现SinkFunction接口,那么invoke方法每次都要执行一次连接数据库,浪费资源。所以自定义sink写入数据库时候,继承RichSinkFunction抽象类。里面的open与close只执行一次,刚刚好可以用来连接数据库与关闭数据库

package com.shujia.flink.sink;

import com.mysql.jdbc.Connection;
import com.mysql.jdbc.PreparedStatement;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.DriverManager;

public class Demo3MySqlSInk {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> linesDS = env.socketTextStream("master", 8888);

        DataStream<Tuple2<String, Integer>> countDS = linesDS
                .map(word -> Tuple2.of(word, 1), Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(kv -> kv.f0)
                .sum(1);

        countDS.addSink(new MySqlSInk());

        env.execute();
    }
}

/**
 * 自定义sink将数据保存到mysql
 * RichSinkFunction:多了open和close方法,用于打开和关闭连接
 * SinkFunction
 */
class MySqlSInk extends RichSinkFunction<Tuple2<String, Integer>> {

    Connection con;
    PreparedStatement stat;

    @Override
    public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
        stat.setString(1, value.f0);
        stat.setInt(2, value.f1);

        stat.execute();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        //创建数据库驱动
        Class.forName("com.mysql.jdbc.Driver");
        con = (Connection) DriverManager.getConnection("jdbc:mysql://master:3306/bigdata29?useSSL=false", "root", "123456");

        //replace into 替换插入,如果没有就插入,如果有就更新,表需要有主键
        stat = (PreparedStatement) con.prepareStatement("replace into word_count values(?,?)");
    }

    @Override
    public void close() throws Exception {
        stat.close();
        con.close();
    }


}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/674786.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

B端管理系统:深色系界面能够给用户带来什么感觉?

深色系的B端界面通常会给用户带来一种严肃、专业、稳重的感觉。这种界面设计通常会使用深色的背景和明亮的文字、图标&#xff0c;给人一种沉稳的视觉效果。 用户在使用深色系的B端界面时&#xff0c;可能会感到界面设计非常注重数据和信息的展示和分析&#xff0c;具有高度的…

Esxi_FAQ_2024.6.3

问题1-内存设置无效 问题&#xff1a; 内存设置无效: 内存预留 (sched.mem.min) 应该等于内存大小 (32768设置的内存容量) 原因&#xff1a; 是因为直通了nvme硬盘&#xff0c;在没有直通nvme硬盘时&#xff0c;不预留内存开机也不会报错。 解决&#xff1a; 需要全部预留内存…

小程序项目创建与Vant-UI引入

一&#xff0c;创建小程序项目 AppID可先用测试号&#xff1b; 模板来源选择 ’全部来源‘ &#xff0c;’基础‘ 。模板一定JS开头的&#xff1b; vant-weapp 官网 vant-Weapp 二&#xff0c;下载vant-weapp 组件 1&#xff0c;在新项目中打开 ’调试器‘&#xff1b; 2…

【大学物理实验】速通双语版

0首先&#xff0c;我们要学什么&#xff1f;outlook&#xff01; 1measurement 2system error&random error 3significant figures 4uncertainty of direct measurement and indirect measurement 5data processing 1 measurement Important points to remember&#…

文件夹批量改名每个不一样?文件夹批量命名的6种工具!(新)

在日常工作和学习中&#xff0c;我们经常需要处理大量的文件夹&#xff0c;并对它们进行有序的命名和管理。手动一个一个地改名不仅效率低下&#xff0c;而且容易出错。幸运的是&#xff0c;现在有许多工具可以帮助我们实现文件夹的批量重命名&#xff0c;确保每个文件夹的名称…

MySQL—函数—函数小结

一、引言 前面博客我们已经学完了MySQL的函数&#xff0c;下面快速的对MySQL的函数做一个小结。 在讲解了MySQL的函数的时候&#xff0c;主要有四个方面&#xff1a; 1、字符串函数 &#xff08;1&#xff09;CONCAT&#xff1a;字符串连接 &#xff08;2&#xff09;LOWER、…

下载旧版postman(无需要登录,无需联网,即可轻松使用postman)

https://www.filehorse.com/download-postman/old-versions/page-3/ 人工智能学习网站 https://chat.xutongbao.top

docker+vue云服务器打包镜像相关操作

dockervue云服务器打包镜像相关操作 容器化部署似乎成了当前一个非常主流的趋势&#xff0c;无论是前端还是后端&#xff0c;流行的操作就是给你一个镜像地址&#xff0c;让你自己去拉取镜像并运行镜像。这似乎是运维的工作&#xff0c;但是在没有专有运维的情况下&#xff0c…

【Java数据结构】详解Stack与Queue(二)

&#x1f512;文章目录: 1.❤️❤️前言~&#x1f973;&#x1f389;&#x1f389;&#x1f389; 2.栈的应用场景 2.1逆序打印链表 2.2逆波兰表达式求值 2.3括号匹配 2.4出栈入栈次序匹配 2.5最小栈 3. 栈 虚拟机栈 栈帧的区别 4.总结 1.❤️❤️前言~&#x1f973…

1371. 每个元音包含偶数次的最长子字符串

1371. 每个元音包含偶数次的最长子字符串 原题链接&#xff1a;完成情况&#xff1a;解题思路&#xff1a;参考代码&#xff1a;_1371每个元音包含偶数次的最长子字符串 错误经验吸取 原题链接&#xff1a; 1371. 每个元音包含偶数次的最长子字符串 https://leetcode.cn/pro…

Qos基础

一、Qos概述 Qos是一个框架&#xff0c;解决服务质量&#xff0c;尽力而为模型&#xff0c;集成服务以及区分服务模型&#xff0c;流量分类与标识。 使用Qos是带宽不够。 每个接口有硬件队列和软件队列&#xff08;队列排满了就不会再排&#xff09;。 企业宽带一般都是上行和下…

使用 Scapy 库编写 ICMP 重定向攻击脚本

一、介绍 ICMP重定向攻击&#xff08;ICMP Redirect Attack&#xff09;是一种网络攻击&#xff0c;攻击者通过发送伪造的ICMP重定向消息&#xff0c;诱使目标主机更新其路由表&#xff0c;以便将数据包发送到攻击者控制的路由器或其他不可信任的设备上。该攻击利用了ICMP协议…

【三维重建NeRF(三)】Mip-NeRF论文解读

本文结合深蓝学院课程学习和本人的理解&#xff0c;欢迎交流指正 文章目录 Mip-NeRF流程简述混叠问题与MipMapMip-NeRF提出的解决办法圆锥台近似计算与集成位置编码(IPE) Mip-NeRF流程简述 Mip-NeRF的大体流程和NeRF基本是一样的&#xff0c;NeRF介绍 创新的部分就是针对NeRF…

定格动态:如何用前端实现视频帧截图

在这样一个图像化极其重要的时代&#xff0c;从视频中提取精彩瞬间&#xff0c;即视频帧截图的技术&#xff0c;已成为前端开发中的一个亮点。JavaScript作为网页动态效果和交互的主力军&#xff0c;其在视频处理领域能力逐渐被挖掘和重视&#xff0c;尤其是视频帧截图技术的应…

GaN功率电子器件中体缺陷相关机制的建模仿真研究

在电力电子器件的外延生长和器件制备过程中&#xff0c;缺陷是不可避免的&#xff0c;大量的缺陷在一定程度上会牺牲器件的击穿电压、导通电阻等性能&#xff0c;同时影响器件的可靠性。近期&#xff0c;河北工业大学和广东工业大学联合开发了缺陷相关的仿真模型&#xff0c;深…

gitblit 环境搭建,服务器迁移记录

下载 Gitblit&#xff1a; http://www.gitblit.com/ JDK&#xff1a;gitblit网站显示需要jdk1.7&#xff0c;这里用的1.8。 Git&#xff1a;到官网下载最新版本安装 1). 分别安装JDK&#xff0c;Git&#xff0c;配置环境变量&#xff0c;下载并解压Gitblit 2). 创建代码仓库 …

每日一题《leetcode--LCR 029.循环有序列表的插入》

https://leetcode.cn/problems/4ueAj6/ 这道题整体上想插入数据有三种情况&#xff1a; 1、整个列表是空列表&#xff0c;需要返回插入的结点 2、整个列表只有一个结点&#xff0c;需要在头结点后插入新结点&#xff0c;随机把新结点的next指向头结点 3、整个列表的结点 >1 …

052、Python 集合及其使用

集合&#xff08;Set&#xff09;是一种无序且元素唯一的数据结构&#xff0c;用于存储不重复的元素&#xff08;即集合具有无序性和互异性两个重要特性&#xff09;。集合可以用于执行集合操作&#xff0c;如并集、交集、差集等。 定义集合 可以使用大括号 {} 或者 set() 函…

供应MT7662TUN/C进口芯片现货

长期供应各品牌进口芯片现货&#xff1a; MT7662TUN/C DLPC4421A DLPC4422A DAD2000 IT6634 DDP4421-HV PMD1000 SiHA120N60E AM8280 AM90N06-03B P15F60HP2 MSD6A838UYGN-8-003D 5AGXBA5D4F31C5G MCZ5209SN STM32L431CCT6 PT2833 ES858 TPS74301RGWR CSD18…

Rust自动生成文件解析

目录 一、生成目录解析二、生成文件解析2.1 Cargo.toml2.2 main函数解析 一、生成目录解析 先使用cargo clean命令删除所有生成的文件&#xff0c;下图显示了目录结构和 main.rs文件 使用cargo new testrust时自动创建出名为testrust的Rust项目。内部主要包含一个src的源码文…