涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(上)

涤生大数据实战:基于Flink+ODPS历史累计计算项目分析与优化(一)

1.前置知识

ODPS(Open Data Platform and Service)是阿里云自研的一体化大数据计算平台和数据仓库产品,在集团内部离线作为离线数据处理和存储的产品。离线计算任务节点叫做Odps节点,存储的离线表叫做Odps表;

Flink: 实时计算引擎,本文代码开发和测试均基于集团内部实时计算平台,代码细节可能会和Flink 官方社区文档有些许不同,假如用于生产环境测试,参考Apache Flink 官方文档为准,但是技术方案是通用的哈;

https://flink.apache.org/posts/

2.项目背景

现有业务需求是 “根据用户注册以来的累计跑步里程,给用户发放勋章,需要实时的计算出用户【历史~此时刻】的累计跑步数据

比如说,某个用户20210101首次上传跑步记录,之后又多次上传跑步记录,我们需要实时的计算出,在20210101~当前时刻 期间,该用户累计跑了多少公里,累计跑了多少次等指标。上述指标的计算涉及用户历史至今的所有数据(2018~至今该用户所有数据),考虑使用批流结合的方式进行统计。参考批流结合的常用 lambda 方案:

我们将其拆分到“实时+离线”两条链路分别计算,离线链路计算用户历史至昨日的累计数据data1实时链路计算当日实时累计数据data2。然后在对两条链路的数据进行汇总,data1+data2即为用户历史至今日此时刻的累计数据。

这里,离线链路使用odps来做,实时计算使用Flink来做,数据存储涉及 hbase、odps,所用消息中间件是MQ。

3.解决方案

3.1 方案描述 

离线链路设计

离线链路计算目的:为了计算出全量用户【历史至昨日】的累计数据。

任务初始化时,先将历史的存量数据全量计算一次,得到存量累计值;以后每日计算用户昨日的新增数据,即新增累计值 ;两者相加即为用户历史至昨日的累计数据;循环往复,即可每日更新历史累计数据。

对应的数据链路应该长这样:

图片

离线链路计算流程如下:

step1:用户历史数据初始化。假设该计算任务发布的时间为20231010,首先要对用户 历史~20231009 期间的历史数据进行汇总,得到一个 历史存量累计数据 history_data;

step2:从20231010起,对用户每日的增量跑步数据进行汇总,得到该日的增量累计数据 day_data;

step3:将每日的增量累计数据day_data 与 历史存量累计数据history_data 进行求和,作为新的历史存量累计数据 history_data(T-1) = day_data(T-1) + history_data(T-2) ;

step4:重复 step2 和step3 ,每日更新历史存量累计数据 history_data 。

该方案的优点是,历史全量数据只用计算一次,每日只需计算增量部分后再与存量合并即可,节省计算资源。

实时链路设计

实时链路计算目的:实时计算出用户【当日零点至此刻】的累计数据

实时链路的计算逻辑比较简单,对应的计算链路示意图如下:

实时链路计算流程如下:

step1:用户新增的跑步记录通过MQ发送给Flink任务;

step2:Flink节点1对数据去重;

step3:Flink节点2对实时汇总统计 当日零点至此刻 用户的跑步累计数据;step4:将计算结果输出给下游。

实时离线链路融合

实时离线链路融合目的实时得到用户历史至此时刻的汇总数据

从上述的离线、实时链路中,我们分别得到了用户【历史~昨日】累计数据,和【当日凌晨~此刻】累计数据,只需将两者相加即可实时得到用户【历史~此刻】的累计数据:

  1. ODPS 计算出用户 [非当日的历史累计数据],为使用方便,会每天更新全量用户历史累计数据;

  2. 使用Flink节点1 实时计算用户当日上传的跑步累计数据;

  3. 使用 Flink节点2 实时的将离线数据和实时数据汇总起来;

  4. 将汇总结果写入Hbase结果表,同时发送个MQ消息给下游业务方。

