大数据-Storm流式框架(七)---Storm事务

storm 事务


需求


storm 对于保证消息处理,提供了最少一次的处理保证。最常见的问题是如果元组可以被
重发,可以用于计数吗?不会重复计数吗?
strom0.7.0 引入了事务性拓扑的概念,可以保证消息仅被严格的处理一次。因此可以以完
全精确的、可扩展的、容错的方式处理类似计数这类的情形。
跟分布式 RPC 类似,事务性拓扑也不是 storm 的新特性,而仅仅是在 storm 原语如数据
流、spout、bolt 和拓扑基础上的高层抽象。


概念


我们先研究最简单的情形,然后进行一个迭代设计,直到达到 storm 的设计级别。


设计一


事务性拓扑的核心观点是数据处理的强有序。
一次只处理一个元组,直到该元组被成功处理,才开始处理下一个元组。
每个元组关联一个事务 ID。
如果元组处理失败,需要重发,重发的时候使用相同的事务 ID。

事务 ID 是一个整数,该整数对每个元组都会增长,也就是第一个元组的事务 ID 是 1,第
二个是 2,以此类推。
强有序保证了即使在元组重发的情况下对该元组也仅仅处理一次。
假设我们要统计一下数据流中元组的全局计数。应该将计数和最新的事务 ID 一起作为一条
记录存储到数据库中。
只能是数据库中的事务 ID 和当前元组的事务 ID 不同的时候,更新数据库中的数据。
3tx c8 db
3tx c12 tuple


考虑两种情形:


1、 数据库中的事务 ID 和当前的事务 ID 不同:由于事务的强有序,数据库中的计数不包
括当前元组。我们可以安全地更新数据库中的计数和事务 ID。
2、 数据库中的事务 ID 和当前元组的事务 ID 相同:当前元组已经统计在数据库中的计数
中了,放弃这次对数据库的更新。因为当前的元组肯定是在更新完数据库之后,通知
storm 处理成功之前出错了。
而且,这种拓扑的设计可以在同一个事务内更新很多源的状态,而且是精确地只处理一
次。如果某些源更新失败了需要重发元组,已经更新成功的源会跳过重试,失败的源会合
理地处理重试。


但是一次只处理一个元组有很大的缺陷:


1、因为等前一个元组完全处理了再处理下一个元组是一件很恐怖的事情 ,效率低,
时间长。
2、大量调用数据库(起码每个元组一次),
3、没有发挥 storm 并发的优势。
因此这种设计不是可扩展的。


设计二


1、在每个事务中处理一批元组。
如果是全局计数,一批元组的数量可以一次性地更新到数据库中。
2、如果批处理失败,就重发这一批失败的元组。
3、不能给每个元组一个事务 ID,而应该给一批元组一个事务 ID
4、而且各批次之间强有序。


看下图:

如果一个批次处理1000个元组,则对数据库的调用就比设计一减少了1000倍。另外,它也利用了storm的并发优势,因为一个批次的元组是可以并发处理的。

依然没有高效地利用资源。因为拓扑中的worker花费了很多时间在等待其他部分的计算完成。例如:

当bolt1完成了它的处理,它会等待其他的bolt的处理完成之后才可以接收到spout发送的下一批数据。

设计三(storm的设计)

一个关键的认知在于:在批处理中并不是所有的步骤都需要强有序

storm通过将一个批的计算划分为两步骤来实现:

  1. 处理阶段:这是可以并行处理很多批的阶段
  2. 提交阶段:该阶段是各批强有序的。如果批次1没有提交成功,是不会提交批次2的。

两步骤作为一个整体,称为“事务”。在一个给定的时刻很多批都在进行计算,但是只有一个批次处于提交阶段。不管是在处理阶段还是在提交阶段,如果一个批次的元组处理失败了,整个事务重新执行(两个阶段)。

设计细节

当使用事务拓扑的时候,storm进行如下处理:

  1. 管理状态:事务性拓扑的所有必须的状态都存储于Zookeeper中。包括当前的事务ID以及定义了每个批次参数的元数据。
  2. 事务调度:storm管理必要的所有数据以决定在任意一个时点哪个事务应该处于处理阶段还是提交阶段
  3. 错误检测:storm利用确认机制框架高效地确定一个批次是成功处理了、成功提交了,还是失败了。storm会恰当地进行重发。用户不需要做任何的确认或锚标记——storm来管理这一切。
  4. 一流的批处理API:storm在普通bolt之上设计了API用于元组的批处理。storm管理所有的调度以确认何时一个任务收到了某个事务的所有元组。storm也会清除或累加每个事务的状态(就像部分计数一样)。

