flink watermark 实例分析

WATERMARK 定义了表的事件时间属性,其形式为:

 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 

rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3)/TIMESTAMP_LTZ(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark是触发计算的机制,只要事件时间<= watermark,就会触发当前行数据的计算,watermark的形象描述如下:
在这里插入图片描述

watermark的窗口触发机制

watermark会根据数据流中event的时间戳发生变化。通常情况下,event都是乱序的,不按时间排序的。watermark的计算逻辑为:当前最大的 event time - 最大允许延迟时间(MaxOutOfOrderness)。在同一个分区内部,当watermark大于或者等于窗口的结束时间时,才能触发该窗口的计算,即watermark>=windows endtime。如下图所示:
在这里插入图片描述
根据上图分析:
MaxOutOfOrderness = 5s,窗口的大小为:10s。
watermark分别为:12:08、12:15、12:30
计算逻辑为:WM(12:08)=12:13 - 5s;WM(12:15)=12:20 - 5s;WM(12:30)=12:35 - 5s

  • 对于 [12:00,12:10) 窗口,需要在WM=12:15时,才能被触发计算,参与计算的event为:event(12:07)/event(12:01)/event(12:07)/event(12:09),event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:20)/event(12:14)/event(12:15)不参与计算,因为还未到窗口时间,也就是event time 为 [12:00,12:10] 窗口内的event才能参与计算。
    注意,如果过了这个窗口期,再收到 [12:00,12:10] 窗口内的event,就算超过了最大允许延迟时间(MaxOutOfOrderness),不会再参与计算,也就是数据被强制丢掉了。
  • 对于 [12:10,12:20][12:20,12:30] 窗口,会在WM=12:30时,被同时触发计算,参与**[12:10,12:20]** 窗口计算的event为:event(12:10)/event(12:12)/event(12:12)/event(12:13)/event(12:14)/event(12:15)/event(12:15)/event(12:18);参与 [12:20,12:30] 窗口计算的event为:event(12:20)/event(12:20);在这个过程中event(12:05)会被丢弃,不会参与计算,因为已经超了最大允许延迟时间(MaxOutOfOrderness)

迟到的事件的处理,在介绍watermark时,提到了现实中往往处理的是乱序event,即当event处于某些原因而延后到达时,往往会发生该event time < watermark的情况,所以flink对处理乱序event的watermark有一个允许延迟的机制,这个机制就是最大允许延迟时间(MaxOutOfOrderness),允许在一定时间内迟到的event仍然视为有效event。

WATERMARK rowtime_column_name 取值两种方式

rowtime_column_name为计算列

CREATE TABLE pageviews (
		mid bigint,
		db string,
		sch string,
		tab string,
		opt string,
		ts bigint,
		ddl string,
		err string,
		src map < string, string >,
		cur map < string, string >,
		cus map < string, string >,
     event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), --计算列,必须为TIMESTAMP(3)/TIMESTAMP_LTZ(3)类型
     WATERMARK FOR event_time AS event_time - INTERVAL '60' SECOND
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '***',
  'topic' = 'topic1',
  'format' = 'json',
  'properties.group.id' = '*****',
	'scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

rowtime_column_name为事件时间属性

CREATE TABLE dataGen(
  uuid VARCHAR(20),
  name INT,
  age INT,
  ts TIMESTAMP(3), --事件时间属性,字段类型为TIMESTAMP(3)
 WATERMARK FOR ts AS ts
)with(
 'connector' = 'datagen',
  'rows-per-second' = '10',
  'number-of-rows' = '100',
  'fields.age.kind' = 'random',
  'fields.age.min' = '1',
  'fields.age.max' = '10',
  'fields.name.kind' = 'random',
  'fields.name.min' = '1',
  'fields.name.max' = '10'
  );

watermark使用demo

