掌握大数据处理利器:Flink 知识点全面总结【上】

1.Flink的特点

Apache Flink 是一个框架分布式处理引擎,用于对无界有界数据流进行状态计算

Flink主要特点如下:
  • 高吞吐和低延迟。每秒处理数百万个事件,毫秒级延迟。
  • 结果的准确性。Flink提供了事件时间(event--time)和处理时间(processing-time)语义。
对于乱序事件流,事件时间语义仍然能提供一致且准确的结果。
  • 精确一次(exactly-once)的状态一致性保证。
  • 可以连接到最常用的外部系统,如Kafka、Hive、JDBC、HDFS、Redis等。
  • 高可用。本身高可用的设置,加上与K8s,YARN和Mesos的紧密集成,再加上从故障中快速恢复和动态扩展任务的能力,Flink能做到以极少的停机时间7×24全天候运行。

2.分层API

        有状态流处理:通过底层API〔处理函数),对最原始数据加工处理。底层API与DataStream API相集成,可以处理复杂的计算。
        DataStream API(流处理)和DataSet API(批处理)封装了底层处理函数,提供了通用的模块,比如转换(transformations,包括map、flatmap等),连接(joins),聚合(aggregations),窗口(windows)操作等。注意:Flink1.l2以后,DataStream API已经实现真正的流批一体,所以DataSet API已经过时。
        Table API是以表为中心的声明式编程,其中表可能会动态变化。Table API遵循关系模型:表有二维数据结构,类似于关系数据库中的表:同时API提供可比较的操作,例如select,project、join,group-by,aggregate等。我们可以在表与DataStream/DataSet之间无缝切换,以允许程序将Table API与DataStream以及DataSet混合使用。
        SQL这一层在语法与表达能力上与Table API类似,但是是以SQL查询表达式的形式表现程序。SQL抽象与Table API交互密切,同时SQL查询可以直接在Table API定义的表上执行。

3.流式数据,批量数据

批处理的特点是有界、大量,非常适合需要访问全套记录才能完成的计算工作,一般用于离线统计。

流处理的特点是无界、实时,  无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作,一般用于实时统计。

在spark的世界观中,一切都是由批次组成的,离线数据是一个大批次,而实时数据是由一个一个无限的小批次组成的。

而在flink的世界观中,一切都是由流组成的,离线数据是有界限的流,实时数据是一个没有界限的流,这就是所谓的有界流和无界流

无界数据流:

无界数据流有一个开始但是没有结束,它们不会在生成时终止并提供数据,必须连续处理无界流,也就是说必须在获取后立即处理event。对于无界数据流我们无法等待所有数据都到达,因为输入是无界的,并且在任何时间点都不会完成。处理无界数据通常要求以特定顺序(例如事件发生的顺序)获取event,以便能够推断结果完整性。

有界数据流:

有界数据流有明确定义的开始和结束,可以在执行任何计算之前通过获取所有数据来处理有界流,处理有界流不需要有序获取,因为可以始终对有界数据集进行排序,有界流的处理也称为批处理

4.单作业模式

会话模式因为资源共享会导致很多问题,所以为了更好地隔离资源,我们可以考虑为每个提交的作业启动一个集群,这就是所谓的单作业(Per-Job)模式

        单作业模式,就是严格的一对一,集群只为这个作业而生。同样由客户端运行应用程序,然后启动集群,作业被提交给 JobManager,进而分发TaskManager 执行。作业作业完成后,集群就会关闭,所有资源也会释放。这样一来,每个作业都有它自己的 JobManager管理,占用独享的资源,即使发生故障,它的 TaskManager 宕机也不会影响其他作业。

这些特性使得单作业模式在生产环境运行更加稳定,所以是实际应用的首选模式

需要注意的是,Flink 本身无法直接这样运行,所以单作业模式一般需要借助一些资源管理框架来启动集群,比如 YARN、Kubernetes。

5.并行度优先级