最后,storm需要一个源队列以进行精确的消息批的重发。Apache Kafka就非常适合这种任务的spout,storm-kafka包含了Kafka事务性spout的具体实现。

事务拓扑API

bolt

在事务拓扑中存在三种bolt:

  1. BasicBolt:该bolt不处理批的元组,而仅仅是基于单个元组进行处理并发射。
  2. BatchBolt:该bolt处理批的元组。对每个元组调用execute方法,当一个批次处理完之后就调用finishBatch方法。
  3. 标记为提交器的bolt:该bolt和普通的bolt唯一的区别在于何时调用finishBatch方法。提交器的finishBatch方法在提交阶段调用。保证在提交阶段,所有前置的批都已经成功提交,并且当它提交成功,就完成了它的使命。两种方式将BatchBolt标记为提交器,要么实现ICommitter接口,要么在TransactionalTopologyBuilder中调用setCommitterBolt方法设置bolt。

处理阶段和提交阶段

为了区分提交阶段和处理阶段的差别,让我们看一个案例:

在该拓扑中,只有标记为红色的才是提交器。

在处理阶段,bolt A处理完spout发送的一个批的元组,调用finishBatch方法将它的元组发送给bolt B和Bolt C。Bolt B是一个提交器,因此它会调用execute方法来处理所有的元组但是不会调用finishBatch方法。Bolt C的finishBatch方法也不会调用,因为它不知道是否已经接收了所有来自B的元组(因为bolt B在等待事务提交)。最后,bolt D会接收通过调用C发送过来的任何元组。

只要不是提交事务,B不会调用finishBatch,C不是提交器,也不调用finishBatch,因为它不知道是否接收了B全部的元组。

当批提交的时候,调用B的finishBatch方法。一旦提交完成,C知道它已经接收了所有的元组并调用finishBatch方法。最后,D会接收到完成的批并调用finishBatch方法。

注意,D是一个提交器,当它接收了批的所有元组,不需要等待第二个提交的信号。因为它在提交阶段已经接收到了整个批的数据,直接提交并完成事务。

提交器bolt的行为和提交阶段的batch bolt很像。唯一的区别在于提交器在事务的处理阶段不会调用finishBatch方法。

确认

在事务拓扑中我们不需要做任何的确认和锚标记。

由storm管理。

确认策略进行了极大的优化。

事务的失败

当使用普通bolt的时候,可以调用OutputCollector的fail方法让整个元组树上的元组失败。由于事务拓扑隐藏了确认框架的细节,它们提供了一个不同的让批失败的方法。直接抛出FailedException。跟普通异常不同,该异常只会导致特定批失败并重发,不会让整个进程宕掉。

事务性spout

TransactionalSpout接口跟普通的Spout接口完全不同。TransactionalSpout的实现类发射元组的批,并且必须保证同批元组永远以相同的事务ID发射。

事务spout看起来像这样:

左边的调度器是storm一个普通的spout,当需要发射事务批元组的时候它会挨个儿发送元组。发射器的执行跟storm的bolt很像,负责发射批中的元组。发射器使用全分组订阅调度器的"batch emit"流。

TransactionalSpout在zookeeper中存储少量状态,用于对发出的元组进行幂等,即对于相同的事务id重发的时候要保证数据是一样的。如果获取不到一样的数据,可以使用非幂等事务spout。

分区事务spout

对于事务spout一个常见的情形是从跨多个队列的一组分区中读取消息。例如,这就是TransactionalKafkaSpout做的事情。IPartitionedTransactionalSpout自动执行管理每个分区的状态的簿记工作,以确保幂等可重放性。

配置

对于事务性拓扑有两个重要的配置:

  1. zookeeper:默认,事务拓扑在同一个zookeeper实例中保存状态信息以管理storm集群。可以通过"transactional.zookeeper.servers"和"transactional.zookeeper.port"配置zookeeper。
  2. 允许一次处理的批数量:必须设置一次处理几个批的限制数字。使用"topology.max.spout.pending"。如果不配置,默认是1。

