FlinkSQL【分组聚合-多维分析-性能调优】应用实例分析

FlinkSQL处理如下实时数据需求:
实时聚合不同 类型/账号/发布时间 的各个指标数据,比如:初始化/初始化后删除/初始化后取消/推送/成功/失败 的指标数据。要求实时产出指标数据,数据源是mysql cdc binlog数据。

代码实例

--SET table.exec.state.ttl=86400s; --24 hour,默认: 0 ms
SET table.exec.state.ttl=2592000s; --30 days,默认: 0 ms
--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

CREATE TABLE kafka_table (
     mid bigint,
     db string,
     sch string,
     tab string,
     opt string,
     ts bigint,
     ddl string,
     err string,
     src map<string,string>,
     cur map<string,string>,
     cus map<string,string>,
     account_id AS IF(cur['account_id'] IS NOT NULL , cur['account_id'], src ['account_id']),
     publish_time AS IF(cur['publish_time'] IS NOT NULL , cur['publish_time'], src ['publish_time']),
     msg_status AS IF(cur['msg_status'] IS NOT NULL , cur['msg_status'], src ['msg_status']),
     send_type AS IF(cur['send_type'] IS NOT NULL , cur['send_type'], src ['send_type'])
     --event_time as cast(IF(cur['update_time'] IS NOT NULL , cur['update_time'], src ['update_time']) AS TIMESTAMP(3)), -- TIMESTAMP(3)/TIMESTAMP_LTZ(3)
     --WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE     --SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 't1',
  'properties.bootstrap.servers' = 'xx.xx.xx.xx:9092',
  'properties.group.id' = 'g1',
  'scan.startup.mode' = 'earliest-offset',  --group-offsets/earliest-offset/latest-offset
   --  'properties.enable.auto.commit',= 'true' -- default:false, 如果为false,则在发生checkpoint时触发offset提交
  'format' = 'json'
);



CREATE TABLE es_sink(
     send_type      STRING
    ,account_id     STRING
    ,publish_time   STRING
    ,grouping_id       INTEGER
    ,init           INTEGER
    ,init_cancel    INTEGER
    ,push          INTEGER
    ,succ           INTEGER
    ,fail           INTEGER
    ,init_delete    INTEGER
    ,update_time    STRING
    ,PRIMARY KEY (group_id,send_type,account_id,publish_time) NOT ENFORCED
)
with (
    'connector' = 'elasticsearch-6',
    'index' = 'es_sink',
    'document-type' = 'es_sink',
    'hosts' = 'http://xxx:9200',
    'format' = 'json',
    'filter.null-value'='true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size' = '10mb'
);

CREATE view  tmp as
select
    send_type,
    account_id,
    publish_time,
    msg_status,
    case when UPPER(opt) = 'INSERT' and msg_status='0'  then 1 else 0 end AS init,
    case when UPPER(opt) = 'UPDATE' and send_type='1' and msg_status='4' then 1 else 0 end AS init_cancel,
    case when UPPER(opt) = 'UPDATE' and msg_status='3' then 1 else 0 end AS push,
    case when UPPER(opt) = 'UPDATE' and (msg_status='1' or msg_status='5') then 1 else 0 end AS succ,
    case when UPPER(opt) = 'UPDATE' and (msg_status='2' or msg_status='6') then 1 else 0 end AS fail,
    case when UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0' then  1 else 0 end AS init_delete,
    event_time,
    opt,
    ts
FROM kafka_table
where (UPPER(opt) = 'INSERT' and msg_status='0' )
or        (UPPER(opt) = 'UPDATE' and msg_status in ('1','2','3','4','5','6'))
or        (UPPER(opt) = 'DELETE' and send_type='1' and msg_status='0');


--send_type=1          send_type=0
--初始化->0             初始化->0
--取消->4
--推送->3               推送->3
--成功->1               成功->5
--失败->2               失败->6