这里需要有两点需要注意:

1、根据业务特点,这里将离线计算结果作为维表使用:

Flink任务的下游业务方更关注当日上传过跑步记录的用户的数据更新情况,ODPS结果表作为维表用,Flink任务只对当日上传跑步记录的用户进行查询,得到“非当日历史统计数据”,在与“当日新增跑步数据”相加,即可得到该历史至今的最终的统计数据(更新hbase结果表),符合需求;

我们的跑步用户中大部分的用户不会每天都上传跑步记录,这些人的结果数据不会发生改变。若将ODPS表作为源表,则依旧会为这些用户更新数据,浪费计算资源。

【优化】odps表作为维表,不适合大数据量的情况,大数据量使用hbase表作为维表比较合适。这里将odps表数据同步到hbase表中,再拿该hbase表作为维表。

2、初始化下游结果表:在整个任务跑起来前,需要先使用ODPS表的bizdate分区数据初始化hbase结果表,然后再由实时任务对结果表进行更新;

最终的方案示意图如下:

3.2 存在的问题

上面的lambda方案有个问题,每日凌晨零点过后,实时任务已开始计算新的一天数据,而离线任务计算尚未结束,这时会出现一个离线数据缺失的窗口期。重点分析一下框图中“实时数据+离线数据”的部分:

正常情况

当一个用户在T日实时上传了自己的跑步记录,Flink节点1会计算出其 [当日0点起至此刻] 的跑步累计数据data1,Flink节点2会根据该用户id取hbase维表里查询其 [历史~T-1日] 的累计数据 data2 (hbase表里数据由odps每日更新,即T-1日的存量累计汇总数据),将data1和data2二者汇总,就可得到 用户历史至此时刻的汇总数据;

异常情况 

在凌晨(比如说,在00:00~00:30),ODPS正在计算最新分区数据(T-1日的数据)的期间,新的分区还没生成完,或者ODPS计算已经完成,但odps表同步base表同步任务还未完成,此时若发生了查询,会发生什么?

会使用老分区的数据(T-2日的数据,而不是期望的T-1日数据),导致数据不准。

【问题描述】

在凌晨时分,ODPS计算T-1日数据期间,如果发生了对T-1日的数据查询,则无法获取到期望的T-1日数据,会继续使用T-2日的数据

这里“无法获取正确数据”的时间长度 = ODPS计算时间 + ODPS同步数据到Hbase的时间

【原因】

Flink查询维表时 使用维表当前的数据快照,本次查询完成后再发生的维表更新不会对已有查询造成影响。

【举例】

case1(ODPS计算未完成):

27号,Flink任务计算27号当天的用户累计数据,同时查询odps维表的 26号分区 中该用户的历史累计数据,两者相加,得到27号的实时累计结果;

28号凌晨,ODPS正在计算27号分区的数据,任务还未结束,27号分区数据尚不可用;而Flink任务已经开始计算28号当天的用户累计数据,此刻发生了一次维表查询,期望从维表中查到该用户27号统计的历史累计数据,然而由于27号数据未准备好,则维表会返回26号的历史累计数据,这会导致数据计算错误,相当于丢失了该用户27号的数据。

case2(ODPS计算完成,但odps表同步habse表任务未完成):

28号凌晨,ODPS的计算已完成,odps表正在同步数据到hbase表期间,如果Flink发生了查询,期望获取用户27号的最新数据,但由于还没有更新完成,还是会用26号的数据,会造成类似的错误结果。

上面所述问题是批流融合的 lambda 框架常会遇到的问题,因此必须思考优化方案来解决上述问题。优化方案将在下一篇文章展现,敬请期待!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/379602.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

华为第二批难题一:基于预训练AI模型的元件库生成

我的理解:华为的这个难道应该是想通过大模型技术,识别元件手册上的图文内容,与现有建库工具结合,有潜力按标准生成各种库模型。 正好,我们正在研究,利用知识图谱技术快速生成装配模型,其中也涉…