每一个算子( operator )可以包含一个或多个子任务( operator subtask ),这些子任务在不同的线程、不同的物理机或不同的容器中完全独立地执行。
一个特定算子的 子任务(subtask)的个数 被称为其并行度 parallelism )。

一般情况下,一个流程序的并行度,可以认为就是其所有算子中最大的并行度。一个程序中,不同的算子可能具有不同的并行度。

Stream在算子之间传输数据的形式可以是one-to-one(forwarding)的模式也可以是redistributing的模式,具体是哪一种形式,取决于算子的种类。

  • One-to-one

stream(比如在source和map operator之间)维护着分区以及元素的顺序。那意味着flatmap 算子的子任务看到的元素的个数以及顺序跟source 算子的子任务生产的元素的个数、顺序相同,map、fliter、flatMap等算子都是one-to-one的对应关系。类似于spark中的窄依赖

  • Redistributing

stream(map()跟keyBy/window之间或者keyBy/window跟sink之间)的分区会发生改变。每一个算子的子任务依据所选择的transformation发送数据到不同的目标任务。例如,keyBy()基于hashCode重分区、broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribute过程就类似于Spark中的shuffle过程。类似于spark中的宽依赖

 并行度可以有如下几种指定方式

  1. Operator Level(算子级别)(可以使用)

    一个算子、数据源和sink的并行度可以通过调用 setParallelism()方法来指定

    image-20200921112057156

  2. Execution Environment Level(Env级别)(可以使用)

    执行环境(任务)的默认并行度可以通过调用setParallelism()方法指定。为了以并行度3来执行所有的算子、数据源和data sink, 可以通过如下的方式设置执行环境的并行度:执行环境的并行度可以通过显式设置算子的并行度而被重写

    image-20200921112112743

  3. Client Level(客户端级别,推荐使用)(可以使用)

    并行度可以在客户端将job提交到Flink时设定。

    对于CLI客户端,可以通过-p参数指定并行度 ./bin/flink run -p 10 WordCount-java.jar

  4. System Level(系统默认级别,尽量不使用)

    在系统级可以通过设置flink-conf.yaml文件中的parallelism.default属性来指定所有执行环境的默认并行度

并行度的优先级:算子级别 > env级别 > Client级别 > 系统默认级别 (越靠前具体的代码并行度的优先级越高)

6.任务调度执行图

逻辑流图(StreamGraph)→ 作业图(JobGraph)→ 执行图(ExecutionGraph)→ 物理图(Physical Graph)。

1)逻辑流图(StreamGraph

这是根据用户通过 DataStream API编写的代码生成的最初的DAG图,用来表示程序的拓扑结构。这一步一般在客户端完成。

2)作业图(JobGraph

StreamGraph经过优化后生成的就是作业图(JobGraph),这是提交给 JobManager 的数据结构,确定了当前作业中所有任务的划分。主要的优化为:将多个符合条件的节点链接在一起合并成一个任务节点,形成算子链,这样可以减少数据交换的消耗。JobGraph一般也是在客户端生成的,在作业提交时传递给JobMaster。

我们提交作业之后,打开Flink自带的Web UI,点击作业就能看到对应的作业图。

3)执行图(ExecutionGraph

JobMaster收到JobGraph后,会根据它来生成执行图(ExecutionGraph)。ExecutionGraph是JobGraph的并行化版本,是调度层最核心的数据结构。与JobGraph最大的区别就是按照并行度对并行子任务进行了拆分,并明确了任务间数据传输的方式。

4)物理图(Physical Graph

JobMaster生成执行图后,会将它分发给TaskManager;各个TaskManager会根据执行图部署任务,最终的物理执行过程也会形成一张“图”,一般就叫作物理图(Physical Graph)。这只是具体执行层面的图,并不是一个具体的数据结构。

物理图主要就是在执行图的基础上,进一步确定数据存放的位置和收发的具体方式。有了物理图,TaskManager就可以对传递来的数据进行处理计算了。

7.转换算子

数据源读入数据之后,我们就可以使用各种转换算子,将一个或多个DataStream转换为新的DataStream。

基本转换算子

1. 映射(map

map 是大家非常熟悉的大数据操作算子,主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一 一映射”,消费一个元素就产出一个元素

我们只需要基于 DataStream 调用 map()方法就可以进行转换处理。方法需要传入的参数是接口 MapFunction 的实现;返回值类型还是 DataStream。

public class TransMapTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );
        //1.自定义类,实现mapfunction接口
        SingleOutputStreamOperator<String> result1 = stream.map(new UserExtractor());

        // 2.传入匿名类,实现MapFunction
         SingleOutputStreamOperator<String> result2 = stream.map(new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.user;
            }
        });

        //3.传入lambda表达式
        SingleOutputStreamOperator<String> result3 = stream.map(data -> data.user);

        result1.print();
        //result2.print();
        //result3.print();
        env.execute();
    }
    public static class UserExtractor implements MapFunction<Event, String> {
        @Override
        public String map(Event value) throws Exception {
            return value.user;
        }
    }
}

