基于Flink SQL的实时指标多维分析模型

数据流程介绍

1.创建源表kafkaTable接入消息队列(Kafka/JDQ)的binlog数据,并通过计算列定义字段映射规则;
2.创建目标表es_sink配置Elasticsearch输出;
3.通过多级视图(tmp→tmp_dedup→tmp1/tmp2→tmp3→tmp_groupby)实现数据清洗、去重、状态计算;
4.使用ROLLUP进行多维聚合统计;
5.最终计算结果写入ES,包含成功率等衍生指标。
(原文此处为数据流程图,图片未随文本保留。)

Flink SQL 逻辑

-- State retention for the stateful operators in this job (deduplication and
-- group aggregation): 2592000 s = 30 days. Flink's default is 0 ms.
-- A 10-day retention (864000000 ms) was considered as an alternative.
SET table.exec.state.ttl=2592000s;

-- MiniBatch aggregation: buffer input for up to 1 s / 10000 rows.
SET table.exec.mini-batch.enabled = true;
SET table.exec.mini-batch.allow-latency = 1s;
SET table.exec.mini-batch.size = 10000;

-- Local-Global (two-phase) aggregation.
SET table.optimizer.agg-phase-strategy = TWO_PHASE;

-- Source table: binlog change records read from the message queue.
-- The physical columns are the binlog envelope; the business fields are read
-- from the string maps `cur` and `src` (presumably the after- and
-- before-images of the changed row -- confirm against the 'binlog' format).
-- Every computed column takes the `cur` entry and falls back to `src` when the
-- `cur` entry is NULL/missing, so a field updated *to* NULL reports its old value.
CREATE TABLE kafkaTable (
    mid           BIGINT,
    db            STRING,
    sch           STRING,
    tab           STRING,
    opt           STRING,  -- change type; downstream views compare UPPER(opt) with 'INSERT' / 'UPDATE'
    ts            BIGINT,
    ddl           STRING,
    err           STRING,
    src           MAP<STRING, STRING>,
    cur           MAP<STRING, STRING>,
    cus           MAP<STRING, STRING>,
    id            AS COALESCE(cur['id'],           src['id']),
    task_id       AS COALESCE(cur['task_id'],      src['task_id']),
    account_id    AS COALESCE(cur['account_id'],   src['account_id']),
    publish_time  AS COALESCE(cur['publish_time'], src['publish_time']),
    msg_status    AS COALESCE(cur['msg_status'],   src['msg_status']),
    send_type     AS COALESCE(cur['send_type'],    src['send_type']),
    retry_status  AS COALESCE(cur['retry_status'], src['retry_status']),
    update_time   AS COALESCE(cur['update_time'],  src['update_time']),
    -- update_time parsed as TIMESTAMP(3); TIMESTAMP_LTZ(3) is the alternative type.
    event_time    AS CAST(COALESCE(cur['update_time'], src['update_time']) AS TIMESTAMP(3)),
    proctime      AS PROCTIME()
    -- Event time is not used yet. To enable it, add a comma above and:
    --   WATERMARK FOR event_time AS event_time - INTERVAL '1' MINUTE
) WITH (
    'connector'         = 'kafka',
    'topic'             = 'xxx',
    'jdq.client.id'     = 'xxx',
    'jdq.password'      = 'xxx',
    'jdq.domain'        = 'xxx',
    -- default 'group-offsets'; alternatives: 'latest-offset', 'earliest-offset'
    'scan.startup.mode' = 'group-offsets',
    -- 'properties.enable.auto.commit' = 'true',  -- default false: offsets are then committed on checkpoint
    'format'            = 'binlog'
);


