Flink入门学习(一)

Flink

1. 概述

分布式、高性能、随时可用以及准确的流处理应用程序打造的开源流处理框架,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。

有界流:有定义流的开始,也有定义流的结束,可以在摄取所有数据后再进行计算。所有数据可以被排序,所以并不需要有序获取,通常被称为批处理。
无界流:有定义流的开始,但没有定义流的结束,无休止地产生数据。无界流的数据必须持续处理,即数据被获取后需要立刻处理,流处理。

  • 1.1 Flink批处理和流处理
    Flink分别提供了面向流处理的接口(DataStreamAPI)和面向批处理的接口(DataSetAPI)。因此,Flink既可以完成流处理,也可以完成批处理。tableAPI是针对流处理和批处理的API。
    在这里插入图片描述

Spark中,对于批处理和流处理采用了不同的技术框架,批处理由 SparkSQL 实现,流处理由 Spark Streaming 实现。

  • 1.2 Flink 四大基石

    • 窗口Window

      流计算中一般在对流数据进行操作之前都会先进行开窗,即基于一个什么样的窗口上做这个计算
      Flink提供了开箱即用的各种窗口,比如滑动窗口、滚动窗口、会话窗口以及非常灵活的自定义的窗口
      类似离线批处理分析中开窗函数中窗口大小设置

    • 时间Time

      Flink中窗口计算,基本上都是基于时间设置窗口
      Flink还实现了Watermark的机制,能够支持基于事件时间的处理,能够容忍迟到/乱序的数据
      基于事件时间窗口计算:EventTime事件时间、窗口计算Window、窗口类型

    • 状态State

      Flink计算引擎,自身就是基于状态计算框架,默认情况下程序自己管理状态
      提供一致性的语义,使得用户在编程时能够更轻松、更容易地去管理状态
      提供一套非常简单明了的State API,包括ValueState、ListState、MapState,BroadcastState

    • 检查点Checkpoint

      Flink Checkpoint检查点:保存状态数据
      基于Chandy-Lamport算法实现了一个分布式的一致性的快照,从而提供了一致性的语义
      进行Checkpoint后,可以设置自动进行故障恢复
      保存点Savepoint,人工进行Checkpoint操作,进行程序恢复执行

    参考:
    https://blog.csdn.net/weixin_44133605/article/details/125117615

  • 1.3 Flink特性

    • 同时支持高吞吐、低延迟、高性能的流处理
      Flink 的流处理引擎只需要很少配置就能实现高吞吐率和低延迟。

    • 支持带有事件时间的窗口操作
      Event time使得计算乱序到达的事件或可能延迟到达的事件更加简单。
      大多数窗口计算采用的都是系统时间(Process Time),也是事件传输到计算框架时,系统主机的当前时间。Flink能够支持基于时间事件时间(Event Time)语义进行窗口计算,也就是时间产生的时间。这种基于时间驱动的机制使得事件即使是乱序到达,流系统也能够计算出精确的结果,保持了时间原本产生的时序性。尽量避免网络传输或硬件系统影响。

    • 支持有状态计算的 Exactly-once 语义
      流程序可以在计算过程中维护自定义状态。
      Flink 的 checkpointing 机制保证了即时在故障发生下也能保障状态的 exactly once 语义。
      :::info
      Flink的Exactly-once 指的是:状态只持久化一次到最终的存储介质中(本地数据库/HDFS…)
      无状态简单理解为:每次的执行都不依赖上一次或上N次的执行结果,每次的执行都是独立的。
      有状态简单理解为:执行需要依赖上一次或上N次的执行结果,某次的执行需要依赖前面事件的处理结果。
      :::

    • 支持高度灵活的窗口操作,支持基于 time、count、session,以及 data-driven 的窗口操作
      在流处理应用中,数据是连续不断的,需要通过窗口的方式对数据进行一定范围的聚合计算,窗口可以用灵活的触发条件定制化达到对复杂的流传输模式的支持,用户可以定义不同窗口触发机制来满足不同的需求。

    • 支持具有反压功能的持续流模型
      慢的数据sink节点会反压(backpressure)快的数据源(sources)。

    • 支持基于轻量级分布式快照(Snapshot)实现的容错
      这种机制是非常轻量级的,允许系统拥有高吞吐率的同时还能提供强一致性的保障。

    • Batch和Streaming 一个系统流处理和批处理共用一个引擎
      Flink 为流处理和批处理应用公用一个通用的引擎。批处理应用可以以一种特殊的流处理应用高效地运行。

    • Flink 在 JVM 内部实现了自己的内存管理
      应用可以超出主内存的大小限制,并且承受更少的垃圾收集的开销。

    • 支持迭代计算
      Flink 具有迭代计算的专门支持,增量迭代可以利用依赖计算来更快地收敛。

    • 支持程序自动优化
      避免特定情况下 Shuffle、排序等昂贵操作,中间结果有必要进行缓存

    参考:
    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/learn-flink/
    https://blog.csdn.net/kwame211/article/details/110422947/

  • 1.4 Flink部署及启动
    Flink 支持多种安装模式:

    • local(本地)——单机模式,一般不使用;

    • standalone——独立模式,Flink 自带集群,开发测试环境使用;

    • yarn——计算资源统一由 Hadoop YARN 管理,生产环境使用。

    详情参考官方文档
    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/