MapFunction 实现类的泛型类型,与输入数据类型和输出数据的类型有关。

在实现 MapFunction 接口的时候,需要指定两个泛型,分别是输入事件和输出事件的类型,还

需要重写一个 map()方法,定义从一个输入事件转换为另一个输出事件的具体逻辑。

2. 过滤(filter)

filter转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤条件,对于每一个流内元素进行判断,若为true则元素正常输出,若为false则元素被过滤掉。

进行filter转换之后的新数据流的数据类型与原数据流是相同的。filter转换需要传入的参数需要实现FilterFunction接口,而FilterFunction内要实现filter()方法,就相当于一个返回布尔类型的条件表达式。

public class TransFilterTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1.传入实现filterfunction的自定义类
        SingleOutputStreamOperator<Event> result1 = stream.filter(new UserFilter());

        // 2.传入匿名类实现FilterFunction
        SingleOutputStreamOperator<Event> result2 = stream.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event e) throws Exception {
                return e.user.equals("Bob");
            }
        });

        //3.传入lambda表达式
        SingleOutputStreamOperator<Event> result3 = stream.filter(data -> data.user.equals("Bob"));

        result1.print();
      //  result2.print();
      //  result3.print();

        env.execute();
    }
    public static class UserFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event e) throws Exception {
            return e.user.equals("Mary");
        }
    }
}

3. 扁平映射(flatMap))

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合,也就是先按照某种规则对数据进行打散拆分,再对拆分后的元素做转换处理.

同 map 一样,flatMap 也可以使用 Lambda 表达式或者 FlatMapFunction 接口实现类的方式

来进行传参,返回值类型取决于所传参数的具体逻辑,可以与原数据流相同,也可以不同。

public class TransFlatmapTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L)
        );

        //1.实现自定义的flatmapfunction
         stream.flatMap(new MyFlatMap()).print("1");

         //2.传入lambda表达式
        stream.flatMap((Event value,Collector<String> out)-> {
            if (value.user.equals("Marry"))
                out.collect(value.url);
            else if(value.user.equals("Bob"))
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timestamp.toString());
        }).returns(new TypeHint<String>() {
                })
                .print("2");

        env.execute();
    }
    public static class MyFlatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event value, Collector<String> out) throws Exception {
                out.collect(value.user);
                out.collect(value.url);
                out.collect(value.timestamp.toString());
            }
        }
    }

flatMap 操作会应用在每一个输入事件上面,FlatMapFunction 接口中定义了 flatMap 方法,

用户可以重写这个方法,在这个方法中对输入数据进行处理,并决定是返回 0 个、1 个或多个

结果数据。因此 flatMap 并没有直接定义返回值类型,而是通过一个“收集器”(Collector)来

指定输出。希望输出结果时,只要调用收集器的.collect()方法就可以了;这个方法可以多次调

用,也可以不调用。所以 flatMap 方法也可以实现 map 方法和 filter 方法的功能,当返回结果

