0基础学习PyFlink——水位线(watermark)触发计算

在《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》和《0基础学习PyFlink——个数滑动窗口(Sliding Count Windows)》中,我们发现如果窗口中元素个数没有把窗口填满,则不会触发计算。
在这里插入图片描述

为了解决长期不计算的问题,我们引入了在《0基础学习PyFlink——时间滚动窗口(Tumbling Time Windows)》和《0基础学习PyFlink——时间滑动窗口(Sliding Time Windows)》的方案。但是这个方案引入另外一个问题,就是每次处理数据可能不尽相同。这是因为它们使用了“处理时间”(Processing Time)来作为窗口划分的参考系,而每次程序处理时间会根据当前负载情况有很大的不同。这样我们对同一批数据做处理时,可能会得出不同的Window切分方案。
在这里插入图片描述
于是我们引入《0基础学习PyFlink——事件时间和运行时间的窗口》方案。它可以使用源自数据本身的“事件时间”(Event Time)作为Time Window的参考系,这样在不同负载、不同时间,相同数据的时间参考系是一样的,进而可以得出一致的结果。
在这里插入图片描述
但是现实中,我们没法保证上述数据是按照上面的顺序到达Flink的。
比如下面这个例子,红色部分都是乱序的,那么Flink如何处理这些数据呢?
在这里插入图片描述
只有两种可能性:

  1. 直接抛弃;
  2. 等待一段时间统一处理,超过等待的时间直接抛弃。因为不可能一直等下去,否则什么时候处理呢?

这些即有别于Count Window,也有别于Time Window。这个时候就要引入水位线(watermark)技术来解决这个问题。
在详细讲解之前,我们需要明确一些基本知识:

  1. EventTime就是Timestamp,即我们可以通过制定Timestamp函数设定元素的EventTime。
  2. EventTime从属于元素。
  3. Watermark源于EventTime和max_out_of_orderness(等待无序数据的时间),即Watermark=EventTime-max_out_of_orderness。
  4. Watermark从属于流。
  5. Window的Start源于EventTime;End源于Start和窗口时间,即End=Start+WindowTme;这是一个左闭右开的区间,即[Start, End)。
  6. Window从属于流,只有Watermark>=Window End时才会触发计算(且窗口中要有元素)。
  7. Window在单向递增前进,比如从[0,10)变成[10,20)、[20,30)……[90,100)。
  8. Wartermark单向递增前进,它不会因为新进入的元素EventTime较小,而导致Wartermark向变小的趋势发展。
    在这里插入图片描述
    上图中,第一个元素(A,1)的EventTime通过自定义公式可以得到101,于是初始的Window的Start值是该值向下取可以被Window Size整除的最大值,即100;这个进一步确认了第一个窗口是[100,105)。
    watermark是通过eventtime计算出来的,上例中我们希望如果事件在窗口时间之外到来则抛弃,即不等待任何时间,即Window End+0,即Eventtime-0。
    (A,0)数据来到的时候,watermark不会因为其Eventtime为100,比流中的watermark值(101)小而改变,依然维持watermark单调递增。这个在(A,2)和(A,5)到来时也是如此。
    (A,8)元素的到来,会让流的watermark变成108。这个值会越过当前窗口[100,105),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,1)、(A,0)、(A,3)和(A,4);
    (A,10)元素的到来,会让流的watermark变成110。这个值会越过当前窗口[100,110),于是会触发计算。计算的元素要求eventtime在上述区间内,即(A,8)、(A,6)、(A,7)和(A,9);而(A,2)因为不在这个区间内,就被抛弃了。我们也可以认为(A,2)迟到而被抛弃。
    为了更好讲述原理,上述例子存在一个假设:watertime更新是随着元素一个个进入而改变的。而实际元素进入个数不太确定,比如可能会两个两个进入,那么就会变成如下。主要区别就是(A,5)参与了第二次窗口计算,虽然它迟到了,而且watermark计算方法也不打算等待任何一个迟到的数据,但是它和(A,10)一起进入时间戳计算逻辑,导致触发的时机被滞后,从而“幸运”的赶上了最后一轮窗口计算。如果它稍微再晚一点到来,它也会被抛弃。
    在这里插入图片描述