2. 架构

  • 2.1 Flink 程序结构
    Flink 程序的基本构建块是流和转换。
    在这里插入图片描述

  • Source: 数据源,Flink 在流处理和批处理上的 source 大概有 4 类:基于本地集合的 source、基于文件的 source、基于网络套接字的 source、自定义的 source。自定义的 source 常见的有 Apache kafka、RabbitMQ 等

  • Transformation:数据转换的各种操作,有 Map / FlatMap / Filter / KeyBy / Reduce / Fold / Aggregations / Window / WindowAll / Union / Window join / Split / Select等,操作很多,可以将数据转换计算成你想要的数据。

  • Sink:接收器,Flink 将转换计算后的数据发送的地点 ,你可能需要存储下来,Flink 常见的 Sink 大概有如下几类:写入文件、打印出来、写入 socket 、自定义的 sink 。自定义的 sink 常见的有 Apache kafka、RabbitMQ、MySQL、ElasticSearch、Apache Cassandra、Hadoop FileSystem 等。

在这里插入图片描述

  • 2.2 Flink 并行数据流
    Flink 程序在执行的时候,会被映射成一个 Streaming Dataflow,一个 Streaming Dataflow 是由一组 Stream 和 Transformation Operator 组成的。在启动时从一个或多个 Source Operator 开始,结束于一个或多个 Sink Operator。
    Flink 程序本质上是并行的和分布式的,在执行过程中,一个流(stream)包含一个或多个流分区,而每一个 operator 包含一个或多个 operator 子任务。操作子任务间彼此独立,在不同的线程中执行,甚至是在不同的机器或不同的容器上。operator 子任务的数量是这一特定 operator 的并行度。相同程序中的不同 operator 有不同级别的并行度。
    在这里插入图片描述
    一个 Stream 可以被分成多个 Stream 的分区,也就是 Stream Partition。一个 Operator 也可以被分为多个 Operator Subtask。如上图中,Source 被分成 Source1 和 Source2,它们分别为 Source 的 Operator Subtask。每一个 Operator Subtask 都是在不同的线程当中独立执行的。一个 Operator 的并行度,就等于 Operator Subtask 的个数。上图 Source 的并行度为 2。而一个 Stream 的并行度就等于它生成的 Operator 的并行度。
    数据在两个 operator 之间传递的时候有两种模式:
    One to One 模式:两个 operator 用此模式传递的时候,会保持数据的分区数和数据的排序;如上图中的 Source1 到 Map1,它就保留的 Source 的分区特性,以及分区元素处理的有序性。

    Redistributing (重新分配)模式:这种模式会改变数据的分区数;每个一个 operator subtask 会根据选择 transformation 把数据发送到不同的目标 subtasks,比如 keyBy()会通过 hashcode 重新分区,broadcast()和 rebalance()方法会随机重新分区;

  • 2.3 Task 和 Operator chain流
    Flink的所有操作都称之为Operator,客户端在提交任务的时候会对Operator进行优化操作,能进行合并的Operator会被合并为一个Operator,合并后的Operator称为Operator chain,实际上就是一个执行链,每个执行链会在TaskManager上一个独立的线程中执行。
    在这里插入图片描述

  • 2.4 任务调度与执行
    在这里插入图片描述
    (1) 当Flink执行executor会自动根据程序代码生成DAG数据流图;

    (2) ActorSystem创建Actor将数据流图发送给JobManager中的Actor;
    (3) JobManager会不断接收TaskManager的心跳消息,从而可以获取到有效的TaskManager;

    (4) JobManager通过调度器在TaskManager中调度执行Task(在Flink中,最小的调度单元就是task,对应就是一个线程);

    (5) 在程序运行过程中,task与task之间是可以进行数据传输的。
    Flink 四大组件

    • 作业管理器(JobManager)
      • 控制一个应用程序执行的主进程,也就是说,每个应用程序都会被一个不同的JobManager 所控制执行。
      • JobManager 会先接收到要执行的应用程序,这个应用程序会包括:作业图(JobGraph)、逻辑数据流图(logical dataflow graph)和打包了所有的类、库和其它资源的JAR包。
      • JobManager 会把JobGraph转换成一个物理层面的数据流图,这个图被叫做“执行图”(ExecutionGraph),包含了所有可以并发执行的任务。
      • JobManager 会向资源管理器(ResourceManager)请求执行任务必要的资源,也就是任务管理器(TaskManager)上的插槽(slot)。一旦它获取到了足够的资源,就会将执行图分发到真正运行它们的 TaskManager上。而在运行过程中,JobManager会负责所有需要中央协调的操作,比如说检查点(checkpoints)的协调。
    • 任务管理器(TaskManager)
      • Flink中的工作进程。通常在Flink中会有多个TaskManager运行,每一个TaskManager都包含了一定数量的插槽(slots)。插槽的数量限制了TaskManager能够执行的任务数量。
      • 启动之后,TaskManager会向资源管理器注册它的插槽;收到资源管理器的指令后,TaskManager就会将一个或者多个插槽提供给JobManager调用。JobManager就可以向插槽分配任务(tasks)来执行了。
      • 在执行过程中,一个TaskManager可以跟其它运行同一应用程序的TaskManager交换数据。
    • 资源管理器(ResourceManager)
      • 主要负责管理任务管理器(TaskManager)的插槽(slot),TaskManger 插槽是Flink中定义的处理资源单元。
      • Flink为不同的环境和资源管理工具提供了不同资源管理器,比如YARN、Mesos、K8s,以及standalone部署。
      • 当JobManager申请插槽资源时,ResourceManager会将有空闲插槽的TaskManager分配给JobManager。如果ResourceManager没有足够的插槽来满足JobManager的请求,它还可以向资源提供平台发起会话,以提供启动TaskManager进程的容器。
    • 分发器(Dispatcher)
      • 可以跨作业运行,它为应用提交提供了REST接口。
      • 当一个应用被提交执行时,分发器就会启动并将应用移交给一个JobManager。
      • Dispatcher也会启动一个Web UI,用来方便地展示和监控作业执行的信息。
      • Dispatcher在架构中可能并不是必需的,这取决于应用提交运行的方式。

    参考:
    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/learn-flink/overview/