是 0 个的时候,就相当于对数据进行了过滤,当返回结果是 1 个的时候,相当于对数据进行了

简单的转换操作。

聚合算子(Aggregation

1. 按键分区(keyBy)

对于Flink而言,DataStream是没有直接进行聚合的API的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在Flink中,要做聚合,需要先进行分区;这个操作就是通过keyBy来完成的。

keyBy是聚合前必须要用到的一个算子。keyBy通过指定键(key),可以将一条流从逻辑上划分成不同的分区(partitions)。这里所说的分区,其实就是并行处理的子任务。

基于不同的key,流中的数据将被分配到不同的分区中去;这样一来,所有具有相同的key的数据,都将被发往同一个分区。

2. 简单聚合

有了按键分区的数据流KeyedStream,我们就可以基于它进行聚合操作了。Flink为我们内置实现了一些最基本、最简单的聚合API,主要有以下几种:

  1. sum():在输入流上,对指定的字段做叠加求和的操作。
  2. min():在输入流上,对指定的字段求最小值。
  3. max():在输入流上,对指定的字段求最大值。
  4. minBy():与min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而minBy()则会返回包含字段最小值的整条数据。
  5. maxBy():与max()类似,在输入流上针对指定字段求最大值。两者区别与min()/minBy()完全一致。

简单聚合算子使用非常方便,语义也非常明确。这些聚合方法调用时,也需要传入参数;但并不像基本转换算子那样需要实现自定义函数,只要说明聚合指定的字段就可以了。指定字段的方式有两种:指定位置,和指定名称

public class TransformSimpleAggTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100", 3000L),
                new Event("Bob","./prod?id=1", 3300L),
                new Event("Bob", "./home", 3500L),
                new Event("Alice","./prod?id=200", 3200L),
                new Event("Bob","./prod?id=2", 3800L),
                new Event("Bob","./prod?id=3", 4200L)
        );

        // 按键分组之后进行聚合,提取当前用户最后一次访问数据
        stream.keyBy(new KeySelector<Event, String>() {
            @Override
            public String getKey(Event value) throws Exception {
                return value.user;
            }
        }).max("timestamp")
                .print("max:");

        stream.keyBy(data -> data.user)
                .maxBy("timestamp")
                .print("maxBy:");

        env.execute();

    }
}

3. 归约聚合(reduce

与简单聚合类似,reduce 操作也会将 KeyedStream 转换为 DataStream。它不会改变流的元素数据类型,所以输出类型和输入类型是一样的。

调用 KeyedStream 的 reduce 方法时,需要传入一个参数,实现 ReduceFunction 接口。接口在源码中的定义如下:

public interface ReduceFunction<T> extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}

ReduceFunction 接口里需要实现 reduce()方法,这个方法接收两个输入事件,经过转换处理之后输出一个相同类型的事件;所以,对于一组数据,我们可以先取两个进行合并,然后再将合并的结果看作一个数据、再跟后面的数据合并,最终会将它“简化”成唯一的一个数据,这也就是 reduce“归约”的含义。在流处理的底层实现过程中,实际上是将中间“合并的结果”作为任务的一个状态保存起来的;之后每来一个新的数据,就和之前的聚合状态进一步做归约。

public class TransformReduceTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice","./prod?id=100", 3000L),
                new Event("Bob","./prod?id=1", 3300L),
                new Event("Alice","./prod?id=200", 3200L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob","./prod?id=2", 3800L),
                new Event("Bob","./prod?id=3", 4200L)
        );

        //1. 统计每个用户的访问频次
        SingleOutputStreamOperator<Tuple2<String, Long>> clicksByUser = stream.map(new MapFunction<Event, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(Event value) throws Exception {
                        return Tuple2.of(value.user, 1L);
                    }
                }).keyBy(data -> data.f0)
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                });

        //2. 选取当前最活跃的用户
        SingleOutputStreamOperator<Tuple2<String, Long>> result = clicksByUser.keyBy(data -> "key")
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return value1.f1 > value2.f1 ? value1 : value2;
                    }
                });

        result.print();
        env.execute();
    }
}

