Flink Windows(窗口)详解

Windows(窗口)

Windows是流计算的核心。Windows将流分成有限大小的“buckets”,我们可以在其上应用聚合计算(ProcessWindowFunctionReduceFunctionAggregateFunctionFoldFunction)等。在Flink中编写一个窗口计算的基本结构如下:

Keyed Windows

stream
    .keyBy(...)                
    .window(...)               <-  必须制定: 窗口类型
    [.trigger(...)]            <-  可选: "trigger" (都有默认 触发器),决定窗口什么时候触发
    [.evictor(...)]            <-  可选: "evictor" (默认 没有剔出),剔出窗口中的元素
    [.allowedLateness(...)]    <-  可选: "lateness" (默认 0),不允许又迟到的数据
    [.sideOutputLateData(...)] <-  可选: "output tag" 将迟到的数据输出到 指定流中
    .reduce/aggregate/fold/apply()  <-  必须指定: "function",实现对窗口数据的聚合计算
    [.getSideOutput(...)]      <-  可选: "output tag" 获取Sideout的数据,一般处理迟到数据

Non-Keyed Windows

stream
    .windowAll(...)            <-  必须制定: 窗口类型
    [.trigger(...)]            <-  可选: "trigger" (都有默认 触发器),决定窗口什么时候触发
    [.evictor(...)]            <-  可选: "evictor" (默认 没有剔出),剔出窗口中的元素
    [.allowedLateness(...)]    <-  可选: "lateness" (默认 0),不允许又迟到的数据
    [.sideOutputLateData(...)] <-  可选: "output tag" 将迟到的数据输出到 指定流中
    .reduce/aggregate/fold/apply()  <-  必须指定: "function",实现对窗口数据的聚合计算
    [.getSideOutput(...)]      <-  可选: "output tag" 获取Sideout的数据,一般处理迟到数据

Window Lifecycle(生命周期)

In a nutshell, a window is created as soon as the first element that should belong to this window arrives, and the window is completely removed when the time (event or processing time) passes its end timestamp plus the user-specified allowed lateness (see Allowed Lateness). Flink guarantees removal only for time-based windows and not for other types, e.g. global windows (see Window Assigners).

in addition, each window will have a Trigger (see Triggers) and a function (ProcessWindowFunction, ReduceFunction,AggregateFunction or FoldFunction) (see Window Functions) attached to it. The function will contain the computation to be applied to the contents of the window, while the Trigger specifies the conditions under which the window is considered ready for the function to be applied.

Apart from the above, you can specify an Evictor (see Evictors) which will be able to remove elements from the window after the trigger fires and before and/or after the function is applied.

Window Assigners(窗口分配器)

The window assigner defines how elements are assigned to windows. This is done by specifying the WindowAssigner of your choice in the window(...) (for keyedstreams) or the windowAll() (for non-keyed streams) call.

A WindowAssigner is responsible for assigning each incoming element to one or more windows. Flink comes with pre-defined window assigners for the most common use cases, namely tumbling windows, sliding windows, session windows and global windows. You can also implement a custom window assigner by extending the WindowAssigner class. All built-in window assigners (except the global windows) assign elements to windows based on time, which can either be processing time or event time.

Tumbling Windows

滚动窗口长度固定,滑动间隔等于窗口长度,窗口元素之间没有交叠。

在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
.print()
env.execute("window")

Sliding Windows

滑动窗口长度固定,窗口长度大于窗口滑动间隔,元素存在交叠。

在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.process(new ProcessWindowFunction[(String,Int),String,String,TimeWindow]{
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd))
        for(e <- elements){
            print(e+"\t")
        }
        println()
    }
})
env.execute("window")

Session Windows(MergerWindow)

通过计算元素时间间隔,如果间隔小于session gap,则会合并到一个窗口中;如果大于时间间隔,当前窗口关闭,后续的元素属于新的窗口。与滚动窗口和滑动窗口不同的是会话窗口没有固定的窗口大小,底层本质上做的是窗口合并。

在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
.apply(new WindowFunction[(String,Int),String,String,TimeWindow]{
    override def apply(key: String, window: TimeWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        println(sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd))
        for(e<- input){
            print(e+"\t")
        }
        println()
    }
})
env.execute("window")

Global Windows