CREATE view  tmp_groupby as
select
 COALESCE(send_type,'N') AS send_type
,COALESCE(account_id,'N') AS account_id
,COALESCE(publish_time,'N') AS publish_time
,case when send_type is null and account_id is null and publish_time is null then 1
         when send_type is not null and account_id is null and publish_time is null then 2
         when send_type is not null and account_id is not null and publish_time is null then 3
         when send_type is not null and account_id is not null and publish_time is not null then 4
         end grouping_id
,sum(init) as init
,sum(init_cancel) as init_cancel
,sum(push) as push
,sum(succ) as succ
,sum(fail) as fail
,sum(init_delete) as init_delete
from tmp
--GROUP BY GROUPING SETS ((send_type,account_id,publish_time), (send_type,account_id),(send_type), ())
GROUP BY ROLLUP (send_type,account_id,publish_time); --等同于以上

INSERT INTO es_sink
select
     send_type
    ,account_id
    ,publish_time
    ,grouping_id
    ,init
    ,init_cancel
    ,push
    ,succ
    ,fail
    ,init_delete
    ,CAST(LOCALTIMESTAMP AS STRING) as update_time
from tmp_groupby

其他配置

  • flink集群参数
state.backend: rocksdb
state.backend.incremental: true
state.backend.rocksdb.ttl.compaction.filter.enabled: true
state.backend.rocksdb.localdir: /export/io_tmp_dirs/rocksdb
state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpoints
state.savepoints.dir: hdfs://namenode-host:port/flink-savepoints
rest.flamegraph.enabled: true
pipeline.operator-chaining: false
taskmanager.memory.managed.fraction: 0.7
taskmanager.memory.network.min: 128 mb
taskmanager.memory.network.max: 128 mb
taskmanager.memory.framework.off-heap.size: 32mb
taskmanager.memory.task.off-heap.size: 32mb
taskmanager.memory.jvm-metaspace.size: 256mb
taskmanager.memory.jvm-overhead.fraction: 0.03
  • 检查点配置
    在这里插入图片描述

  • job运行资源
    管理节点(JM) 1 个, 节点规格 1 核 4 GB内存, 磁盘 10Gi
    运行节点(TM)10 个, 节点规格 1 核 4 GB内存, 磁盘 80Gi
    单TM槽位数(Slot): 1
    默认并行度:8

  • es mapping

#POST app_cust_syyy_private_domain_syyy_group_msg/app_cust_syyy_private_domain_syyy_group_msg/_mapping
{
    "app_cust_syyy_private_domain_syyy_group_msg": {
        "properties": {
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
            "account_id": {
                "type": "keyword"
            },
           "publish_time": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis",
           			"ignore_malformed":"true" # 忽略错误的各式
           		}
           	}
           },
            "grouping_id": {
                "type": "integer"
            },
            "init": {
                "type": "integer"
            },
            "init_cancel": {
                "type": "integer"
            },
            "query": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "init_delete": {
                "type": "integer"
            },
            "update_time": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

性能调优

是否开启【MiniBatch 聚合】和【Local-Global 聚合】对分组聚合场景影响巨大,尤其是在数据量大的场景下。

  • 如果未开启,在分组聚合,数据更新状态时,每条数据都会触发聚合运算,进而更新StateBackend (尤其是对于 RocksDB StateBackend,火焰图上反映就是一直在update rocksdb),造成上游算子背压特别大。此外,生产中非常常见的数据倾斜会使这个问题恶化,并且容易导致 job 发生反压。
    在这里插入图片描述

  • 在开启【MiniBatch 聚合】和【Local-Global 聚合】后,配置如下:

--MiniBatch 聚合
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;
--Local-Global 聚合
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

开启配置好会在DAG上添加两个环节MiniBatchAssignerLocalGroupAggregate
在这里插入图片描述

对结果的影响

开启了【MiniBatch 聚合】和【Local-Global 聚合】后,一天处理不完的数据,在10分钟内处理完毕