2. 算子

Flink和Spark类似,也是一种一站式处理的框架;既可以进行批处理(DataSet),也可以进行实时处理(DataStream)。
将Flink的算子分为两大类:一类是DataSet,一类是DataStream。

  • 3.1 DataSet 批处理算子

    • Source算子

      • fromCollection
        从本地集合读取数据
        eg:
      val env = ExecutionEnvironment.getExecutionEnvironment
      val textDataSet: DataSet[String] = env.fromCollection(
      List("1,张三", "2,李四", "3,王五", "4,赵六")
      
      • readTextFile
        从文件中读取
        eg:
      val textDataSet: DataSet[String]  = env.readTextFile("/data/a.txt")
      
    • Transform转换算子
      基于Source算子操作

      • map
        将DataSet中的每一个元素转换为另外一个元素
        eg:
      // 使用map将List转换为一个Scala的样例类
      
      case class User(name: String, id: String)
       
      val userDataSet: DataSet[User] = textDataSet.map {
        text =>
          val fieldArr = text.split(",")
          User(fieldArr(0), fieldArr(1))
      }
      userDataSet.print()
      
      • flatMap
        将DataSet中的每一个元素转换为0…n个元素。
        eg:
      // 使用flatMap操作,将集合中的数据:
      // 根据第一个元素,进行分组
      // 根据第二个元素,进行聚合求值 
      val result = textDataSet.flatMap(line => line)
            .groupBy(0) // 根据第一个元素,进行分组
            .sum(1) // 根据第二个元素,进行聚合求值
            
      result.print()
      
      • mapPartition
        将一个分区中的元素转换为另一个元素
        eg:
      // 使用mapPartition操作,将List转换为一个scala的样例类
      case class User(name: String, id: String)
      val result: DataSet[User] = textDataSet.mapPartition(line => {
            line.map(index => User(index._1, index._2))
          })
      result.print()
      
      • filter
        过滤出来一些符合条件的元素,返回boolean值为true的元素
        eg:
      val source: DataSet[String] = env.fromElements("java", "scala", "java")
      val filter:DataSet[String] = source.filter(line => line.contains("java"))//过滤出带java的数据
      filter.print()
      
      • reduce
        可以对一个dataset或者一个group来进行聚合计算,最终聚合成一个元素
        eg:
      // 使用 fromElements 构建数据源
      val source = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
      // 使用map转换成DataSet元组
      val mapData: DataSet[(String, Int)] = source.map(line => line)
      // 根据首个元素分组
      val groupData = mapData.groupBy(_._1)
      // 使用reduce聚合
      val reduceData = groupData.reduce((x, y) => (x._1, x._2 + y._2))
      // 打印测试
      reduceData.print()
      
      • reduceGroup
        将一个dataset或者一个group聚合成一个或多个元素。
        reduceGroup是reduce的一种优化方案;
        它会先分组reduce,然后在做整体的reduce;这样做的好处就是可以减少网络IO
        eg:
       // 使用 fromElements 构建数据源
      val source: DataSet[(String, Int)] = env.fromElements(("java", 1), ("scala", 1), ("java", 1))
      // 根据首个元素分组
      val groupData = source.groupBy(_._1)
      // 使用reduceGroup聚合
      val result: DataSet[(String, Int)] = groupData.reduceGroup {
            (in: Iterator[(String, Int)], out: Collector[(String, Int)]) =>
              val tuple = in.reduce((x, y) => (x._1, x._2 + y._2))
              out.collect(tuple)
          }
      // 打印测试
      result.print()
      
      • minBy和maxBy
        选择具有最小值或最大值的元素
        eg:
      // 使用minBy操作,求List中每个人的最小值
      // List("张三,1", "李四,2", "王五,3", "张三,4")
      case class User(name: String, id: String)
      // 将List转换为一个scala的样例类
      val text: DataSet[User] = textDataSet.mapPartition(line => {
            line.map(index => User(index._1, index._2))
          })
          
      val result = text
        .groupBy(0) // 按照姓名分组
        .minBy(1)   // 每个人的最小值
      
      • Aggregate
        在数据集上进行聚合求最值(最大值、最小值),只能作用于元组上
        eg:
      val data = new mutable.MutableList[(Int, String, Double)]
          data.+=((1, "yuwen", 89.0))
          data.+=((2, "shuxue", 92.2))
          data.+=((3, "yuwen", 89.99))
      // 使用 fromElements 构建数据源
      val input: DataSet[(Int, String, Double)] = env.fromCollection(data)
      // 使用group执行分组操作
      val value = input.groupBy(1)
                  // 使用aggregate求最大值元素
                  .aggregate(Aggregations.MAX, 2) 
      // 打印测试
      value.print()       
      
      • distinct
        去重
        eg:
      // 数据源使用上一题的
      // 使用distinct操作,根据科目去除集合中重复的元组数据
      val value: DataSet[(Int, String, Double)] = input.distinct(1)
      value.print()
      
      • first
        取前N个元素
        eg:
      input.first(2) // 取前两个数
      
      • join
        将两个DataSet按照一定条件连接到一起,形成新的DataSet
        eg:
      // s1 和 s2 数据集格式如下:
      // DataSet[(Int, String,String, Double)]
       
      val joinData = s1.join(s2)  // s1数据集 join s2数据集
                   .where(0).equalTo(0) {     // join的条件
            (s1, s2) => (s1._1, s1._2, s2._2, s1._3)
      }
      
      • leftOuterJoin
        左外连接,左边的Dataset中的每一个元素,去连接右边的元素
        此外还有:
        rightOuterJoin:右外连接,左边的Dataset中的每一个元素,去连接左边的元素
        fullOuterJoin:全外连接,左右两边的元素,全部连接
        leftOuterJoin eg:
       val data1 = ListBuffer[Tuple2[Int,String]]()
          data1.append((1,"zhangsan"))
          data1.append((2,"lisi"))
          data1.append((3,"wangwu"))
          data1.append((4,"zhaoliu"))
       
      val data2 = ListBuffer[Tuple2[Int,String]]()
          data2.append((1,"beijing"))
          data2.append((2,"shanghai"))
          data2.append((4,"guangzhou"))
       
      val text1 = env.fromCollection(data1)
      val text2 = env.fromCollection(data2)
       
      text1.leftOuterJoin(text2).where(0).equalTo(0).apply((first,second)=>{
            if(second==null){
              (first._1,first._2,"null")
            }else{
              (first._1,first._2,second._2)
            }
          }).print()
      
      • cross
        交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集
        和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作
        eg:
      val cross = input1.cross(input2){
            (input1 , input2) => (input1._1,input1._2,input1._3,input2._2)
          }
      cross.print()
      
      • union
        联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重
        eg:
      val unionData: DataSet[String] = elements1.union(elements2).union(elements3)
      // 去除重复数据
      val value = unionData.distinct(line => line)
      
      • rebalance
        数据均衡,解决数据倾斜问题
        eg:
      // 使用rebalance操作,避免数据倾斜
      val rebalance = filterData.rebalance()
      
      • partitionByHash
        按照指定的key进行hash分区
        eg:
      val data = new mutable.MutableList[(Int, Long, String)]
      data.+=((1, 1L, "Hi"))
      data.+=((2, 2L, "Hello"))
      data.+=((3, 2L, "Hello world"))
       
      val collection = env.fromCollection(data)
      val unique = collection.partitionByHash(1).mapPartition{
        line =>
          line.map(x => (x._1 , x._2 , x._3))
      }
       
      unique.writeAsText("hashPartition", WriteMode.NO_OVERWRITE)
      env.execute()
      
      • partitionByRange
        根据指定的key对数据集进行范围分区
        eg:
      val data = new mutable.MutableList[(Int, Long, String)]
      data.+=((1, 1L, "Hi"))
      data.+=((2, 2L, "Hello"))
      data.+=((3, 2L, "Hello world"))
      data.+=((4, 3L, "Hello world, how are you?"))
       
      val collection = env.fromCollection(data)
      val unique = collection.partitionByRange(x => x._1).mapPartition(line => line.map{
        x=>
          (x._1 , x._2 , x._3)
      })
      unique.writeAsText("rangePartition", WriteMode.OVERWRITE)
      env.execute()
      
      • sortPartition
        根据指定的字段值进行分区的排序
        eg:
       val data = new mutable.MutableList[(Int, Long, String)]
      data.+=((1, 1L, "Hi"))
      data.+=((2, 2L, "Hello"))
      data.+=((3, 2L, "Hello world"))
      data.+=((4, 3L, "Hello world, how are you?"))
      
      val ds = env.fromCollection(data)
      val result = ds
        .map { x => x }.setParallelism(2)
        .sortPartition(1, Order.DESCENDING)//第一个参数代表按照哪个字段进行分区
        .mapPartition(line => line)
        .collect()
      
      println(result)
      
    • Sink算子

      • collect
        将数据输出到本地集合
        eg:
      result.collect()
      
      • writeAsText
        将数据输出到文件
        Flink支持多种存储设备上的文件,包括本地文件,hdfs文件等
        Flink支持多种文件的存储格式,包括text文件,CSV文件等
        eg:
      // 将数据写入本地文件
      result.writeAsText("/data/a", WriteMode.OVERWRITE)
       
      // 将数据写入HDFS
      result.writeAsText("hdfs://node01:9000/data/a", WriteMode.OVERWRITE)
      
  • 3.2 DataStream流处理算子

    • Source算子
      Flink可以使用 StreamExecutionEnvironment.addSource(source) 来为我们的程序添加数据来源。
      Flink在流处理上的source和在批处理上的source基本一致,大约有四大类:基于本地集合的source、基于文件的source、基于socket的source、自定义的source。
      Kafka数据写入Flink eg:
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("group.id", "consumer-group")
    properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    properties.setProperty("auto.offset.reset", "latest")
     
    val source = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema(), properties))
    
    • Transform 转换算子

      • Map
        将DataStream中的每一个元素转换为另外一个元素
        eg:
      dataStream.map { x => x * 2 }
      
      • FlatMap
        采用一个数据元并生成零个,一个或多个数据元。将句子分割为单词的flatmap函数
        eg:
      dataStream.flatMap { str => str.split(" ") }
      
      • Filter
        计算每个数据元的布尔函数,并保存函数返回true的数据元。过滤掉零值的过滤器
        eg:
      dataStream.filter { _ != 0 }
      
      • KeyBy
        逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy()是使用散列分区实现的。指定键有不同的方法。
        此转换返回KeyedStream,其中包括使用被Keys化状态所需的KeyedStream。
        eg:
      dataStream.keyBy(0) 
      
      • Reduce
        被Keys化数据流上的“滚动”Reduce。将当前数据元与最后一个Reduce的值组合并发出新值
        eg:
      keyedStream.reduce { _ + _ }  
      
      • Fold
        具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值
        eg:
      val result: DataStream[String] =  keyedStream.fold("start")((str, i) => { str + "-" + i }) 
      
      • Aggregations
        在被Keys化数据流上滚动聚合。min和minBy之间的差异是min返回最小值,而minBy返回该字段中具有最小值的数据元(max和maxBy相同)。
        eg:
      keyedStream.sum(0);
      keyedStream.min(0);
      keyedStream.max(0);
      keyedStream.minBy(0);
      keyedStream.maxBy(0);
      
      • Window
        可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分组
        eg:
      dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); 
      
      • WindowAll
        Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分组。
        注意:在许多情况下,这是非并行转换。所有记录将收集在windowAll 算子的一个任务中。
        eg:
      dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
      
      • Window Apply
        将一般函数应用于整个窗口。
        注意:如果您正在使用windowAll转换,则需要使用AllWindowFunction。
        下面是一个手动求和窗口数据元的函数
        eg:
      windowedStream.apply { WindowFunction }
      allWindowedStream.apply { AllWindowFunction }
      
      • Window Reduce
        将函数缩减函数应用于窗口并返回缩小的值
        eg:
      windowedStream.reduce { _ + _ }
      
      • Window Fold
        将函数折叠函数应用于窗口并返回折叠值
        eg:
      val result: DataStream[String] = windowedStream.fold("start", (str, i) => { str + "-" + i }) 
      
      • Window Join
        在给定Keys和公共窗口上连接两个数据流
        eg:
      dataStream.join(otherStream)
      .where(<key selector>).equalTo(<key selector>)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply (new JoinFunction () {...})
      
      • Union
        两个或多个数据流的联合,创建包含来自所有流的所有数据元的新流。注意:如果将数据流与自身联合,则会在结果流中获取两次数据元
        eg:
      dataStream.union(otherStream1, otherStream2, ...)
      
      • Interval Join
        在给定的时间间隔内使用公共Keys关联两个被Key化的数据流的两个数据元e1和e2,以便e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
        eg:
      dataStream.intervalJoin(otherKeyedStream)
      .between(Time.milliseconds(-2), Time.milliseconds(2)) 
      .upperBoundExclusive(true) 
      .lowerBoundExclusive(true) 
      .process(new IntervalJoinFunction() {...})
      
      • Window CoGroup
        在给定Keys和公共窗口上对两个数据流进行Cogroup
        eg:
      dataStream.coGroup(otherStream)
      .where(0).equalTo(1)
      .window(TumblingEventTimeWindows.of(Time.seconds(3)))
      .apply (new CoGroupFunction () {...})
      
      • Connect
        “连接”两个保存其类型的数据流。连接允许两个流之间的共享状态
        eg:
      DataStream<Integer> someStream = ... DataStream<String> otherStream = ... ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream)
      // ... 代表省略中间操作
      
      • CoMap,CoFlatMap
        类似于连接数据流上的map和flatMap
        eg:
      connectedStreams.map(
      (_ : Int) => true,
      (_ : String) => false)connectedStreams.flatMap(
      (_ : Int) => true,
      (_ : String) => false)
      
      • Split
        根据某些标准将流拆分为两个或更多个流
        eg:
      val split = someDataStream.split(
        (num: Int) =>
          (num % 2) match {
            case 0 => List("even")
            case 1 => List("odd")
          })      
      
      • Select
        从拆分流中选择一个或多个流
        eg:
      SplitStream<Integer> split;DataStream<Integer> even = split.select("even");DataStream<Integer> odd = split.select("odd");DataStream<Integer> all = split.select("even","odd")
      
    • Sink算子
      支持将数据输出到:
      本地文件、本地集合、HDFS (参考批处理)
      除此之外,还支持:
      sink到kafka、sink到mysql、sink到redis

    参考:
    dataset api: https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/dataset/overview/
    datastream api:
    https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/overview/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/33500.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Springboot 集成Prometheus 数据采集 使用grafana 监控报告告警 邮件配置