[C/C++] -- JSON for Modern C++

JSON for Modern C(nlohmann/json)是一个流行的 C JSON 库,由德国开发者nlohmann编写。这个库提供了简洁而灵活的 API,使得在C中解析和生成JSON数据变得非常方便。 1.JSON简介 JSON(JavaScript Object Notation&…

6、5 门关于 AI 和 ChatGPT 的免费课程,带您从 0-100

5 门关于 AI 和 ChatGPT 的免费课程,带您从 0-100 想在 2024 年免费了解有关 AI 和 ChatGPT 的更多信息吗? 图片由 DALLE 3 提供 活着是多么美好的时光啊。还有什么比现在更适合了解生成式人工智能(尤其是 ChatGPT)等人工智能元素的呢!许多人对这个行业感兴趣,但有些…

(五)elasticsearch 源码之查询流程分析

https://www.cnblogs.com/darcy-yuan/p/17039526.html 1.概述 上文我们讨论了es(elasticsearch,下同)索引流程,本文讨论es查询流程,以下是基本流程图 2.查询流程 为了方便调试代码,笔者在电脑上启动了了…

【大厂AI课学习笔记】【1.5 AI技术领域】(7)图像分割

今天学习到了图像分割。 这是我学习笔记的脑图。 图像分割,Image Segmentation,就是将数字图像分割为若干个图像子区域(像素的集合,也被称为超像素),改变图像的表达方式,以更容易理解和分析。 …

[WUSTCTF2020]朴实无华(特详解)

一开始说header出问题了 就先dirsaerch扫一遍 发现robot.txt 访问一下 去看看&#xff0c;好好好&#xff0c;肯定不是得 他一开始说header有问题&#xff0c;不妨抓包看看&#xff0c;果然有东西 访问看看&#xff0c;乱码修复一下&#xff0c;在之前的博客到过 <img src…

LeetCode Python - 5.最长回文子串

文章目录 题目答案运行结果 题目 给你一个字符串 s&#xff0c;找到 s 中最长的回文子串。 如果字符串的反序与原始字符串相同&#xff0c;则该字符串称为回文字符串。 示例 1&#xff1a; 输入&#xff1a;s “babad” 输出&#xff1a;“bab” 解释&#xff1a;“aba” 同…

2024牛客寒假算法基础集训营2部分题解

Tokitsukaze and Bracelet 链接&#xff1a;登录—专业IT笔试面试备考平台_牛客网 来源&#xff1a;牛客网 题目描述 《绯染天空》是一款由 key 社与飞机社共同开发的角色扮演游戏&#xff0c;剧情内容由著名的剧本作家麻枝准编写。它是一款氪金手游&#xff0c;但也有 st…

Blender教程(基础)-衰减编辑-20

1、新建一个平面并细分 如下图所示菜单衰减工具 选中一个点上下移动、图形形变衰减 再点击箭头上下移动过程中不要松开鼠标&#xff0c;此时按鼠标中键实现衰减区域的快速调节。 也可以再菜单栏输入参数调节 调节形状 shiftA添加经纬球 按数字1切换正交前视 切换…

生态位模拟——草稿笔记

文章目录 前言ENM初识一、所需软件安装1.1. 下载ArcGIS软件&#xff1a;1.2. 下载 MaxEnt软件&#xff1a;1.3. 下载ENMtools&#xff1a; 二、数据准备与处理2.1. 物种分布数据2.2. 环境因子数据2.3. 地图数据2.4. 物种分布点去冗余2.4.1. 使用spThin包中的thin函数2.4.2. 或者…

贵金属交易包括哪些?香港有哪些贵金属交易平台?

随着金融市场的不断发展&#xff0c;贵金属交易作为一种投资方式&#xff0c;越来越受到投资者的关注。贵金属交易不仅具有投资价值&#xff0c;还能够为投资者提供规避风险和保值的工具。本文将介绍贵金属交易的种类和香港的贵金属交易平台。 一、贵金属交易的种类 贵金属交…

UE4运用C++和框架开发坦克大战教程笔记(十九)(第58~60集)完结

UE4运用C和框架开发坦克大战教程笔记&#xff08;十九&#xff09;&#xff08;第58~60集&#xff09;完结 58. 弹窗显示与隐藏59. UI 面板销毁60. 框架完成与总结 58. 弹窗显示与隐藏 这节课我们先来补全 TransferMask() 里对于 Overlay 布局类型面板的遮罩转移逻辑&#xff…

Vuex介绍和使用

1. 什么是Vuex Vuex 是一个专为 Vue.js 应用程序开发的状态管理模式和库。它解决了在大型 Vue.js 应用程序中共享和管理状态的问题&#xff0c;使得状态管理变得更加简单、可预测和可维护。 在 Vue.js 应用中&#xff0c;组件之间的通信可以通过 props 和事件进行&#xff0c…

从github上拉取项目到pycharm中

有两种方法&#xff0c;方法一较为简单&#xff0c;方法二用到了git bash&#xff0c;推荐方法一 目录 有两种方法&#xff0c;方法一较为简单&#xff0c;方法二用到了git bash&#xff0c;推荐方法一方法一&#xff1a;方法二&#xff1a; 方法一&#xff1a; 在github上复制…

SpringCloud-微服务项目架构

在当今软件开发领域&#xff0c;微服务架构正成为构建灵活、可伸缩、独立部署的应用的首选&#xff0c;微服务架构作为一种灵活而强大的设计模式&#xff0c;通过将系统拆分为独立的、自治的服务&#xff0c;使得应用更容易维护、扩展和升级。本文将探讨微服务项目架构的关键特…

WordPress函数wptexturize的介绍及用法示例,字符串替换为HTML实体

在查看WordPress你好多莉插件时发现代码中使用了wptexturize()函数用来随机输出一句歌词&#xff0c;下面boke112百科就跟大家一起来学习一下WordPress函数wptexturize的介绍及用法示例。 WordPress函数wptexturize介绍 wptexturize( string $text, bool $reset false ): st…

质数基础筛法

文章目录 埃氏筛线性筛 埃氏筛 埃氏筛是一种筛素数的方法&#xff0c;埃氏筛的思想很重要&#xff0c;主要是时间复杂度 朴素的埃氏筛的时间复杂度是 O ( n l o g n ) O(nlogn) O(nlogn) 这个复杂度是调和级数 vector<int>p; int vis[N];void solve() {rep(i,2,n){if(…

爪哇部落算法组2024新生赛热身赛题解

第一题&#xff08;签到&#xff09;&#xff1a; 1、题意&#xff1a; 2、题解: 我们观察到happynewyear的长度是12个字符&#xff0c;我们直接从前往后遍历0到n - 12的位置&#xff08;这里索引从0开始&#xff09;&#xff0c;使用C的substr()函数找到以i开头的长度为12的字…

形态学算法应用之连通分量提取的python实现——图像处理

原理 连通分量提取是图像处理和计算机视觉中的一项基本任务&#xff0c;旨在识别图像中所有连通区域&#xff0c;并将它们作为独立对象处理。在二值图像中&#xff0c;连通分量通常指的是所有连接在一起的前景像素集合。这里的“连接”可以根据四连通或八连通的邻接关系来定义…

基于华为云欧拉操作系统(HCE OS)容器化部署传统应用(Redis+Postgresql+Git+SpringBoot+Nginx)

写在前面 博文内容为 华为云欧拉操作系统入门级开发者认证(HCCDA – Huawei Cloud EulerOS)实验笔记整理认证地址&#xff1a;https://edu.huaweicloud.com/certificationindex/developer/9bf91efb086a448ab4331a2f53a4d3a1博文内容涉及一个传统 Springboot 应用HCE部署&#x…