输出结果

在这里插入图片描述在这里插入图片描述

参考:
Group Aggregation
Streaming Aggregation Performance Tuning

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/326253.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【.net core】yisha框架,bootstrap-table组件增加固定列功能

需要引入 bootstrap-table-fixed-columns.css和bootstrap-table-fixed-columns.js文件 文件代码&#xff1a; bootstrap-table-fixed-columns.css样式文件代码 .fixed-table-header-columns, .fixed-table-body-columns {position: absolute;background-color: #fff;displa…

【Vue3】3-2 : 组件的概念及组件的基本使用方式

本书目录&#xff1a;点击进入 一、组件的概念 1.1、【案例】评分组件与按钮组件的抽离过程 二、组件的使用 - 抽离结构 2.1、【案例】简易首页 &#xff1e; 效果 &#xff1e; 代码 - 原始 &#xff1e; ​​​​​​​代码 - 组件抽离结构 &#xff1e; ​​​​…

【汇编】实验11 编写子程序

综合一下学过的指令就行了&#xff0c;比较简单。 assume cs:code data segmentdb "Beginners All-purpose Symbolic Instruction Code.",0 data ends code segment begin:mov ax,datamov ds,axmov si,0call lettercmov ax,4c00hint 21h letterc:mov cl,[si]mov ch,…

【线路图】世微AP5160宽电压降压型恒流芯片 LED电源 带调光SOT23-6

这是一款14-18V 3A 电流的PCB设计方案. 运用的是世微AP5160 电源驱动IC,这是一款效率高&#xff0c;稳定可靠的 LED 灯恒流驱动控制芯片&#xff0c;内置高精度比较器&#xff0c;固定 关断时间控制电路&#xff0c;恒流驱动电路等&#xff0c;特别适合大功率 LED 恒流驱动。 …

Wpf 使用 Prism 实战开发Day12

待办事项接口增删&#xff08;CURD&#xff09;改查实现 一.添加待办事项控制器&#xff08;ToDoController&#xff09; 控制器类需要继承 ControllerBase 基类需要添加 [ApiController] 特性以及 [Route] 特性Route&#xff08;路由&#xff09; 特性参数规则&#xff0c;一般…

pycharm debug显示的变量过多

问题&#xff1a; https://blog.csdn.net/Hodors/article/details/117535731 解决方法&#xff1a; 把"Show console variables by default"前面的勾取消掉就行 参考&#xff1a; https://stackoverflow.com/questions/48969556/hide-console-variables-in-pychar…

【Leetcode 程序员面试金典 02.08】 —— 环路检测 |双指针

面试题02.08. 环路检测 给定一个链表&#xff0c;如果它是有环链表&#xff0c;实现一个算法返回环路的开头节点。若环不存在&#xff0c;请返回null。 如果链表中有某个节点&#xff0c;可以通过连续跟踪next指针再次到达&#xff0c;则链表中存在环。 为了表示给定链表中的…

abap 将xstring转换成PDF展示

收到外围系统的xstring之后&#xff0c;如何在sap中将其打开呢 1.创建一个屏幕 2.绘制一个customer control 3.创建流逻辑 4.流逻辑如下&#xff1a; DATA: go_html_container TYPE REF TO cl_gui_custom_container, go_html_control TYPE REF TO cl_gui_html_viewer, lv_u…

基于SSM的社区老年人关怀服务系统

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

OpenCV-Python(42):摄像机标定

目标 学习摄像机畸变以及摄像机的内部参数和外部参数根据摄像机相关参数对畸变图像进行修复 基础说明 今天的低价单孔摄像机(照相机)会给图像带来很多畸变。畸变主要有两种:径向畸变和切向畸变。如下图所示用红色直线将棋盘的两个边标注出来&#xff0c;但是你会发现棋盘的边…

C++力扣题目40--组合总和II