目录 Springboot 相关 Pom 重点包 如果有需要可以增加安全包-一般内部机房没啥事-&#xff08;非必选&#xff09; Application.yml配置文件-&#xff08;非必选&#xff09; Application.properties management.endpoints.web.exposure.include介绍 启动类 查看监控信…

用于语义图像分割的弱监督和半监督学习:弱监督期望最大化方法

这时一篇2015年的论文&#xff0c;但是他却是最早提出在语义分割中使用弱监督和半监督的方法&#xff0c;SAM的火爆证明了弱监督和半监督的学习方法也可以用在分割上。 这篇论文只有图像级标签或边界框标签作为弱/半监督学习的输入。使用期望最大化(EM)方法&#xff0c;用于弱…

【Solr】中文分词配置

提示&#xff1a;在设置中文分词前需确保已经生成过core&#xff0c;未生成core的可以使用&#xff1a;solr create -c "自定义名称"进行定义。 未分词前的效果预览&#xff1a; 下载分词器&#xff1a; 下载地址: https://mvnrepository.com/artifact/com.github.m…

Spring Cloud 之注册中心 Eureka 精讲

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

nginx配置开机启动(Windows环境)

文章目录 1、下载nginx&#xff0c;并解压2、配置nginx.conf&#xff0c;并启动Nginx3、开机自启动 1、下载nginx&#xff0c;并解压 2、配置nginx.conf&#xff0c;并启动Nginx 两种方法&#xff1a; 方法一&#xff1a;直接双击nginx.exe&#xff0c;双击后一个黑色弹窗一闪…