全局窗口会将所有key相同的元素放到一个窗口中,默认该窗口永远都不会关闭(永远都不会触发),因为该窗口没有默认的窗口触发器Trigger,因此需要用户自定义Trigger。
在这里插入图片描述

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(GlobalWindows.create())
.trigger(CountTrigger.of[GlobalWindow](3))
.apply(new WindowFunction[(String,Int),String,String,GlobalWindow]{
    override def apply(key: String, window: GlobalWindow, input: Iterable[(String, Int)], out: Collector[String]): Unit = {
        println("=======window========")
        for(e<- input){
            print(e+"\t")
        }
        println()
    }
})
env.execute("window")

Window Functions

当系统认定窗口就绪之后会调用Window Functions对窗口实现聚合计算。常见的Window Functions有以下形式: ReduceFunction, AggregateFunction, FoldFunction 或者ProcessWindowFunction|WindowFunction(古董|旧版)

ReduceFunction

class SumReduceFunction extends ReduceFunction[(String,Int)]{
  override def reduce(v1: (String, Int), v2: (String, Int)): (String, Int) = {
    (v1._1,v1._2+v2._2)
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.reduce(new SumReduceFunction)// .reduce((v1,v2)=>(v1._1,v1._2+v2._2))
.print()
env.execute("window")

AggregateFunction

class SumAggregateFunction extends AggregateFunction[(String,Int),(String,Int),(String,Int)]{
  override def createAccumulator(): (String,Int) = {
    ("",0)
  }
  override def merge(a: (String,Int), b: (String,Int)): (String,Int) = {
    (a._1,a._2+b._2)
  }
  override def add(value: (String, Int), accumulator: (String,Int)): (String,Int) = {
    (value._1,accumulator._2+value._2)
  }
  override def getResult(accumulator: (String,Int)): (String, Int) = {
    accumulator
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("CentOS",9999)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.aggregate(new SumAggregateFunction)
.print()
env.execute("window")

FoldFunction

class SumFoldFunction  extends  FoldFunction[(String,Int),(String,Long)]{
  override def fold(accumulator: (String, Long), value: (String, Int)): (String, Long) = {
    (value._1,accumulator._2+value._2)
  }
}
var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",8877)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(0)
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
//.fold(("",0L),new SumFoldFunction)
.fold(("",0L))((acc,v)=>(v._1,acc._2+v._2))
.print()
env.execute("window")

ProcessWindowFunction

var env=StreamExecutionEnvironment.getExecutionEnvironment
env.socketTextStream("centos",7788)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1)
.window(SlidingProcessingTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.process(new ProcessWindowFunction[(String,Int),(String,Int),String,TimeWindow]{
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String,Int)]): Unit = {
        val results = elements.reduce((v1,v2)=>(v1._1,v1._2+v2._2))
        out.collect(results)
    }
}).print()
env.execute("window")

globalState() | windowState()

  • globalState(), which allows access to keyed state that is not scoped to a window
  • windowState(), which allows access to keyed state that is also scoped to the window
var env=StreamExecutionEnvironment.getExecutionEnvironment

val globalTag = new OutputTag[(String,Int)]("globalTag")

val countsStream = env.socketTextStream("centos", 7788)
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
.process(new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    var wvds: ValueStateDescriptor[Int] = _
    var gvds: ValueStateDescriptor[Int] = _

    override def open(parameters: Configuration): Unit = {
        wvds = new ValueStateDescriptor[Int]("window-value", createTypeInformation[Int])
        gvds = new ValueStateDescriptor[Int]("global-value", createTypeInformation[Int])
    }

    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {
        val total = elements.map(_._2).sum
        val ws = context.windowState.getState(wvds)
        val gs=context.globalState.getState(gvds)
        val historyWindowValue = ws.value()
        val historyGlobalValue = gs.value()
        out.collect((key, historyWindowValue + total))
        context.output(globalTag, (key, historyGlobalValue + total))
        ws.update(historyWindowValue + total)
        gs.update(historyGlobalValue + total)
    }
})
countsStream.print("窗口统计")
countsStream.getSideOutput(globalTag).print("全局输出")
env.execute("window")

ReduceFunction+ProcessWindowFunction

var env=StreamExecutionEnvironment.getExecutionEnvironment