力扣题目链接(opens new window) 给定一个数组 candidates 和一个目标数 target &#xff0c;找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的每个数字在每个组合中只能使用一次。 说明&#xff1a; 所有数字&#xff08;包括目标数&#xff09;都是…

Linux Mii management/mdio子系统分析之五 PHY状态机分析及其与net_device的关联

&#xff08;转载&#xff09;原文链接&#xff1a;https://blog.csdn.net/u014044624/article/details/123303714 前面几章基本上完成了mdio模块驱动模型的分析&#xff0c;本篇文章主要讲述phy device的状态机以及phy device与net_device的关联。Phy device主要是对phy的抽象…

[LitCTF 2023]easy_shark

解压缩&#xff0c;发现需要输入密码&#xff0c;使用010打开&#xff0c;发现frflags和deflags都被修改了&#xff0c;这就会造成压缩包伪加密 把他们都改为0&#xff0c;再打开 将流量包使用wirshark打开 过滤http&#xff0c;并追踪 得到以下信息 看到了一个类似于flag格…

Graham扫描凸包算法

凸包&#xff08;Convex Hull&#xff09;是包含给定点集合的最小凸多边形。凸包算法有多种实现方法&#xff0c;其中包括基于递增极角排序、Graham扫描、Jarvis步进法等。下面&#xff0c;我将提供一个简单的凸包算法实现&#xff0c;基于Graham扫描算法。 Graham扫描算法是一…

hf-mirror 使用

文章目录 命令下载搜索下载gated model 根据这篇文章&#xff1a; 大模型下载使我痛苦 得知 Huggingface 镜像站 https://hf-mirror.com 命令下载 网站首页会介绍下载方法 更多用法&#xff08;多线程加速等&#xff09;详见这篇文章。简介&#xff1a; 方法一&#xff1a;…

DBeaver安装步骤

DBeaver 是一个基于 Java 开发&#xff0c;免费开源的通用数据库管理和开发工具&#xff0c;使用非常友好的 ASL 协议。可以通过官方网站或者 Github 进行下载。 由于 DBeaver 基于 Java 开发&#xff0c;可以运行在各种操作系统上&#xff0c;包括&#xff1a;Windows、Linux…

用Photoshop来制作GIF动画

录了个GIF格式的录屏文件&#xff0c;领导让再剪辑下&#xff0c;于是用Photoshop2023进行剪辑&#xff0c;录屏文件有约1400帧&#xff0c;PS保存为GIF格式时&#xff0c;还是挺耗时的&#xff0c;平时少用PS来进行GIF剪辑&#xff0c;编辑后的GIF不能动&#xff0c;网上搜索的…

Servlet- Response

一、预览 介绍完Servlet-Resquest的相关内容后&#xff0c;接下来就是Servlet- Response的内容。读者阅读完本篇文章后将可以自如地解析请求、设置响应&#xff0c;完成对客户端的响应。 二、Response体系结构 Response的体系结构与Request完全一样&#xff0c;其中ServletRe…

【LeetCode】203. 移除链表元素(简单)——代码随想录算法训练营Day03

题目链接&#xff1a;203. 移除链表元素 题目描述 给你一个链表的头节点 head 和一个整数 val &#xff0c;请你删除链表中所有满足 Node.val val 的节点&#xff0c;并返回 新的头节点 。 示例 1&#xff1a; 输入&#xff1a;head [1,2,6,3,4,5,6], val 6 输出&#xff…

低代码高逻辑谱写IT组织和个人的第二成长曲线 | 专访西门子Mendix中国区总经理王炯

在今天快速演进的数字化转型浪潮中&#xff0c;低代码平台已经成为推动企业敏捷适应市场变化的关键引擎。在此背景下&#xff0c;西门子Mendix作为市场上的领导者&#xff0c;以其创新的低代码解决方案不断地刷新着行业标准。 近日&#xff0c;LowCode低码时代访谈了西门子Men…