8.输出算子sink

连接到外部系统

Flink作为数据处理框架,最终还是要把计算处理的结果写入外部存储,为外部应用提供支持。

Flink的DataStream API专门提供了向外部写入数据的方法:addSink。与addSource类似,addSink方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink程序中所有对外的输出操作,一般都是利用Sink算子完成的。

Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”。

与 Source 算子非常类似,除去一些 Flink 预实现的 Sink,一般情况下 Sink 算子的创建是通过调用 DataStream 的.addSink()方法实现的。

stream.addSink(new SinkFunction(…));

addSource 的参数需要实现一个 SourceFunction 接口;类似地,addSink 方法同样需要传入一个参数,实现的是 SinkFunction 接口。在这个接口中只需要重写一个方法 invoke(),用来将指定的值写入到外部系统中。这个方法在每条数据记录到来时都会调用:

default void invoke(IN value, Context context) throws Exception

SinkFuntion 多数情况下同样并不需要我们自己实现。Flink 官方提供了一部分的框架的 Sink 连接器。

列出了 Flink 官方目前支持的第三方系统连接器:

除 Flink 官方之外,Apache Bahir 作为给 Spark 和 Flink 提供扩展支持的项目,也实现了一些其他第三方系统与 Flink 的连接器:

除此以外,就需要用户自定义实现 sink 连接器了

输出到文件

Flink 为此专门提供了一个流式文件系统的连接器:StreamingFileSink,它继承自抽象类RichSinkFunction,而且集成了 Flink 的检查点(checkpoint)机制,用来保证精确一次(exactly once)的一致性语义。

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统。它可以保证精确一次的状态一致性,大大改进了之前流式文件 Sink 的方式。它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据。

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用StreamingFileSink 的静态方法:

行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)。
批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)。
public class SinkToFileTest {
    public static void main(String[] args) throws Exception{
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStream<Event> stream = env.fromElements(new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=300", 3200L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"),
                        new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024 * 1024 * 1024)
                                .build())
                .build();

        // 将Event转换成String写入文件
        stream.map(data -> data.toString()).addSink(fileSink);

        env.execute();
    }
}

输出到Kafka

我们要将数据输出到 Kafka,整个数据处理的闭环已经形成,所以可以完整测试如下:

(1)添加 Kafka 连接器依赖

由于我们已经测试过从 Kafka 数据源读取数据,连接器相关依赖已经引入,这里就不重复

介绍了。

(2)启动 Kafka 集群

(3)编写输出到 Kafka 的示例代码

1.启动zookeeper

命令:bin/zkServer.sh start

2.启动kafka

命令:bin/kafka-server-start.sh -daemon config/server.properties

3.创建生产者

命令:./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic clicks

addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性。

4. 运行代码,另开启一个master01窗口,启动一个消费者, 查看是否收到数据

bin/kafka-console-consumer.sh --bootstrap-server  localhost:9092 --topic events

public class SinkToKafkaTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        Properties properties = new Properties();
        properties.put("bootstrap.servers", "master01:9092");

        DataStreamSource<String> stream = env.addSource(new FlinkKafkaConsumer<String>(
                "clicks",
                new SimpleStringSchema(),
                properties
        ));

        SingleOutputStreamOperator<String> result = stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        result.addSink(new FlinkKafkaProducer<String>("master01:9092", "events", new SimpleStringSchema()));

        env.execute();
    }
}

结果:


2025年的第一天我居然还在复习【枯死】

新的一年,梦虽遥,追则能达。愿虽艰,持则可圆。祝大家所愿皆所成,多喜乐,长安宁。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/946071.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

[论文阅读] (34)ESWA2024 基于SGDC的轻量级入侵检测系统