ELK日志收集系统集群实验

ELK日志收集系统集群实验 目录 一、实验拓扑 二、环境配置 三、 安装node1与node2节点的elasticsearch 1. 安装 2.配置 3.启动elasticsearch服务 4.查看节点信息 四、在node1安装elasticsearch-head插件 1.安装node 2.拷贝命令 3.安装elasticsearch-head 4.修改el…

【机器学习】十大算法之一 “PCA”

作者主页&#xff1a;爱笑的男孩。的博客_CSDN博客-深度学习,活动,python领域博主爱笑的男孩。擅长深度学习,活动,python,等方面的知识,爱笑的男孩。关注算法,python,计算机视觉,图像处理,深度学习,pytorch,神经网络,opencv领域.https://blog.csdn.net/Code_and516?typeblog个…

【夜深人静学数据结构与算法 | 第十一篇】枚举算法

目录 前言&#xff1a; 枚举算法&#xff1a; 优点&#xff1a; 枚举算法的种类&#xff1a; 枚举算法案例&#xff1a; 343. 整数拆分 - 力扣&#xff08;LeetCode&#xff09; 12. 整数转罗马数字 - 力扣&#xff08;LeetCode&#xff09; 总结&#xff1a; 前言&…

【手撕算法|动态规划系列No.1】leetcode1137. 第 N 个泰波那契数