测试代码

import time
from pyflink.common import Duration, WatermarkStrategy, Time, Types
from pyflink.datastream.window import TumblingEventTimeWindows, TimeWindow, TumblingProcessingTimeWindows
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment,RuntimeExecutionMode, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes
from pyflink.datastream.functions import AllWindowFunction, ProcessFunction, ProcessAllWindowFunction, KeyedProcessFunction
from pyflink.table.expressions import lit, col
from pyflink.table.window import Tumble
from pyflink.common.time import Instant
from pyflink.table.udf import udf
from pyflink.common import Row

            
class WindowFunc(AllWindowFunction[tuple, tuple, TimeWindow]):
    def apply(self, window, inputs):
        out = "**************************WindowFunc**************************" \
                "\nwindow: start:{} end:{} \ninputs: {}" \
                "\n**************************WindowFunc**************************" \
                    .format(Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end), inputs)
        print(out)
      
        for value in inputs:
            yield (value, Instant.of_epoch_milli(window.start), Instant.of_epoch_milli(window.end))

class TimestampAssignerAdapter(TimestampAssigner):
    def extract_timestamp(self, value, record_timestamp: int):
        return value[1] * 1000
    
class TimestampAssignerProcessFunctionAdapter(ProcessFunction):
    def process_element(self, value, ctx: 'ProcessFunction.Context'):
        out_put = "-----------------------TimestampAssignerProcessFunctionAdapter {}-----------------------" \
                    "\nvalue: {} \ttimestamp: {} \tcurrent_processing_time: {} \tcurrent_watermark: {}" \
                    "\n-----------------------TimestampAssignerProcessFunctionAdapter-----------------------" \
                        .format(int(time.time()), value, Instant.of_epoch_milli(ctx.timestamp()),
                                Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),
                                Instant.of_epoch_milli(ctx.timer_service().current_watermark()))
                        
        print(out_put)
        
        yield (value, Instant.of_epoch_milli(ctx.timestamp()), 
               Instant.of_epoch_milli(ctx.timer_service().current_processing_time()),
               Instant.of_epoch_milli(ctx.timer_service().current_watermark()))

def gen_random_int_and_timestamp():
    stream_execute_env = StreamExecutionEnvironment.get_execution_environment()
    # stream_execute_env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
    stream_execute_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    stream_execute_env.set_parallelism(1)
    stream_execute_env.get_config().set_auto_watermark_interval(2)
    
    stream_table_env = StreamTableEnvironment.create(stream_execution_environment=stream_execute_env)
    ordinal_num_start = 0
    ordinal_num_end = 10
    rows_per_second = 1
    
    schame = Schema.new_builder().column('in_ord', DataTypes.INT()) \
                                .build()
                                
    table_descriptor = TableDescriptor.for_connector('datagen') \
                        .schema(schame) \
                        .option('fields.in_ord.kind', 'sequence') \
                        .option('fields.in_ord.start', str(ordinal_num_start)) \
                        .option('fields.in_ord.end', str(ordinal_num_end)) \
                        .option('rows-per-second', str(rows_per_second)) \
                        .build()
          
    stream_table_env.create_temporary_table('source', table_descriptor)
    
    table = stream_table_env.from_path('source')
    
    @udf(result_type=DataTypes.ROW([DataTypes.FIELD("in_ord", DataTypes.INT()), DataTypes.FIELD("calc_order", DataTypes.INT())]), input_types=[DataTypes.INT()])
    def colFunc(oneCol):
        ordinal_num_data_map = {0: 1, 1: 0, 2: 3, 3: 4, 4: 8, 5: 6, 6: 7, 7: 2, 8: 9, 9: 10, 10: 5}
        # ordinal_num_data_map = {0: 16, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8, 9: 9,
        #                       10: 10, 11: 11, 12: 12, 13: 13, 14: 14, 15: 15, 16: 0, 17: 17, 18: 18, 19: 19,
        #                       20: 20, 21: 121, 22: 122, 23: 123, 24: 124, 25: 125, 26: 126, 27: 127, 28: 128, 29: 129,}
        data = ordinal_num_data_map[oneCol] + 100
        return Row(oneCol, data)
    
    input_table=table.map(colFunc(col('in_ord')))
    
    datastream = stream_table_env.to_data_stream(input_table)
    
    ###############################################################################################    
    # datastream.window_all(TumblingProcessingTimeWindows.of(Time.milliseconds(10))) \
    #                     .apply(WindowFunc())
    ###############################################################################################
    # watermark_strategy = WatermarkStrategy.no_watermarks().with_timestamp_assigner(TimestampAssignerAdapter())
    # datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)
    # datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())
    
    # datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.milliseconds(10))) \
    #                     .apply(WindowFunc())        
    ###############################################################################################
    # watermark_strategy = WatermarkStrategy.for_monotonous_timestamps().with_timestamp_assigner(TimestampAssignerAdapter())
    watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(0)).with_timestamp_assigner(TimestampAssignerAdapter())
    datastream_with_watermark=datastream.assign_timestamps_and_watermarks(watermark_strategy)
    
    datastream_with_watermark.process(TimestampAssignerProcessFunctionAdapter())
    
    
    datastream_with_watermark.window_all(TumblingEventTimeWindows.of(Time.seconds(5))) \
                        .apply(WindowFunc())
    ###############################################################################################

    stream_execute_env.execute()
    