《娜璋带你读论文》系列主要是督促自己阅读优秀论文及听取学术讲座&#xff0c;并分享给大家&#xff0c;希望您喜欢。由于作者的英文水平和学术能力不高&#xff0c;需要不断提升&#xff0c;所以还请大家批评指正&#xff0c;非常欢迎大家给我留言评论&#xff0c;学术路上期…

《向量数据库指南》——Milvus Cloud 2.5:Sparse-BM25引领全文检索新时代

Milvus Cloud BM25:重塑全文检索的未来 在最新的Milvus Cloud 2.5版本中,我们自豪地引入了“全新”的全文检索能力,这一创新不仅巩固了Milvus Cloud在向量数据库领域的领先地位,更为用户提供了前所未有的灵活性和效率。作为大禹智库的向量数据库高级研究员,以及《向量数据…

常用的数据库类型都有哪些

在Java开发和信息系统架构中&#xff0c;数据库扮演着存储和管理数据的关键角色。数据库种类繁多&#xff0c;各有特色&#xff0c;适用于不同的应用场景。 1. 关系型数据库&#xff08;RDBMS&#xff09;&#xff1a; • 关系型数据库是最为人熟知的数据库类型&#xff0c;数据…

计算机网络—————考研复试

第一章、计算机网络体系结构 1. OSI参考模型和TCP/IP模型&#xff1a; OSI与TCP/IP的记忆方法&#xff1a;只需把OSI的七层记住&#xff0c;将应用层、表示层、会话层一起记&#xff0c;到TCP/IP变成应用层。物理层和数据链路层换成网络接口层。把网络层换个字变成网际层。 而…

从2024看2025前端发展趋势

前言 又至年关&#xff0c;回顾整个2024年&#xff0c;前端行业仍旧百废待兴&#xff0c;IT业界同样也未见有所起色&#xff0c;AI风潮也从狂热兴奋逐步走向了冷静稳定阶段&#xff0c;造成此形势感观并非单一行业或者某一企业之特例&#xff0c;实为政经等综合影响之结果。因…

国内机器视觉产业链全解析

欢迎关注《光场视觉》 简单的&#xff0c;我们可以把机器视觉产业链可以分为底层开发商&#xff08;核心零部件和软件提供商&#xff09;、集成和软件服务商&#xff08;二次开发&#xff09;&#xff0c;核心零部件及软件又可以再细分为光源、镜头、工业相机、图像采集卡、图…

node.js之---事件循环机制

事件循环机制 Node.js 事件循环机制&#xff08;Event Loop&#xff09;是其核心特性之一&#xff0c;它使得 Node.js 能够高效地处理大量并发的 I/O 操作。Node.js 基于 非阻塞 I/O&#xff0c;使用事件驱动的模型来实现异步编程。事件循环是 Node.js 实现异步编程的基础&…

如何在没有 iCloud 的情况下将数据从 iPhone 传输到 iPhone

概括 您可能会遇到将数据从 iPhone 转移到 iPhone 的情况&#xff0c;尤其是当您获得新的 iPhone 15/14 时&#xff0c;您会很兴奋并希望将数据转移到它。 使用iCloud最终可以做到这一点&#xff0c;但它的缺点也不容忽视&#xff0c;阻碍了你选择它。例如&#xff0c;您需要…

HTML——26.像素单位

<!DOCTYPE html> <html><head><meta charset"UTF-8"><title>像素</title></head><body><!--像素&#xff1a;1.指设备屏幕上的一个点&#xff0c;单位px&#xff0c;如led屏上的小灯朱2.当屏幕分辨率固定时&…

智能商业分析 Quick BI

Quick BI 是阿里云提供的一款智能商业分析&#xff08;BI&#xff09;工具&#xff0c;旨在帮助企业快速获取业务洞察、优化决策过程、提升数据分析效率。通过强大的数据可视化和分析功能&#xff0c;Quick BI 能够帮助用户轻松连接多种数据源、创建多维度的报表和仪表盘&#…