数据流如何工作
    1. 事务spout是一个包含了调度器spout和发射器bolt的子拓扑。
    2. 调度器是一个并行度为1的普通spout。
    3. 发射器是一个并行度为P的bolt,使用全分组订阅调度器spout的"batch"流。
    4. 当调度器spout确认了事务应该进入处理阶段了,就发射包含TransactionalAttempt和元数据的事务元组到"batch"流。
    5. 由于使用了全分组,每个发射器都会接收到通知,知道该发射事务中它们自己部分的元组了
    6. 在整个拓扑中,storm自动管理锚标记和确认机制,以确认何时事务完成了处理阶段。这里的关键是“根元组由调度器创建”,如果处理成功了,调度器会接收到确认消息,如果不管什么原因处理不成功(失败或超时),则接收到"fail"。
    7. 如果处理阶段成功了,所有前置的事务也成功提交了,调度器发射一个包含了TransactionAttempt的元组给"commit"流。
    8. 所有提交的bolt通过全分组订阅提交流信息,因此它们会接收到一个什么时候发生提交的通知。
    9. 跟处理阶段一样,调度器使用确认框架确认提交阶段是否成功。如果收到ack,就在zookeeper中将该事务标记为已完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/108710.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

leetCode 2578. 最小和分割 + 排序 + 贪心 + 奇偶分组(构造最优解)

2578. 最小和分割 - 力扣(LeetCode) 给你一个正整数 num ,请你将它分割成两个非负整数 num1 和 num2 ,满足: num1 和 num2 直接连起来,得到 num 各数位的一个排列。 换句话说,num1 和 num2 中所…

黑客在Pwn2Own Toronto上以58个零日漏洞赚取超过100万美元

Pwn2Own Toronto 2023黑客大赛已经圆满结束,安全研究人员通过攻击消费类产品的58个零日漏洞(以及多个漏洞碰撞)赚取了1,038,500美元。此次比赛由趋势科技的零日倡议(Zero Day Initiative,简称ZDI)组织&…

目标检测及锚框、IoU

文章目录 1. 目标检测2. 锚框3. IoU - 交并比4. 赋予锚框标号5. 使用非极大值抑制(NMS)输出 1. 目标检测 物体检测(目标检测)是计算机视觉和数字图像处理的热门方向,意在判断一幅图像上是否存在感兴趣物体&#xff0c…

在pycharm中,远程操作服务器上的jupyter notebook

一、使用场景 现在我们有两台电脑,一台是拥有高算力的服务器,另一台是普通的轻薄笔记本电脑。如何在服务器上运行jupyter notebook,同时映射到笔记本电脑上的pycharm客户端中进行操作呢? 二、软件 pycharm专业版,jupy…

【Python · PyTorch】线性代数 微积分

本文采用Python及PyTorch版本如下: Python:3.9.0 PyTorch:2.0.1cpu 本文为博主自用知识点提纲,无过于具体介绍,详细内容请参考其他文章。 线性代数 & 微积分 1. 线性代数1.1 基础1.1.1 标量1.1.2 向量长度&…

【LeetCode】7. 整数反转