-- Sink table: Elasticsearch 6 index with one document per ROLLUP output row.
-- Dimensions:
--   send_type  send type
--   month_dim  month (first 7 chars of publish_time, presumably 'yyyy-MM')
--   day_dim    day   (first 10 chars of publish_time, presumably 'yyyy-MM-dd')
--   task_id    task id
-- grouping_id identifies which dimensions are populated for a row.
-- Declaring a primary key makes the connector write in upsert mode, so each key
-- combination maps to one document that is overwritten as its aggregates change.
-- Column order matters: the INSERT feeding this table is positional.
CREATE TABLE es_sink (
    send_type    STRING,
    task_id      STRING,
    month_dim    STRING,
    day_dim      STRING,
    grouping_id  INT,
    init         INT,
    cancel       INT,
    succ         INT,
    fail         INT,
    cancel_rate  FLOAT,   -- percentage, rounded to 2 decimals
    succ_rate    FLOAT,   -- percentage, rounded to 2 decimals
    fail_rate    FLOAT,   -- percentage, rounded to 2 decimals
    update_date  STRING,  -- LOCALTIMESTAMP at computation time, as text
    PRIMARY KEY (grouping_id, send_type, month_dim, day_dim, task_id) NOT ENFORCED
) WITH (
    'connector'                   = 'elasticsearch-6',
    'index'                       = 'index01',
    'document-type'               = 'type01',
    'hosts'                       = 'xx',
    'format'                      = 'json',
    'filter.null-value'           = 'true',
    'sink.bulk-flush.max-actions' = '1000',
    'sink.bulk-flush.max-size'    = '10mb'
);

-- tmp: filters the raw change stream and flags each event with 0/1 indicators.
-- Kept rows: retry_status '0' and publish_time on/after 2025-01-01
-- (string comparison, assumes publish_time is 'yyyy-MM-dd HH:mm:ss'), and
--   * original tasks (no '_R' in task_id): INSERT with status '0', or
--     UPDATE with status '1'..'4';
--   * retry tasks ('_R' in task_id): UPDATE with status '1' only.
-- Status '3' passes the filter but sets no indicator, so it contributes zeros.
CREATE VIEW tmp AS
SELECT
    send_type,
    task_id,
    publish_time,
    msg_status,
    CASE WHEN UPPER(opt) = 'INSERT' AND msg_status = '0' THEN 1 ELSE 0 END AS init,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '4' THEN 1 ELSE 0 END AS cancel,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '1' THEN 1 ELSE 0 END AS succ,
    CASE WHEN UPPER(opt) = 'UPDATE' AND msg_status = '2' THEN 1 ELSE 0 END AS fail,
    update_time,
    opt,
    ts,
    id,
    proctime,
    SUBSTRING(publish_time, 1, 7)  AS month_dim,
    SUBSTRING(publish_time, 1, 10) AS day_dim
FROM kafkaTable
WHERE TRIM(retry_status) = '0'
  AND publish_time >= '2025-01-01 00:00:00'
  AND (
        (    POSITION('_R' IN task_id) = 0
         AND (   (UPPER(opt) = 'INSERT' AND msg_status = '0')
              OR (UPPER(opt) = 'UPDATE' AND msg_status IN ('1', '2', '3', '4'))))
     OR (    POSITION('_R' IN task_id) > 0
         AND UPPER(opt) = 'UPDATE'
         AND msg_status = '1')
  );

-- tmp_dedup: deduplication. Of all rows sharing (id, msg_status) only the latest
-- one by processing time is kept (ROW_NUMBER ... ORDER BY proctime DESC, rn = 1).
-- Because the kept row can later be replaced, this view emits retractions; the
-- downstream SUM aggregation consumes them, so each (id, msg_status) pair is
-- counted once.
CREATE VIEW tmp_dedup AS
SELECT
    send_type, task_id, publish_time, msg_status,
    init, cancel, succ, fail,
    update_time, opt, ts, id, proctime,
    month_dim, day_dim,
    rn
FROM (
    SELECT
        send_type, task_id, publish_time, msg_status,
        init, cancel, succ, fail,
        update_time, opt, ts, id, proctime,
        month_dim, day_dim,
        ROW_NUMBER() OVER (PARTITION BY id, msg_status ORDER BY proctime DESC) AS rn
    FROM tmp
) AS latest
WHERE rn = 1;

-- tmp1: original (non-retry) tasks. A cancellation (cancel = 1) counts as a
-- cancel only if it happened at or before publish_time; a cancellation after
-- publish_time is counted as a failure instead.
-- NOTE(review): update_time and publish_time are compared as strings, and a
-- cancellation with a NULL update_time lands in neither cancel nor fail.
CREATE VIEW tmp1 AS
SELECT
    send_type,
    task_id,
    month_dim,
    day_dim,
    init,
    CASE
        WHEN cancel = 1 AND update_time <= publish_time THEN 1
        ELSE 0
    END AS cancel,
    succ,
    CASE
        WHEN cancel = 1 AND update_time > publish_time THEN 1
        ELSE fail
    END AS fail,
    update_time