if __name__ == '__main__':
    gen_random_int_and_timestamp()

-----------------------TimestampAssignerProcessFunctionAdapter 1699856800-----------------------
value: Row(in_ord=0, calc_order=101) timestamp: Instant<101, 0> current_processing_time: Instant<1699856800, 705000000> current_watermark: Instant<-9223372036854776, 192000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=1, calc_order=100) timestamp: Instant<100, 0> current_processing_time: Instant<1699856802, 700000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856802-----------------------
value: Row(in_ord=2, calc_order=103) timestamp: Instant<103, 0> current_processing_time: Instant<1699856802, 702000000> current_watermark: Instant<100, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=3, calc_order=104) timestamp: Instant<104, 0> current_processing_time: Instant<1699856804, 700000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856804-----------------------
value: Row(in_ord=4, calc_order=108) timestamp: Instant<108, 0> current_processing_time: Instant<1699856804, 709000000> current_watermark: Instant<102, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<100, 0> end:Instant<105, 0>
inputs: [Row(in_ord=0, calc_order=101), Row(in_ord=1, calc_order=100), Row(in_ord=2, calc_order=103), Row(in_ord=3, calc_order=104)]
WindowFunc
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=5, calc_order=106) timestamp: Instant<106, 0> current_processing_time: Instant<1699856806, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856806-----------------------
value: Row(in_ord=6, calc_order=107) timestamp: Instant<107, 0> current_processing_time: Instant<1699856806, 705000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=7, calc_order=102) timestamp: Instant<102, 0> current_processing_time: Instant<1699856808, 700000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856808-----------------------
value: Row(in_ord=8, calc_order=109) timestamp: Instant<109, 0> current_processing_time: Instant<1699856808, 701000000> current_watermark: Instant<107, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=9, calc_order=110) timestamp: Instant<110, 0> current_processing_time: Instant<1699856809, 440000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
-----------------------TimestampAssignerProcessFunctionAdapter 1699856809-----------------------
value: Row(in_ord=10, calc_order=105) timestamp: Instant<105, 0> current_processing_time: Instant<1699856809, 441000000> current_watermark: Instant<108, 999000000>
-----------------------TimestampAssignerProcessFunctionAdapter-----------------------
WindowFunc
window: start:Instant<105, 0> end:Instant<110, 0>
inputs: [Row(in_ord=4, calc_order=108), Row(in_ord=5, calc_order=106), Row(in_ord=6, calc_order=107), Row(in_ord=8, calc_order=109), Row(in_ord=10, calc_order=105)]
WindowFunc
WindowFunc
window: start:Instant<110, 0> end:Instant<115, 0>
inputs: [Row(in_ord=9, calc_order=110)]

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.3/dev/event_timestamps_watermarks.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/141987.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

Docker安装详细步骤及相关环境安装配置(mysql、jdk、redis、自己的私有仓库Gitlab 、C和C++环境以及Nginx服务代理)