val globalTag = new OutputTag[(String,Int)]("globalTag")

val countsStream = env.socketTextStream("centos", 7788)
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
.reduce(new SumReduceFunction,new ProcessWindowFunction[(String, Int), (String, Int), String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Int)],
                         out: Collector[(String, Int)]): Unit = {
        val total = elements.map(_._2).sum
        out.collect((key, total))
    }
})
countsStream.print("窗口统计")
countsStream.getSideOutput(globalTag).print("全局输出")
env.execute("window")
var env=StreamExecutionEnvironment.getExecutionEnvironment
val countsStream = env.socketTextStream("centos", 7788)
.flatMap(_.split("\\s+"))
.map((_, 1))
.keyBy(_._1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(4), Time.seconds(2)))
.fold(("",0L),new SumFoldFunction,new ProcessWindowFunction[(String, Long), (String, Long), String, TimeWindow] {
    override def process(key: String, context: Context,
                         elements: Iterable[(String, Long)],
                         out: Collector[(String, Long)]): Unit = {
        val total = elements.map(_._2).sum
        out.collect((key, total))
    }
}).print()
env.execute("window")

WindowFunction(不常用)

遗产或古董,一般用ProcessWindowFunction替代。

In some places where a ProcessWindowFunction can be used you can also use a WindowFunction. This is an older version of ProcessWindowFunction that provides less contextual information and does not have some advances features, such as per-window keyed state. This interface will be deprecated at some point.

env.socketTextStream("centos",7788)
.flatMap(_.split("\\s+"))
.map((_,1))
.keyBy(_._1) //不能按照position进行keyBy()
.window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
.apply(new WindowFunction[(String,Int),(String,Int),String,TimeWindow] {
    override def apply(key: String,
                       window: TimeWindow,
                       input: Iterable[(String, Int)],
                       out: Collector[(String, Int)]): Unit = {
        out.collect((key,input.map(_._2).sum))
    }
}).print()
env.execute("window")

Triggers(触发器)

A Trigger determines when a window (as formed by the window assigner) is ready to be processed by the window function. Each WindowAssigner comes with a default Trigger. If the default trigger does not fit your needs, you can specify a custom trigger using trigger(...).

WindowAssigners触发器
global windowNeverTrigger
event-time windowEventTimeTrigger
processing-time windowProcessingTimeTrigger

The trigger interface has five methods that allow a Trigger to react to different events:

  • The onElement() method is called for each element that is added to a window.
  • The onEventTime() method is called when a registered event-time timer fires.
  • The onProcessingTime() method is called when a registered processing-time timer fires.
  • The onMerge() method is relevant for stateful triggers and merges the states of two triggers when their corresponding windows merge, e.g. when using session windows.
  • Finally the clear() method performs any action needed upon removal of the corresponding window.

DeltaTrigger

var env=StreamExecutionEnvironment.getExecutionEnvironment

val deltaTrigger =  DeltaTrigger.of[(String,Double),GlobalWindow](2.0,new DeltaFunction[(String,Double)] {
    override def getDelta(oldDataPoint: (String, Double), newDataPoint: (String, Double)): Double = {
        newDataPoint._2-oldDataPoint._2
    }
},createTypeInformation[(String,Double)].createSerializer(env.getConfig))

env.socketTextStream("centos",7788)
.map(_.split("\\s+"))
.map(ts=>(ts(0),ts(1).toDouble))
.keyBy(0)
.window(GlobalWindows.create())
.trigger(deltaTrigger)
.reduce((v1:(String,Double),v2:(String,Double))=>(v1._1,v1._2+v2._2))
.print()
env.execute("window")

evictor(剔出)

The evictor has the ability to remove elements from a window after the trigger fires and before and/or after the window function is applied. To do so, the Evictor interface has two methods:

public interface Evictor<T, W extends Window> extends Serializable {
	void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

}

ErrorEvitor

class ErrorEvictor(isBefore:Boolean) extends Evictor[String,TimeWindow] {
    override def evictBefore(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if(isBefore){
            evictor(elements,size,window,evictorContext)
        }
    }