个人主页&#xff1a;平行线也会相交 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【手撕算法系列专栏】【LeetCode】 &#x1f354;本专栏旨在提高自己算法能力的同时&#xff0c;记录一下自己的学习过程&#xff0c;希望…

exe的python文件打包

【步骤01】 【在命令行中用pip工具安装Pyinstaller模块】 pip install Pyinstaller 步骤02】 【切换命令行的路径到你要打包的Python源文件的文件夹路径下】 【下面是我要打包的Python源文件&#xff08;散点坐标图.py&#xff09;及其文件夹路径】 【步骤03】 【执行Pyi…

使用SSH远程直连Docker容器

文章目录 1. 下载docker镜像2. 安装ssh服务3. 本地局域网测试4. 安装cpolar5. 配置公网访问地址6. SSH公网远程连接测试7.固定连接公网地址8. SSH固定地址连接测试 转载自cpolar极点云文章&#xff1a;SSH远程直连Docker容器 在某些特殊需求下,我们想ssh直接远程连接docker 容器…

SpringBoot 实现 elasticsearch 查询操作(RestHighLevelClient 的案例实战)

文章目录 1. 环境准备1. 查询全部2. 根据 name 查询 match 分词查询3. 根据 name 和 品牌查询 multiMatch 分词查询4. 根据 brand 查询 match 分词查询5. 按照价格 范围查询6. 精确查询7. boolQuery8. 分页9. 高亮查询9. 公共解析 上一节讲述了 SpringBoot 实现 elasticsearch …