目录 一、从空白系统中克隆Centos7系统 二、使用xshell连接docker_tigerhhzz虚拟机​编辑 三、在CentOS7基础上安装Docker容器 四、在Docker中进行安装Portainer 4.1、在Docker中安装MySQL 4.2、在Docker中安装JDK8&#xff0c;安装Java环境 4.3、Docker安装redis&#…

聚观早报 |京东11.11公布成绩单;2023数字科技生态大会

【聚观365】11月13日消息 京东11.11公布成绩单 2023数字科技生态大会 TikTok深受英国中小企业青睐 周鸿祎称大模型2年内可“进”智能汽车 双11全国快递业务量达 6.39 亿件 京东11.11公布成绩单 京东11.11公布成绩单&#xff1a;截至11月11日晚23:59&#xff0c;2023年京东…

桌面云架构讲解(VDI、IDV、VOI/TCI、RDS)

目录 云桌面架构 VDI 虚拟桌面基础架构 IDV 智能桌面虚拟化 VOI/TCI VOI 虚拟系统架构 TCI 透明计算机架构 RDS 远程桌面服务 不同厂商云桌面架构 桌面传输协议 什么是云桌面 桌面云是虚拟化技术成熟后发展起来的一种应用&#xff0c;桌面云通常也称为云桌面、VDI等 …

OpenCV 笔记(6):像素间的基本关系——邻域、邻接、通路、连通、距离

像素是图像的基本元素&#xff0c;像素与像素之间存在着某些联系&#xff0c;理解像素间的基本关系是数字图像处理的基础。常见的像素间的基本关系包括&#xff1a;邻域、邻接、通路、连通、距离。 Part11. 邻域 邻域表示了像素之间的连接关系。 像素(x,y)的邻域&#xff0c;是…

计算机二级Office真题解析 excel减免税,订单,成绩

第一题 1.将“Excel 减免税.xlsx”文件另存为 excel.xlsx&#xff0c;最后提交该文件&#xff08;1 分&#xff09;。 2.将“对应代码.xlsx”文件中的 sheet1 工作表插入到 excel.xlsx 中&#xff0c;工作 表名重命名为“代码”&#xff08;3 分&#xff09;。 3.在"序号&…

从关键新闻和最新技术看AI行业发展(2023.10.23-11.5第九期) |【WeThinkIn老实人报】

Rocky Ding 公众号&#xff1a;WeThinkIn 写在前面 【WeThinkIn老实人报】旨在整理&挖掘AI行业的关键新闻和最新技术&#xff0c;同时Rocky会对这些关键信息进行解读&#xff0c;力求让读者们能从容跟随AI科技潮流。也欢迎大家提出宝贵的优化建议&#xff0c;一起交流学习&…

将随机数设成3407,让你的深度学习模型再涨一个点!文再附3种随机数设定方法

随机数重要性 深度学习已经在计算机视觉领域取得了巨大的成功&#xff0c;但我们是否曾想过为什么同样的模型在不同的训练过程中会有不同的表现&#xff1f;为什么使用同样的代码&#xff0c;就是和别人得到的结果不一样&#xff1f;怎么样才能保证自己每次跑同一个实验得到的…

Django中Cookie和Session的使用

目录 一、Cookie的使用 1、什么是Cookie&#xff1f; 2、Cookie的优点 3、Cookie的缺点 4、Django中Cookie的使用 二、Session的使用 1、什么是Session&#xff1f; 2、Session的优点 3、Session的缺点 4、Django中Session的使用 三、Cookie和Session的对比 总结 D…

Vue 小黑记事本组件板

渲染功能&#xff1a; 1.提供数据&#xff1a; 提供在公共的父组件 App.vue 2.通过父传子&#xff0c;将数据传递给TodoMain 3.利用 v-for渲染 添加功能&#xff1a; 1.收集表单数据 v-model 2.监听事件&#xff08;回车点击都要添加&#xff09; 3.子传父&#xff0c;讲…

【嵌入式设计】Main Memory:SPM 便签存储器 | 缓存锁定 | 读取 DRAM 内存 | DREM 猝发(Brust)