FROM tmp_dedup
WHERE POSITION('_R' IN task_id) = 0;

-- tmp2: successful retries. The part of task_id before '_R' is taken as the
-- original task id; each successful retry adds one success and a fail of -1
-- to that task (presumably offsetting the original failure -- confirm).
CREATE VIEW tmp2 AS
SELECT
    send_type,
    SPLIT_INDEX(task_id, '_R', 0) AS task_id,
    month_dim,
    day_dim,
    init,
    cancel,
    succ,
    -1 AS fail,
    update_time
FROM tmp_dedup
WHERE succ = 1
  AND POSITION('_R' IN task_id) > 0;

-- tmp3: original-task rows (tmp1) plus successful-retry rows (tmp2), projected
-- onto the common dimension/measure columns. UNION ALL: the inputs are
-- disjoint by construction, so no duplicate elimination is needed.
CREATE VIEW tmp3 AS
SELECT send_type, task_id, month_dim, day_dim, init, cancel, succ, fail
FROM tmp1
UNION ALL
SELECT send_type, task_id, month_dim, day_dim, init, cancel, succ, fail
FROM tmp2;


-- tmp_groupby: multi-level totals via ROLLUP (send_type, month_dim, day_dim, task_id),
-- which is equivalent to
--   GROUPING SETS ((send_type, month_dim, day_dim, task_id),
--                  (send_type, month_dim, day_dim),
--                  (send_type, month_dim),
--                  (send_type),
--                  ())
-- grouping_id identifies the level: 1 = grand total ... 5 = per task.
-- It is derived from GROUPING() instead of IS NULL checks: a genuinely NULL
-- send_type in the data looks exactly like a rolled-up column, so IS NULL checks
-- either give that subtotal grouping_id 1 (same key as the grand total, so both
-- overwrite the same ES document) or no grouping_id at all (NULL), which Flink
-- rejects for a primary-key column of the sink.
-- Rolled-up (and genuinely NULL) dimension values are rendered as 'N'.
CREATE VIEW tmp_groupby AS
SELECT
    COALESCE(send_type, 'N') AS send_type,
    COALESCE(month_dim, 'N') AS month_dim,
    COALESCE(day_dim,   'N') AS day_dim,
    COALESCE(task_id,   'N') AS task_id,
    CASE
        WHEN GROUPING(send_type) = 1 THEN 1  -- ()
        WHEN GROUPING(month_dim) = 1 THEN 2  -- (send_type)
        WHEN GROUPING(day_dim)   = 1 THEN 3  -- (send_type, month_dim)
        WHEN GROUPING(task_id)   = 1 THEN 4  -- (send_type, month_dim, day_dim)
        ELSE 5                               -- (send_type, month_dim, day_dim, task_id)
    END AS grouping_id,
    SUM(init)   AS init,
    SUM(cancel) AS cancel,
    SUM(succ)   AS succ,
    SUM(fail)   AS fail
FROM tmp3
GROUP BY ROLLUP (send_type, month_dim, day_dim, task_id);

-- Final projection into Elasticsearch (positional: column order must match es_sink).
-- send_type codes are decoded into display labels, and percentage rates are
-- derived, rounded to 2 decimals:
--   cancel_rate = cancel / init
--   succ_rate   = succ   / (init - cancel)
--   fail_rate   = fail   / (init - cancel)
-- The WHERE clause keeps both denominators strictly positive.
INSERT INTO es_sink
SELECT
    CASE TRIM(send_type)
        WHEN '1' THEN '发送类型1'
        WHEN '2' THEN '发送类型2'
        ELSE send_type
    END AS send_type,
    task_id,
    month_dim,
    day_dim,
    grouping_id,
    init,
    cancel,
    succ,
    fail,
    ROUND(cancel * 100.0 / init, 2)            AS cancel_rate,
    ROUND(succ   * 100.0 / (init - cancel), 2) AS succ_rate,
    ROUND(fail   * 100.0 / (init - cancel), 2) AS fail_rate,
    CAST(LOCALTIMESTAMP AS STRING)             AS update_date