【图像处理OpenCV(C++版)】——5.3 图像平滑之均值平滑(滤波)

前言&#xff1a; &#x1f60a;&#x1f60a;&#x1f60a;欢迎来到本博客&#x1f60a;&#x1f60a;&#x1f60a; &#x1f31f;&#x1f31f;&#x1f31f; 本专栏主要结合OpenCV和C来实现一些基本的图像处理算法并详细解释各参数含义&#xff0c;适用于平时学习、工作快…

Linux终端与进程的关系 ( 1 ) -【Linux通信架构系列】

系列文章目录 C技能系列 Linux通信架构系列 C高性能优化编程系列 深入理解软件架构设计系列 高级C并发线程编程 期待你的关注哦&#xff01;&#xff01;&#xff01; 现在的一切都是为将来的梦想编织翅膀&#xff0c;让梦想在现实中展翅高飞。 Now everything is for the…

C高级重点

1、请简要描述一下Linux文件系统的层级结构&#xff0c;包括不同目录的作用和功能。 Linux的文件系统结构是一个倒插树结构&#xff0c;所有的文件都从根目录出发。 2、find指令的用途 find 查找的路径 -name 文件名 ----->在指定路径下&#xff0c;以文件名为条件查找文…

总结vue3 的一些知识点:​Vue3 起步