目录 0x00 便签存储器&#xff08;Scratchpad memory&#xff09; 0x01 缓存锁定&#xff08;Cache lockdown&#xff09; 0x02 读取 DRAM 内存 0x03 DREM Banking 0x04 DRAM 猝发&#xff08;DRAM Burst&#xff09; 0x00 便签存储器&#xff08;Scratchpad memory&#…

Flutter有状态组件StatefulWidget生命周期

StatefulWidget是Flutter中的一个有状态的组件&#xff0c;它的生命周期相对复杂一些。下面是StatefulWidget的生命周期方法及其调用顺序&#xff1a; 1. createState(): 当StatefulWidget被插入到Widget树中时&#xff0c;会调用createState()方法来创建与之关联的State对象。…

软路由R4S+iStoreOS实现公网远程桌面局域网内电脑

软路由R4SiStoreOS实现公网远程桌面局域网内电脑 文章目录 软路由R4SiStoreOS实现公网远程桌面局域网内电脑简介 一、配置远程桌面公网地址配置隧道 二、家中使用永久固定地址 访问公司电脑具体操作方法是&#xff1a;2.1 登录页面2.2 再次配置隧道2.3 查看访问效果 简介 上篇…

力扣511. 游戏玩法分析 I

答案&#xff1a; select player_id,min(event_date) as first_login from Activity a group by player_id我最开始写的错误答案是这样的&#xff1a; select player_id,event_date as first_login from Activity a group by player_id having event_date min(event_date…

Docker - DockerFile

Docker - DockerFile DockerFile 描述 dockerfile 是用来构建docker镜像的文件&#xff01;命令参数脚本&#xff01; 构建步骤&#xff1a; 编写一个dockerfile 文件docker build 构建成为一个镜像docker run 运行脚本docker push 发布镜像&#xff08;dockerhub&#xff0…

无监督学习的集成方法:相似性矩阵的聚类

在机器学习中&#xff0c;术语Ensemble指的是并行组合多个模型&#xff0c;这个想法是利用群体的智慧&#xff0c;在给出的最终答案上形成更好的共识。 这种类型的方法已经在监督学习领域得到了广泛的研究和应用&#xff0c;特别是在分类问题上&#xff0c;像RandomForest这样…

【KVM-5】KVM架构

前言 大家好&#xff0c;我是秋意零。今天分析的内容是KVM架构。 &#x1f47f; 简介 &#x1f3e0; 个人主页&#xff1a; 秋意零&#x1f525; 账号&#xff1a;全平台同名&#xff0c; 秋意零 账号创作者、 云社区 创建者&#x1f9d1; 个人介绍&#xff1a;在校期间参与…

正点原子嵌入式linux驱动开发——Linux IIO驱动

工业场合里面也有大量的模拟量和数字量之间的转换&#xff0c;也就是常说的ADC和DAC。而且随着手机、物联网、工业物联网和可穿戴设备的爆发&#xff0c;传感器的需求只持续增强。比如手机或者手环里面的加速度计、光传感器、陀螺仪、气压计、磁力计等&#xff0c;这些传感器本…

计算机视觉(CV)技术的优势和挑战

计算机视觉技术在很多领域具有很大的优势,例如: 自动化:计算机视觉技术可以帮助实现自动化生产和检测,省去了人力成本和时间成本。 准确性:计算机视觉技术可以提高生产和检测的准确性,降低了人工操作产生的误差。 速度:计算机视觉技术可以实现高速速度的生产和检测,提高…

flv.js在vue中的使用

Flv.js 是 HTML5 Flash 视频&#xff08;FLV&#xff09;播放器&#xff0c;纯原生 JavaScript 开发&#xff0c;没有用到 Flash。由 bilibili 网站开源。它的工作原理是将 FLV 文件流转码复用成 ISO BMFF&#xff08;MP4 碎片&#xff09;片段&#xff0c;然后通过 Media Sour…

【vue实战项目】通用管理系统:封装token操作和网络请求

目录 1.概述 2.封装对token的操作 3.封装axios 1.概述 前文我们已经完成了登录页&#xff1a; 【vue实战项目】通用管理系统&#xff1a;登录页-CSDN博客 接下来我们要封装一下对token的操作和网络请求操作。之所以要封装这部分内容是因为token我们登陆后的所有请求都要携…