FROM tmp_groupby
WHERE init > 0
  AND init - cancel > 0;

es mapping(Elasticsearch 索引映射)。month_dim、day_dim 的 date 子字段设置了 ignore_malformed,用于忽略格式错误的值。

#POST index01/type01/_mapping
{
    "type01": {
        "properties": {
            "grouping_id": {
                "type": "byte"
            },
            "send_type": {
                "type": "keyword",
                "ignore_above": 256
            },
           "month_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM",
           			"ignore_malformed":"true"
           		}
           	}
           },
            "day_dim": {
           	"type": "keyword",
           	"fields": {
           		"text": {
           			"type": "keyword"
           		},
           		"date": {
           			"type": "date",
           			"format": "yyyy-MM-dd",
           			"ignore_malformed":"true"
           		}
           	}
           },
            "task_id": {
                "type": "keyword"
            },
            "init": {
                "type": "integer"
            },
            "cancel": {
                "type": "integer"
            },
            "succ": {
                "type": "integer"
            },
            "fail": {
                "type": "integer"
            },
            "cancel_rate": {
                "type": "float"
            },
            "succ_rate": {
                "type": "float"
            },
            "fail_rate": {
                "type": "float"
            },
            "update_date": {
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS||yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/984596.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

超分之DeSRA

DeSRA: Detect and delete the artifacts of GAN-based real-world super-resolution models. DeSRA：检测并消除基于GAN的真实世界超分辨率模型中的伪影。Xie L, Wang X, Chen X, et al. arXiv preprint arXiv:2307.02457, 2023. 摘要 背景： GAN-SR模型虽然…

UIToolkit(一)

1 前言 UI Toolkit 是一种基于 Web 技术的 GUI 框架，是为了解决 UGUI 效率问题而设计的新一代 UI 系统（UGUI 的介绍详见→UGUI概述）。与 UGUI 不同，UI Toolkit 没有采用 GameObject 的方式，而是参考了 Web 技术的 XML …

Unsloth - 微调 Phi-4 + 修复 Bug

文章目录 Phi-4 错误修复1、分词器错误修复2、微调错误修复3、聊天模板问题 💡 我们的问题修复有效吗？🦙 Llama-fication🦥 动态 4 位量化🛠️ Finetuning Phi-4性能基准测试 本文翻译自：Phi-4 Finetuning …

多视图几何--对极几何--从0-1理解对极几何

1 对极几何 1.1 本质矩阵 1.1.1 几何约束与推导 如图所示，物体点 $P$、图像点 $p_1, p_2$、相机中心 $o_1, o_2$ 五点共面的关系称为对极几何。$o_1, o_2$ 的连线称为基线，其与图像的交点称为…

SpringBoot3.3.0集成Knife4j4.5.0实战

原SpringBoot2.7.18升级至3.3.0之后，Knife4j进行同步升级(Spring Boot 3 只支持OpenAPI3规范)，从原3.0.3(knife4j-spring-boot-starter)版本升级至4.5.0(knife4j-openapi3-jakarta-spring-boot-starter)，以下是升级过程与注意事项等 版本信息…

一招解决Pytorch GPU版本安装慢的问题

Pytorch是一个流行的深度学习框架，广泛应用于计算机视觉、自然语言处理等领域。安装Pytorch GPU版本可以充分利用GPU的并行计算能力，加速模型的训练和推理过程。接下来，我们将详细介绍如何在Windows操作系统上安装Pytorch GPU版本。 查看是否…

Linux——system V共享内存

共享内存区是最快的IPC(进程间通信)形式，不再通过执行进入内核的系统调用来传递彼此的数据。 1.共享内存的原理 IPC通信的本质是让不同的进程先看到同一份资源，然后再进行通信，所以想要通过共享内存进行通信，那么第一步一定是让两个…

初识数组

数组的大概内容(自学)上篇 数组的创建和赋值 创建： int [] name = new int [5]; int name [] = new int [5]; int [] name = {1,2,3,4,5}; 赋值： int [] score = {1,2,3}; int [] score = new int [] {1,2,3}; int [] score;//声明 score = new int []…

OSPF-单区域的配置

一、单区域概念： 单区域OSPF中，整个网络被视为一个区域，区域ID通常为0（骨干区域）。所有的路由器都在这个区域内交换链路状态信息。 补充知识点： OSPF为何需要loopback接口： 1.Loopback接口的…

c++介绍锁二

锁主要在两个以上的线程中使用，当多个线程访问共享资源时，我们需要使用锁，来保证共享资源的唯一性。 当两个线程访问不带锁的共享资源时，如下代码 #include<array> #include<thread> #include<iostream> usin…

Ubuntu系统部署.NET 8网站项目

一、使用XShell连接 Ubuntu系统初次连接时默认的用户名为：ubuntu，使用此用户名与系统登录密码进行连接。 登录成功效果如下图： 二、root用户登录 linux下有超级用户（root）和普通用户，普通用户不能直接操…

学习资料电子版 免费下载的网盘网站(非常全!)

我分享一个私人收藏的电子书免费下载的网盘网站（学习资料为主）： link3.cc/sbook123 所有资料都保存在网盘了，直接转存即可，非常的便利！ 包括了少儿，小学，初中，中职…

图形编辑器基于Paper.js教程24:图像转gcode的重构,元素翻转,旋转

前段时间在雕刻图片时，旋转图片、翻转图片后，发现生成的gcode并不准确：虽然尺寸对，但都是以没有旋转、没有翻转的图片进行生成的。后来思考了一下，发现这真是一个大bug，无论图片如何旋转…

无公网IP也能远程控制Windows:Linux rdesktop内网穿透实战

文章目录 前言1. Windows 开启远程桌面2. Linux安装rdesktop工具3. Win安装Cpolar工具4. 配置远程桌面地址5. 远程桌面连接测试6. 设置固定远程地址7. 固定地址连接测试 前言 如今远程办公已经从一种选择变成了许多企业和个人的必修课，而如何在Linux系统上高效地访…

一文了解汽车图像传感器

2024年底,安森美做了题为"How Automotive Image Sensors Transform the Future of Autonomous Driving"的演讲,这里结合其内容对自动驾驶图像传感器做一个介绍。 当前的自动驾驶感知技术主要有两大技术路线:一种是仅使用摄像头作为传感器进行信息采集的纯…

Talking Head Review (数字人算法综述)

文章目录 引言3D Model basedGeneFace背景方案实验 GeneFace背景方案实现细节实验 Real3D-Portrait背景方案实现细节实验 MimicTalk背景方案实现细节实验 face-vid2vid背景方案实现细节实验 MegaPortraits背景方案实现细节实验 VASA-1背景方案实现细节实验 LivePortrait背景方案…

DeepSeekR1之四_在RAGFlow中配置DeepSeekR1模型

DeepSeekR1之四_在RAGFlow中配置DeepSeekR1模型 文章目录 DeepSeekR1之四_在RAGFlow中配置DeepSeekR1模型1. 通过Ollama下载模型1. 下载DeepSeekR1模型2. 下载嵌入模型 2. 查看本地的Ollama模型3. 模型提供商中添加模型1. 打开模型提供商2. 选择Ollama待添加模型3. 添加DeepSee…

【 <一> 炼丹初探:JavaWeb 的起源与基础】之 JavaWeb 项目的部署:从开发环境到生产环境

<前文回顾> 点击此处查看 合集 https://blog.csdn.net/foyodesigner/category_12907601.html?fromshare=blogcolumn&sharetype=blogcolumn&sharerId=12907601&sharerefer=PC&sharesource=FoyoDesigner&sharefrom=from_link <今日更新> 一、开发环境…

可视化图解算法:反转链表

1. 题目 描述 给定一个单链表的头结点pHead(该头节点是有值的，比如在下图，它的val是1)，长度为n，反转该链表后，返回新链表的表头。 数据范围： $0 \le n \le 1000$ 要求：空间复杂度 $O(1)$…

P8685 [蓝桥杯 2019 省 A] 外卖店优先级--优先队列“数组”!!!!!

P8685 [蓝桥杯 2019 省 A] 外卖店优先级 题目 解析优先队列如何判断是否使用优先队列？省略规则优先队列常用操作大顶堆 vs 小顶堆定义队列h队列数组 代码 题目 解析 每个外卖店会在不同的时间点收到订单，我们可以看见测试用例的时间顺序是不同的…