    override def evictAfter(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit = {
        if(!isBefore){
            evictor(elements,size,window,evictorContext)
        }
    }
    private def evictor(elements: lang.Iterable[TimestampedValue[String]], size: Int, window: TimeWindow, evictorContext: Evictor.EvictorContext): Unit={
        val iterator = elements.iterator()
        while(iterator.hasNext){
            val it = iterator.next()
            if(it.getValue.contains("error")){//将 含有error数据剔出
                iterator.remove()
            }
        }
    }
}
var fsEnv=StreamExecutionEnvironment.getExecutionEnvironment

fsEnv.socketTextStream("CentOS",7788)
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.evictor(new ErrorEvictor(true))
.apply(new AllWindowFunction[String,String,TimeWindow] {
    override def apply(window: TimeWindow, input: Iterable[String], out: Collector[String]): Unit = {
        for(e <- input){
            out.collect(e)
        }
        print()
    }
})
.print()

fsEnv.execute("window")

Event Time

Flink在做窗口计算的时候支持以下语义的window:Processing timeEvent timeIngestion time

Processing time:使用处理节点时间,计算窗口

Event time:使用事件产生时间,计算窗口- 精确

Ingestion time:数据进入到Flink的时间,一般是通过SourceFunction指定时间

在这里插入图片描述

默认Flink使用的是ProcessingTime ,因此一般情况下如果用户需要使用 Event time/Ingestion time需要设置时间属性

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//window  操作
fsEnv.execute("event time")

一旦设置基于EventTime处理,用户必须声明水位线的计算策略,系统需要给每一个流计算出水位线时间T,只有窗口的end time T’ < = watermarker(T)的时候,窗口才会被触发。在Flink当中需要用户实现水位线计算的方式,系统并不提供实现。触发水位线的计算方式有两种:①一种是基于定时Interval(推荐)、②通过记录触发,每来一条记录系统会立即更新水位线。

定时

class AccessLogAssignerWithPeriodicWatermarks extends AssignerWithPeriodicWatermarks[AccessLog]{
  private var maxSeeTime:Long=0L
  private var maxOrderness:Long=2000L
  override def getCurrentWatermark: Watermark = {
    return  new Watermark(maxSeeTime-maxOrderness)
  }

  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = {
    maxSeeTime=Math.max(maxSeeTime,element.timestamp)
    element.timestamp
  }
}

基于记录

class AccessLogAssignerWithPunctuatedWatermarks extends AssignerWithPunctuatedWatermarks[AccessLog]{
  private var maxSeeTime:Long=0L
  private var maxOrderness:Long=2000L
  override def checkAndGetNextWatermark(lastElement: AccessLog, extractedTimestamp: Long): Watermark = {
    new Watermark(maxSeeTime-maxOrderness)
  }

  override def extractTimestamp(element: AccessLog, previousElementTimestamp: Long): Long = { 
    maxSeeTime=Math.max(maxSeeTime,element.timestamp)
    element.timestamp
  }
}

Watermarker

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//设置水位线定期计算频率 1s/每次
fsEnv.setParallelism(1)
//模块信息 时间
fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})
.print()

迟到数据处理

Flink支持对迟到数据处理,如果watermaker - window end < allow late time 记录可以参与窗口计算,否则Flink将too late数据丢弃。

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//设置水位线定期计算频率 1s/每次
fsEnv.setParallelism(1)
//模块信息 时间
fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.allowedLateness(Time.seconds(2))
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})
.print()

fsEnv.execute("event time")

Flink默认对too late数据采取的是丢弃,如果用户想拿到过期的数据,可以使用sideout方式

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)//设置水位线定期计算频率 1s/每次
fsEnv.setParallelism(1)

val lateTag = new OutputTag[AccessLog]("latetag")
//模块信息 时间
val keyedWindowStream=fsEnv.socketTextStream("CentOS",8888)
.map(line=> line.split("\\s+"))
.map(ts=>AccessLog(ts(0),ts(1).toLong))
.assignTimestampsAndWatermarks(new AccessLogAssignerWithPeriodicWatermarks)
.keyBy(accessLog=>accessLog.channel)
.window(TumblingEventTimeWindows.of(Time.seconds(4)))
.allowedLateness(Time.seconds(2))
.sideOutputLateData(lateTag)
.process(new ProcessWindowFunction[AccessLog,String,String,TimeWindow] {
    override def process(key: String, context: Context, elements: Iterable[AccessLog], out: Collector[String]): Unit = {
        val sdf = new SimpleDateFormat("HH:mm:ss")
        val window = context.window
        val currentWatermark = context.currentWatermark
        println("window:"+sdf.format(window.getStart)+"\t"+sdf.format(window.getEnd)+" \t watermarker:"+sdf.format(currentWatermark))
        for(e<-elements){
            val AccessLog(channel:String,timestamp:Long)=e
            out.collect(channel+"\t"+sdf.format(timestamp))
        }
    }
})