multisim仿真搭建三极管开关电路,低电平(5V)控制高电平(12V)输出

通过三极管搭建电路&#xff0c;低电平&#xff08;5V&#xff09;控制高电平&#xff08;12V&#xff09;输出 低电平输入&#xff1a;当输入信号为低电平时&#xff08;0V&#xff09;&#xff0c;三极管Q1处于截止状态。上拉电阻R1的存在&#xff0c;Q2输入端被拉到低电平&a…

Python跨年烟花

目录 系列文章 写在前面 技术需求 完整代码 下载代码 代码分析 1. 程序初始化与显示设置 2. 烟花类 (Firework) 3. 粒子类 (Particle) 4. 痕迹类 (Trail) 5. 烟花更新与显示 6. 主函数 (fire) 7. 游戏循环 8. 总结 注意事项 写在后面 系列文章 序号直达链接爱…

LeetCode - 初级算法 数组(删除排序数组中的重复项)

免责声明:本文来源于个人知识与公开资料,仅用于学术交流。 删除排序数组中的重复项 这篇文章讨论如何从一个非严格递增的数组 nums 中删除重复的元素,使每个元素只出现一次,并返回新数组的长度。因为数组是排序的,只要是相同的肯定是挨着的,所以我们需要遍历所有数组,然…

【yolov5】实现FPS游戏人物检测,并定位到矩形框上中部分,实现自瞄

介绍 本人机器学习小白&#xff0c;通过语言大模型百度进行搜索&#xff0c;磕磕绊绊的实现了初步效果&#xff0c;能有一些锁头效果&#xff0c;但识别速度不是非常快&#xff0c;且没有做敌友区分&#xff0c;效果不是非常的理想&#xff0c;但在4399小游戏中爽一下还是可以…

Java jni调用nnom rnn-denoise 降噪

介绍&#xff1a;https://github.com/majianjia/nnom/blob/master/examples/rnn-denoise/README_CN.md 默认提供了一个wav的例子 #include <stdint.h> #include <stdlib.h> #include <stdio.h> #include <math.h> #include <string.h>#include …

Windows系统 系统盘瘦身策略之文件迁移

1 Android Studio 1.1 .android 该文件夹路径一般在 C:\Users\<user_name>\.android 迁移步骤&#xff1a; ①关闭 Android Studio ②打开环境变量设置&#xff0c;添加以下环境变量 变量名&#xff1a;ANDROID_SDK_HOME 变量值&#xff1a;你自己的路径【不用单独创建.…

SQLiteDataBase数据库

XML界面设计 <?xml version"1.0" encoding"utf-8"?> <LinearLayout xmlns:android"http://schemas.android.com/apk/res/android"xmlns:tools"http://schemas.android.com/tools"android:layout_width"match_paren…

Midjourney技术浅析(七):图像风格化

Midjourney 通过风格迁移&#xff08;Style Transfer&#xff09;和图像滤镜&#xff08;Image Filters&#xff09;技术&#xff0c;使用户能够将生成的图像转换为不同的艺术风格或视觉效果。 一、风格迁移&#xff08;Style Transfer&#xff09; 1.1 风格迁移的定义 风格…

Edge安装问题,安装后出现:Could not find Edge installation

解决&#xff1a;需要再安装&#xff08;MicrosoftEdgeWebView2RuntimeInstallerX64&#xff09;。 网址&#xff1a;https://developer.microsoft.com/zh-cn/microsoft-edge/webview2/?formMA13LH#download 如果已经安装了edge&#xff0c;那就再下载中间这个独立程序安装就…

【JAVA高级篇教学】第六篇:Springboot实现WebSocket

在 Spring Boot 中对接 WebSocket 是一个常见的场景&#xff0c;通常用于实现实时通信。以下是一个完整的 WebSocket 集成步骤&#xff0c;包括服务端和客户端的实现。本期做个简单的测试用例。 目录 一、WebSocket 简介 1. 什么是 WebSocket&#xff1f; 2. WebSocket 的特…