题目链接 文章目录 Python3官方解法 ⟮ O ( ∣ x ∣ ) 、 O ( 1 ) ⟯ \lgroup O(|x|)、O(1)\rgroup ⟮O(∣x∣)、O(1)⟯写法2写法3 C官方解法 ⟮ O ( ∣ x ∣ ) 、 O ( 1 ) ⟯ \lgroup O(|x|)、O(1)\rgroup ⟮O(∣x∣)、O(1)⟯ Python3 官方解法 ⟮ O ( ∣ x ∣ ) 、 O ( 1…

数据库调优(Mysql)

1 索引 索引是帮助数据库高效查询的一种数据结构: 查询语句:select * from t where t.Col2 89; 不加索引进行数据库查询时,每次都需要将所有数据遍历一次,直到找到符合目标的数据。 加上索引之后,可以根据数据结构不同…

数据结构【DS】B树

m阶B树的核心特性: Q:根节点的子树数范围是多少?关键字数的范围是多少? A:根节点的子树数∈[2, m],关键字数∈[1, m-1]。 Q:其他结点的子树数范围是多少?关键字数范围是多少? Q:对任…

F5修复了允许远程代码执行攻击的BIG-IP认证绕过漏洞

图片 导语 近日,网络安全公司Praetorian Security的研究人员发现了一项影响F5 BIG-IP配置工具的严重漏洞,该漏洞被命名为CVE-2023-46747。攻击者可以通过远程访问配置工具来执行未经身份验证的远程代码,从而对系统进行攻击。本文将详细介绍该…

Linux(Centos7)操作记录

1、nginx -t #Nginx配置文件检查 上述截图代表检查没问题 上述截图检查配置文件配置错误,并提示错误文件位置 2、systemctl restart nginx #重启Nginx 重启Nginx失败 3、systemctl status nginx.service #查看Nginx服务状态 80端口被占导致服务启动失败 4、n…

QT OpenGL (1)2D Painting Example

2D Painting Example 为方便查阅,此文是原网站文档翻译与整理,如有侵权,请与本人联系。 官网 目录 2D Painting Example概述Helper类定义Helper类实现Widget类定义Widget类实现GLWidget类定义GLWidget类实现Window 类定义Window 类实现运行示…

STM32 PWM配置及呼吸灯

PWM的英文全称是"Pulse Width Modulation",中文翻译为"脉冲宽度调制"。 在PWM中可以调节的其实只有两个东西,一个叫做可调周期(调频率),另一个叫做占空比(高电平/周期)。 而…

STM32F103的中断

文章目录 STM32F103的NVICSTM32F103 的中断优先级分组 STM32F103的NVIC CM3 内核支持 256 个中断,其中包含了 16 个内核中断和 240 个外部中断,并且具有 256级的可编程中断设置。 CM3中每个中断通道都具备自己的8位中断优先级控制字节, 但ST…

ArcGIS Maps SDK for JS:隐藏地图边框

文章目录 1 问题描述2 解决方案 1 问题描述 近期,将ArcGIS Api for JS v4.16更新到了ArcGIS Maps SDK for JS v4.27,原本去除地图的css代码失效了。 v4.26及以前版本 ,需要用.esri-view-surface--inset-outline:focus::after 控制边框属性。…

Vuex模块化(modules)与namespaced(命名空间)的搭配

Vuex模块化(modules)与namespaced(命名空间)的搭配 Vuex模块化(modules)格式 原理:可以对Vuex的actions,mutations,state,getters四个属性综合成一个部分&a…

假如我有一台服务器,我会让它提供三种服务

一、提供照片上传、存储和下载服务 随着移动互联网时代的持续快速发展,PC互联网日益势微,各大互联网门户网站的博客、空间也跟着凋零, 作为博客、空间的标配功能的相册也随之被关闭。 2019年3月6日网易相册发布停运公告并于当年5月8日正式停…

【Android】Android Framework系列---CarPower电源管理

Android Framework系列—CarPower电源管理 智能座舱通常包括中控系统、仪表系统、IVI系统 、后排娱乐、HUD、车联网等。这些系统需要由汽车电源进行供电。由于汽车自身的特殊供电环境(相比手机方便的充电环境,汽车的蓄电池如果没有电是需要专业人士操作…

【观察】Dell APEX云平台:引领多云时代上云新范式

毫无疑问,过去十多年是云计算发展的黄金十年,云计算理念不断被市场所接受,但随着企业上云深入和认知度的不断增加,摆在很多企业面前的选择题也发生了新变化,即从过去企业上云或不上云的纠结,转变成今天如何…

【数据结构练习题】删除有序数组中的重复项

✨博客主页:小钱编程成长记 🎈博客专栏:数据结构练习题 🎈相关博文:消失的数字 — 三种解法超详解 删除有序数组中的重复项 1.🎈题目2. 🎈解题思路3. 🎈具体代码🎇总结 1…

【Spring】Spring MVC请求响应

文章目录 1. 请求1.1 传递单个参数1.2 传递多个参数1.3 传递对象1.4 后端参数重命名1.5 传递数组1.6 传递集合1.7 传递JSON对象1.8 获取URL中参数1.9 上传⽂件1.10 获得Cookie1.11 获得Session1.12 获得Header 2. 响应2.1 返回静态界面2.2 返回数据2.3 返回HTML代码片段2.4 返回…