keyedWindowStream.print("正常:")
keyedWindowStream.getSideOutput(lateTag).print("too late:")

fsEnv.execute("event time")

当流中存在多个水位线,系统在计算的时候取最低。

Joining

Window Join

基本语法

stream.join(otherStream)
    .where(<KeySelector>)
    .equalTo(<KeySelector>)
    .window(<WindowAssigner>)
    .apply(<JoinFunction>)

Tumbling Window Join

在这里插入图片描述

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
.map(line=>line.split("\\s+"))
.map(ts=>User(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
.setParallelism(1)
//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
                        .map(line=>line.split("\\s+"))
                        .map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
                        .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
                        .setParallelism(1)

userStream.join(orderStream)
            .where(user=>user.id)
            .equalTo(orderItem=> orderItem.uid)
            .window(TumblingEventTimeWindows.of(Time.seconds(4)))
            .apply((u,o)=>{
                (u.id,u.name,o.name,o.price,o.ts)
            })
.print()

fsEnv.execute("FlinkStreamSlidingWindowJoin")

Sliding Window Join

在这里插入图片描述

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
    .map(line=>line.split("\\s+"))
    .map(ts=>User(ts(0),ts(1),ts(2).toLong))
    .assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
    .setParallelism(1)
//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
    .map(line=>line.split("\\s+"))
    .map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
    .assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
    .setParallelism(1)

userStream.join(orderStream)
.where(user=>user.id)
.equalTo(orderItem=> orderItem.uid)
.window(SlidingEventTimeWindows.of(Time.seconds(4),Time.seconds(2)))
.apply((u,o)=>{
    (u.id,u.name,o.name,o.price,o.ts)
})
.print()

fsEnv.execute("FlinkStreamTumblingWindowJoin")

Session Window Join

在这里插入图片描述

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
.map(line=>line.split("\\s+"))
.map(ts=>User(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
.setParallelism(1)
//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
.map(line=>line.split("\\s+"))
.map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
.assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
.setParallelism(1)
userStream.join(orderStream)
.where(user=>user.id)
.equalTo(orderItem=> orderItem.uid)
.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
.apply((u,o)=>{
    (u.id,u.name,o.name,o.price,o.ts)
})
.print()

fsEnv.execute("FlinkStreamSessionWindowJoin")

Interval Join

The interval join joins elements of two streams (we’ll call them A & B for now) with a common key and where elements of stream B have timestamps that lie in a relative time interval to timestamps of elements in stream A.
在这里插入图片描述

This can also be expressed more formally as b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound] ora.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment
fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
fsEnv.getConfig.setAutoWatermarkInterval(1000)
fsEnv.setParallelism(1)
//001 zhangsan 1571627570000
val userStream = fsEnv.socketTextStream("CentOS",7788)
.map(line=>line.split("\\s+"))
.map(ts=>User(ts(0),ts(1),ts(2).toLong))
.assignTimestampsAndWatermarks(new UserAssignerWithPeriodicWatermarks)
.setParallelism(1)
.keyBy(_.id)


//001 apple 4.5 1571627570000L
val orderStream = fsEnv.socketTextStream("CentOS",8899)
.map(line=>line.split("\\s+"))
.map(ts=>OrderItem(ts(0),ts(1),ts(2).toDouble,ts(3).toLong))
.assignTimestampsAndWatermarks(new OrderItemWithPeriodicWatermarks)
.setParallelism(1)
.keyBy(_.uid)


userStream.intervalJoin(orderStream)
.between(Time.seconds(-1),Time.seconds(1))
.process(new ProcessJoinFunction[User,OrderItem,String]{
    override def processElement(left: User, right: OrderItem, ctx: ProcessJoinFunction[User, OrderItem, String]#Context, out: Collector[String]): Unit = {
        println(left+" \t"+right)
        out.collect(left.id+" "+left.name+" "+right.name+" "+ right.price+" "+right.ts)
    }
})
.print()

fsEnv.execute("FlinkStreamSessionWindowJoin")

Flink HA

The JobManager coordinates every Flink deployment. It is responsible for both scheduling and resource management.

By default, there is a single JobManager instance per Flink cluster. This creates a single point of failure (SPOF): if the JobManager crashes, no new programs can be submitted and running programs fail.

With JobManager High Availability, you can recover from JobManager failures and thereby eliminate the SPOF. You can configure high availability for both standalone and YARN clusters.

Standalone Cluster High Availability

The general idea of JobManager high availability for standalone clusters is that there is a single leading JobManager at any time and multiple standby JobManagers to take over leadership in case the leader fails. This guarantees that there is no single point of failureand programs can make progress as soon as a standby JobManager has taken leadership. There is no explicit distinction between standby and master JobManager instances. Each JobManager can take the role of master or standby.

在这里插入图片描述

搭建过程

先决条件(略)

  • 安装JDK
  • 安装HADOOP HDFS-HA
  • 安装Zookeeper

Flink环境构建

  • 配置HADOOP_CLASSPATH
[root@CentOSX ~]# vi .bashrc
HADOOP_HOME=/usr/hadoop-2.9.2
JAVA_HOME=/usr/java/latest
PATH=$PATH:$/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
CLASSPATH=.
export JAVA_HOME
export PATH
export CLASSPATH
export HADOOP_HOME
HADOOP_CLASSPATH=`hadoop classpath`
export HADOOP_CLASSPATH
[root@CentOSX ~]# source .bashrc
[root@CentOSX ~]# echo $HADOOP_CLASSPATH
/usr/hadoop-2.9.2/etc/hadoop:/usr/hadoop-2.9.2/share/hadoop/common/lib/*:/usr/hadoop-2.9.2/share/hadoop/common/*:/usr/hadoop-2.9.2/share/hadoop/hdfs:/usr/hadoop-2.9.2/share/hadoop/hdfs/lib/*:/usr/hadoop-2.9.2/share/hadoop/hdfs/*:/usr/hadoop-2.9.2/share/hadoop/yarn/lib/*:/usr/hadoop-2.9.2/share/hadoop/yarn/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/lib/*:/usr/hadoop-2.9.2/share/hadoop/mapreduce/*:/usr/hadoop-2.9.2/contrib/capacity-scheduler/*.jar

  • 上传Flink,配置Flink
[root@CentOSX ~]# tar -zxf flink-1.8.1-bin-scala_2.11.tgz -C /usr/
[root@CentOSA ~]# cd /usr/flink-1.8.1
[root@CentOSA flink-1.8.1]# vi conf/flink-conf.yaml

#==============================================================================
# Common
#==============================================================================
taskmanager.numberOfTaskSlots: 4
parallelism.default: 3
#==============================================================================
# High Availability
#==============================================================================
 high-availability: zookeeper
 high-availability.storageDir: hdfs:///flink/ha/
 high-availability.zookeeper.quorum: CentOSA:2181,CentOSB:2181,CentOSC:2181
 high-availability.zookeeper.path.root: /flink
 high-availability.cluster-id: /default_ns
#==============================================================================
# Fault tolerance and checkpointing
#==============================================================================
 state.backend: rocksdb
 state.checkpoints.dir: hdfs:///flink-checkpoints
 state.savepoints.dir: hdfs:///flink-savepoints
 state.backend.incremental: true

[root@CentOSX flink-1.8.1]# vi conf/masters
CentOSA:8081
CentOSB:8081
CentOSC:8081
[root@CentOSA flink-1.8.1]# vi conf/slaves
CentOSA
CentOSB
CentOSC

启动Flink集群

[root@CentOSA flink-1.8.1]# ./bin/start-cluster.sh
Starting HA cluster with 3 masters.
Starting standalonesession daemon on host CentOSA.
Starting standalonesession daemon on host CentOSB.
Starting standalonesession daemon on host CentOSC.
Starting taskexecutor daemon on host CentOSA.
Starting taskexecutor daemon on host CentOSB.
Starting taskexecutor daemon on host CentOSC.

等集群启动完成后,查看JobManager任务的日志,在lead主机中可以看到:

 http://xxx:8081 was granted leadership with leaderSessionID=f5338c3f-c3e5-4600-a07c-566e38bc0ff4

测试HA

登陆获取leadership的节点,然后执行以下指令

[root@CentOSB flink-1.8.1]# ./bin/jobmanager.sh stop

查看其它节点,按照上诉的测试方式,可以查找leadership日志输出的节点,该节点就是master节点。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/61193.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

一、ADAS技术概述

根据《国家车联网产业标准体系建设指南》对智能网联汽车定义&#xff1a;智能网联汽车是指搭载先进的车载传感器、控制器、执行器等装置&#xff0c;并融合现代通信与网络技术&#xff0c;实现车与X&#xff08;人、车、路、云端等&#xff09;智能信息交换、共享&#xff0c;具…

阶段总结(linux基础)

目录 一、初始linux系统 二、基本操作命令 三、目录结构 四、文件及目录管理命令 查看文件内容 创建文件 五、用户与组管理 六、文件权限与压缩管理 七、磁盘管理 八、系统程序与进程管理 管理机制 文件系统损坏 grub引导故障 磁盘资源耗尽 程序与进程的区别 查…

【云原生】Serverless 技术架构分析

一、什么是Serverless? 1、Serverless技术简介 ​ Serverless&#xff08;无服务器架构&#xff09;指的是由开发者实现的服务端逻辑运行在无状态的计算容器中&#xff0c;它由事件触发&#xff0c; 完全被第三方管理&#xff0c;其业务层面的状态则被开发者使用的数据库和存…

交换机Vlan实验

介绍 Vlan表示虚拟局域网。 常见的网络安全技术 VlanACL Vlan的作用 Vlan隔离了广播域&#xff0c;增加了网络的安全性。 知识点 默认vlan vlan1 是默认vlan&#xff0c;主要机器开机了&#xff0c;默认所有的接口都属于Vlan1 交换机的接口模式 Access : 这个模式用来…

【设计模式】-建造者模式

Java建造者模式&#xff1a;创建复杂对象的灵活构建者 在软件开发中&#xff0c;我们经常遇到需要创建一个复杂对象的情况。如果使用传统的构造函数进行对象创建&#xff0c;可能会导致构造函数参数过多&#xff0c;难以管理和维护。建造者模式&#xff08;Builder Pattern&am…

MongoDB 使用总结

&#x1f353; 简介&#xff1a;java系列技术分享(&#x1f449;持续更新中…&#x1f525;) &#x1f353; 初衷:一起学习、一起进步、坚持不懈 &#x1f353; 如果文章内容有误与您的想法不一致,欢迎大家在评论区指正&#x1f64f; &#x1f353; 希望这篇文章对你有所帮助,欢…

无人驾驶实战-第五课(动态环境感知与3D检测算法)

激光雷达的分类&#xff1a; 机械式Lidar&#xff1a;TOF、N个独立激光单元、旋转产生360度视场 MEMS式Lidar&#xff1a;不旋转 激光雷达的输出是点云&#xff0c;点云数据特点&#xff1a; 简单&#xff1a;x y z i &#xff08;i为信号强度&#xff09; 稀疏&#xff1a;7%&…

【肺炎分类数据集】数据量非常充足的新冠肺炎分类数据共享

一、肺炎数据集介绍&#x1f349;&#xff1a; 1.1 格式&#x1f388; 按照标准的格式分为了①训练集train&#xff08;134138575198张&#xff09;&#xff0c;②验证集val&#xff08;8816张&#xff09;&#xff0c;③测试集test&#xff08;234390624张&#xff09;&#…

Windows server上用nginx部署vue3项目

Windows server上用nginx部署vue3项目 一、Node中node_modules文件夹及package.json文件的作用说明二、VUE3项目打包三、Windows Server上的Nginx部署 一、Node中node_modules文件夹及package.json文件的作用说明 node_modules是安装node后用来存放用包管理工具下载安装的包的…

【项目 计网3】Socket介绍 4.9字节序 4.10字节序转换函数

文章目录 4.8 Socket介绍4.9字节序简介字节序举例 4.10字节序转换函数 4.8 Socket介绍 所谓 socket&#xff08;套接字&#xff09;&#xff0c;就是对网络中不同主机上的应用进程之间进行双向通信的端点的抽象。一个套接字就是网络上进程通信的一端&#xff0c;提供了应用层进…

VUE之JWT前后端分离认证,学生管理系统

参考资料: SpringBoot搭建教程 SpringCloud搭建教程 JWT视频教程 JWT官网 Vue视频教程 JWT视频参考资料、VUE视频资料,及前后端demo 特别有参考价值的JWT博客1 特别有参考价值的JWT博客2 cookie、localstorage和sessionStorage的区别1 cookie、localstorage和sessi…

总结七大排序!

排序总览 外部排序&#xff1a;依赖硬盘&#xff08;外部存储器&#xff09;进行的排序。对于数据集合的要求特别高&#xff0c;只能在特定场合下使用&#xff08;比如一个省的高考成绩排序&#xff09;。包括桶排序&#xff0c;基数排序&#xff0c;计数排序&#xff0c;都是o…

【数据库】将excel数据导入mysql数据库

环境&#xff1a;Windows10 mysql8以上 将你要导入的excel表另存为txt格式 打开txt格式文件&#xff0c;删除表头行并另存为并更改编码方式&#xff08;由于与数据库的编码不同&#xff0c;会导致导入报错&#xff09; 通过命令行登录数据库 winr cmd进入 进入装mysql的目录位…

软件设计原则

文章目录 一、软件设计原则1. 开闭原则2. 里氏代换原则3. 依赖倒转原则4. 接口隔离原则5. 迪米特法则6. 合成复用原则 一、软件设计原则 在软件开发中&#xff0c;为了提高软件系统的可维护性和可复用性&#xff0c;增加软件的可扩展性和灵活性&#xff0c;程序员要尽量根据软件…

【ASP.NET MVC】使用动软(四)(12)

一、筛选器类和Cookie实现路由 需解决的问题&#xff1a; 网站登录往往需要用户名密码验证&#xff0c;为避免重复验证&#xff0c;一般采用Cookie 、Session等技术来保持用户的登录状态&#xff1a; Session是在服务端保存的一个数据结构&#xff0c;用来跟踪用户的状态&…

Last-Mile Embodied Visual Navigation 论文阅读

论文阅读 题目&#xff1a;Last-Mile Embodied Visual Navigation 作者&#xff1a;JustinWasserman, Karmesh Yadav 来源&#xff1a;CoRL 时间&#xff1a;2023 代码地址&#xff1a;https://jbwasse2.github.io/portfolio/SLING Abstract 现实的长期任务&#xff08;例如…

antv/l7地图,鼠标滚动,页面正常滑动-- 我们忽略的deltaY

背景 在官网项目中&#xff0c;需要使用一个地图&#xff0c;展示产品的分布区域及数量。希望的交互是&#xff0c;鼠标放上标点&#xff0c;tooltip展示地点和数量等信息。鼠标滚动&#xff0c;则页面随着滚动。但是鼠标事件是被地图代理了的&#xff0c;鼠标滚动意味着地图的…

4,链表【p5】

链表 4.1哈希表简介4.2有序表简介4.3链表4.3.1例1-反转单向和双向链表4.3.2例2-打印两个有序链表的公共部分4.3.3面试时链表解题的方法论4.3.4例3-判断一个链表是否为回文结构4.3.4.1快慢指针 4.3.5例4-将单向链表按某值划分成左边小、中间相等、右边大的形式4.3.6例5-复制好友…

RabbitMQ - 简单案例

目录 0.引用 1.Hello world 2.轮训分发消息 2.1 抽取工具类 2.2 启动两个工作线程接受消息 2.4 结果展示 3.消息应答 3.1 自动应答 3.2 手动消息应答的方法 3.3 消息自动重新入队 3.4 消息手动应答代码 4.RabbitMQ 持久化 4.1 队列如何实现持久化 4.2 消息实现持久化 5.不…

DBSCAN聚类

一、概述 DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法&#xff0c;簇集的划定完全由样本的聚集程度决定。聚集程度不足以构成簇落的那些样本视为噪声点&#xff0c;因此DBSCAN聚类的方式也可以用于异常点的检测。 二、算法…