目录 引言 Vue3 混入 实例 选项合并 实例 实例 全局混入 实例 Vue3 起步 Vue 3.0 实例 data 选项 实例 方法 总结 引言 Vue 进阶系列教程将在本号持续发布&#xff0c;一起查漏补缺学个痛快&#xff01;若您有遇到其它相关问题&#xff0c;非常欢迎在评论中留言讨…

Ubuntu 20.04.02 LTS安装virtualbox7.0

ubuntu22.04的软件仓库也有virtualbox&#xff0c;不过版本较老。 使用安装命令&#xff1a;sudo apt install virtualbox 如果想要安装最新版&#xff0c;那么需要去官网下载deb包或者使用官方的仓库。 这里采用安装Oracle官方仓库的方法。 执行如下命令&#xff1a; wge…

HTTP调用:你考虑到超时、重试、并发了吗?

今天&#xff0c;我们一起聊聊进行 HTTP 调用需要注意的超时、重试、并发等问题。 与执行本地方法不同&#xff0c;进行 HTTP 调用本质上是通过 HTTP 协议进行一次网络请求。网络请求必然有超时的可能性&#xff0c;因此我们必须考虑到这三点&#xff1a; 首先&#xff0c;框架…

抖音本地生活团购服务商

抖音本地生活团购服务商市场前景非常广阔。随着移动互联网的普及和人们对本地生活服务需求的增加&#xff0c;本地生活团购行业已成为一个快速增长的市场。而抖音平台拥有庞大的用户基础和强大的社交媒体传播力&#xff0c;为本地生活团购服务商提供了巨大的发展机遇。 抖音…

【博客674】警惕Prometheus 中的重复样本和无序时间戳错误

警惕Prometheus 中的重复样本和无序时间戳错误 1、场景 您的 Prometheus 服务器日志中是否遇到过以下错误&#xff1f; "Error on ingesting out-of-order samples" "Error on ingesting samples with different value but same timestamp" "dupli…