CREATE TABLE kafka_table(
		mid bigint,
		db string,
		sch string,
		tab string,
		opt string,
		ts bigint,
		ddl string,
		err string,
		src map < string, string >,
		cur map < string, string >,
		cus map < string, string >,
     group_name as COALESCE(cur['group_name'], src['group_name']),
     batch_number as COALESCE(cur['batch_number'], src['batch_number']),
     event_time as cast(TO_TIMESTAMP_LTZ(ts,3) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     WATERMARK FOR event_time AS event_time - INTERVAL '2' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'properties.bootstrap.servers' = '***',
  'topic' = 'topic1',
  'format' = 'json',
  'properties.group.id' = '*****',
	'scan.startup.mode' = 'earliest-offset'-- 取值 : group-offsets    latest-offset     earliest-offset
);

watermark在over聚合中的使用

--RANGE:每个group_name计算当前group_name前10分钟内收到的同一group_name的所有总数
select
 group_name
,event_time
,COUNT(group_name) OVER w1 as cnt
from kafka_table
where UPPER(opt) <> 'DELETE'
WINDOW w1 AS (
  PARTITION BY group_name
  ORDER BY event_time
  RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW)

watermark在windows聚合中的使用

--求每10分钟的滚动窗口内同一group_name的所有总数
create view tmp as
SELECT group_name,event_time FROM kafka_table where UPPER(opt) <> 'DELETE';

select window_start,window_end,window_time,group_name,count(*) as cnt from
TABLE(TUMBLE(TABLE tmp, DESCRIPTOR(event_time), INTERVAL '10' MINUTES))
group by window_start,window_end,window_time,group_name

参考:
Window Aggregation
Over Aggregation

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/262241.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【让云服务器更灵活】iptables转发tcp/udp端口请求

iptables转发tcp/udp端口请求 文章目录 前言一、路由转发涉及点二、转发如何配置本机端口转发到本机其它端口本机端口转发到其它机器 三、固化iptables总结 前言 路由转发是计算机网络中的一种重要概念&#xff0c;特别是在网络设备和系统之间。它涉及到如何处理和传递数据包&…

【湖仓一体尝试】MYSQL和HIVE数据联合查询

爬了两天大大小小的一堆坑&#xff0c;今天把一个简单的单机环境的流程走通了&#xff0c;记录一笔。 先来个完工环境照&#xff1a; mysqlhadoophiveflinkicebergtrino 得益于IBM OPENJ9的优化&#xff0c;完全启动后的内存占用&#xff1a; 1&#xff09;执行联合查询后的…

《A++ 敏捷开发》-1 如何改善

1 如何改善 敏捷开发过程改进案例 5月 A公司一直专门为某电信公司提供针对客服、线上播放等服务。 张工是公司的中层管理者&#xff0c;管理好几个开发团队&#xff0c;有5位项目经理向他汇报。 他听说老同学的团队都开始用敏捷开发&#xff0c;很感兴趣&#xff0c;便参加了…

YACS(上海计算机学会竞赛平台)三星级挑战——两数之和

题目描述 给定 n 个整数 a[1]​,a[2]​,⋯,a[n]​&#xff0c;并且保证 a[1​]≤a[2​]≤⋯≤a[n]​ 再给定一个目标值 t&#xff0c;请判断能否找到 a[i]​ 与 a[j]​&#xff0c;ai​aj​t 且 i≠j。 输入格式 第一行&#xff1a;单个整数n&#xff1b; 第二行&#xf…

油猴脚本教程案例【键盘监听】-编写 ChatGPT 快捷键优化

文章目录 1. 元数据1. name2. namespace3. version4. description5. author6. match7. grant8. icon 2. 编写函数.1 函数功能2.1.1. input - 聚焦发言框2.1.2. stop - 取消回答2.1.3. newFunction - 开启新窗口2.1.4. scroll - 回到底部 3. 监听键盘事件3.1 监听X - 开启新对话…

3D模型人物换装系统(二 优化材质球合批降低DrawCall)

3D模型人物换装系统 介绍原理合批材质对比没有合批材质核心代码完整代码修改总结 介绍 本文使用2018.4.4和2020.3.26进行的测试 本文没有考虑法线贴图合并的问题&#xff0c;因为生成法线贴图有点问题&#xff0c;放在下一篇文章解决在进行优化 如果这里不太明白换装的流程可以…

Python---socket之send和recv原理剖析

1. 认识TCP socket的发送和接收缓冲区 当创建一个TCP socket对象的时候会有一个发送缓冲区和一个接收缓冲区&#xff0c;这个发送和接收缓冲区指的就是内存中的一片空间。 2. send原理剖析 send是不是直接把数据发给服务端? 不是&#xff0c;要想发数据&#xff0c;必须得…

STL--stack、queue实现

STL中&#xff0c;vector、list 是容器&#xff0c;自己存储一系列的数据进行增删查改&#xff0c;而 stack、queue 是一种特殊的容器&#xff0c;叫容器适配器&#xff0c;提供一种特定的接口来访问底层容器。 STL--stack实现 template<class T, class Container deque&…

Springboot实现定时任务

一、定时任务是什么&#xff1f; 定时执行任务&#xff0c;只有电脑不关机就可以在特定的时间去执行相应的代码&#xff0c;例如抢购脚本等 二、使用步骤 1.无需引入springboot自带 package com.ltx.blog_ltx;import org.springframework.boot.SpringApplication; import o…

2023 年人工智能研究与技术排名前 10 的国家

人工智能研究是一项全球性的工作。虽然美国和中国因其对人工智能的贡献而备受关注&#xff0c;但事实是&#xff0c;世界各国都在涉足这项技术&#xff0c;尝试新的突破&#xff0c;并吸引投资者的关注。 斯坦福大学的《2023年人工智能报告》估计&#xff0c;到 2022 年&#…

hbase用shell命令新建表报错ERROR: KeeperErrorCode = NoNode for /hbase/master

或者HMster开启后几秒消失问题解决 报错如图&#xff1a; 首先jps命令查看当前运行的内容有没有HMaster,如果没有&#xff0c;开启一下hbase,稍微等一会儿&#xff0c;再看一下HMaster,如果仍和下图一样没有&#xff0c;就基本找到问题了 本人问题原因&#xff1a;hbase-site…

嵌入式中断理解

一、概念 中断&#xff1a; 在主程序运行过程中&#xff0c;出现了特定的中断触发条件&#xff08;中断源&#xff09;&#xff0c;使得CPU暂停当前正在运行的程序&#xff0c;转而去处理中断程序&#xff0c;处理完成后又返回原来被暂停的位置继续运行。 中断优先级&#x…

AIGC绘画Midjourney光线关键词、构图关键词、色调关键词

Unity3D特效百例案例项目实战源码Android-Unity实战问题汇总游戏脚本-辅助自动化Android控件全解手册再战Android系列Scratch编程案例软考全系列Unity3D学习专栏蓝桥系列ChatGPT和AIGC &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分…

树莓派,opencv,Picamera2利用舵机云台追踪特定颜色对象(PID控制)

一、需要准备的硬件 Raspiberry 4b两个SG90 180度舵机&#xff08;注意舵机的角度&#xff0c;最好是180度且带限位的&#xff0c;切勿选360度舵机&#xff09;二自由度舵机云台&#xff08;如下图&#xff09;Raspiberry CSI 摄像头 组装后的效果&#xff1a; 二、项目目标…

STM32 使用ARM仿真器设置

STM32单片机程序下载到单片机芯片中有两种方式&#xff0c;①编译生成HEX&#xff0c;使用程序烧录软件刷到单片机芯片里。②使用ARM仿真器下载程序。使用ARM仿真器的优势是&#xff0c;在工程编译没问题直接在Keil软件里就可以将程序下载到单片机里&#xff0c;并且程序可以在…

6. 行为模式 - 观察者模式

亦称&#xff1a; 事件订阅者、监听者、Event-Subscriber、Listener、Observer 意图 观察者模式是一种行为设计模式&#xff0c; 允许你定义一种订阅机制&#xff0c; 可在对象事件发生时通知多个 “观察” 该对象的其他对象。 问题 假如你有两种类型的对象&#xff1a; ​ 顾…

如何在 openKylin 上使用 ONLYOFFICE 桌面编辑器

文章作者&#xff1a;ajun ONLYOFFICE 桌面编辑器是一款基于依据 AGPL v.3 许可进行分发的开源办公套件。使用这款应用&#xff0c;您无需保持网络连接状态即可处理存储在计算机上的文档。 本文章基于中国根操作系统 openKylin 操作系统&#xff0c;使用软件商店快速安装与手…

IPC之九:使用UNIX Domain Socket进行进程间通信的实例

socket 编程是一种用于网络通信的编程方式&#xff0c;在 socket 的协议族中除了常用的 AF_INET、AF_RAW、AF_NETLINK等以外&#xff0c;还有一个专门用于 IPC 的协议族 AF_UNIX&#xff0c;IPC 是 Linux 编程中一个重要的概念&#xff0c;常用的 IPC 方式有管道、消息队列、共…

深入了解Spring MVC工作流程

目录 1. MVC架构简介 2. Spring MVC的工作流程 2.1. 客户端请求的处理 2.2. 视图解析和渲染 2.3. 响应生成与返回 3. Spring MVC的关键组件 3.1. DispatcherServlet 3.2. HandlerMapping 3.3. Controller 3.4. ViewResolver 4. 结论 Spring MVC&#xff08;Model-Vi…

Node.js-模块化(二)

1. 模块化的基本概念 1.1 什么是模块化 模块化是指解决一个复杂问题时&#xff0c;自顶向下逐层将系统拆分成若干模块的过程。对于整个系统来说&#xff0c;模块是可组合、分解和更换的单元。 1.2 编程领域中的模块化 编程领域中的模块化&#xff0c;就是遵守固定的规则&…