大数据-Storm流式框架(四)---storm容错机制

1、集群节点宕机

Nimbus服务器     硬件

单点故障?可以搭建HA jStorm搭建  nimbus的HA

nimbus的信息存储到zookeeper中,只要下游没问题(进程退出)nimbus退出就不会有问题,

如果在nimbus宕机,也不能提交作业。

非Nimbus服务器     硬件

supervisor故障时,该节点上所有Task任务都会超时,Nimbus会将这些Task任务重新分配到其他服务器上运行

2、进程挂掉

Worker

挂掉时,Supervisor会重新启动这个进程。如果启动过程中仍然一直失败,并且无法向zookeeper发送心跳,Nimbus会将该Worker执行的任务重新分配到其他服务器上

Supervisor

无状态(所有的状态信息都存放在Zookeeper中来管理)

快速失败(每当遇到任何异常情况,都会自动毁灭)

Nimbus

无状态(所有的状态信息都存放在Zookeeper中来管理)

快速失败(每当遇到任何异常情况,都会自动毁灭)

3、消息的完整性

从Spout中发出的Tuple,以及基于他所产生Tuple(例如上个例子当中Spout发出的句子,以及句子当中单词的tuple等)。由这些消息就构成了一棵tuple树。当这棵tuple树发送完成,并且树当中每一条消息都被正确处理,就表明spout发送消息被“完整处理”,即消息的完整性。

元组树

如果衍生元组出错,则重发根元组

根元组就是spout发送的那个元组

Acker -- 消息完整性的实现机制

Storm的拓扑当中特殊的一些任务

负责跟踪每个Spout发出的Tuple的DAG(有向无环图)

默认每个worker一个acker

保证消息至少处理一次

Storm提供了几种不同级别的保证消息处理,包括尽力而为(best effort),至少一次(at least once),以及通过Trident保证只完全处理一次(exactly once)。

此处指的是至少完全处理一次

当元组树已经用完并且树中的每条消息都已处理完毕时,Storm会认为从水龙头发射的元组是“完全处理”的。如果在指定的超时内无法完全处理其消息树,则认为该元组失败。可以使用Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS对指定的拓扑进行配置,默认为30秒

public interface ISpout extends Serializable {
    void open(Map conf, TopologyContext context, SpoutOutputCollector collector);
    void close();
    void nextTuple();
    void ack(Object msgId);
    void fail(Object msgId);
}

首先,storm调用spout的nextTuple方法发射一个元组。spout使用SpoutOutputCollector在声明的流中发射元组。当发射元组的时候,spout使用messageId标记该元组。

_collector.emit(new Values("field1", "field2", 3) , msgId);

其次,当向消费闪电发送元组的时候,strom追踪该消息树。如果storm发现一个元组被完全处理了,storm就会调用spout的ack方法并将该元组的messageId传送给它。如果元组处理超时,storm就调用spout的fail方法。只会在发射该元组的spout中调用它的ack或者fail方法。

好处是:

首先,当创建了新的元组树边的时候,要通知storm。其次,当完成了一个元组的处理之后要告诉storm。通过这种方式,storm就可以知道元组被完全处理了,然后调用ack方法,或者调用fail方法,如果处理失败。

在元组树中创建一条边,称为锚点。当发送一个新元组的时候就会创建一条边。

public class SplitSentence extends BaseRichBolt {
        OutputCollector _collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            String sentence = tuple.getString(0);
            for(String word: sentence.split(" ")) {
                _collector.emit(tuple, new Values(word));
            }
            _collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }        
    }

每个元组通过在emit中指定输入的元组进行锚点标记

由于被标记锚点,如果在下游处理中处理失败了,元组树顶点的元组会重新发送。

_collector.emit(new Values(word));

上述方式没有对元组进行锚点标记。当元组在下游处理失败了,根元组不会重发。输出的元组也可以标记多个锚点。如果流中有聚合或者join,就比较有用。标记了多个锚点的元组如果处理失败,就会触发多个元组树根元组重发。通过list集合为元组标记多个锚点:

List<Tuple> anchors = new ArrayList<Tuple>();
anchors.add(tuple1);
anchors.add(tuple2);
_collector.emit(anchors, new Values(1, 2, 3));

很明显,手动调用fail方法比通过元组的处理超时从而由storm调用fail方法要更快地重发根元组。由于storm使用内存来存储元组树,如果不及时的ack或者fail有可能导致内存溢出。

storm的拓扑中提供了一组acker用于追踪spout发射的每个元组及其衍生的元组,一旦发现DAG处理完了,就同创建该元组的spout进行确认。

Config.TOPOLOGY_ACKERS用于设置acker的数量。默认情况下一个worker一个acker任务

当拓扑创建了元组,就会为其分配一个随机的64bit的id,acker使用该ID追踪spout发送的每个元组。元组树上的元组都知道这个ID。当在闪电中创建了新的元组,该ids会拷贝给新的元组。当元组确认后,元素发送消息给acker任务,以改变元组树。

当通过C衍生出D和E并且确认之后,就会从元组树中移除C。这样可以保证元组树不会过早地完成。

如果拓扑中包含多个acker,当一个元组确认后,如何知道向哪个acker发送确认消息?

storm使用hash取模的方式将一个spout的元组id跟一个acker任务绑定。

acker任务如何知道该向哪个spout任务发送确认消息?spout发射元组的时候会给合适的acker发送一个消息表示对哪个spout的元组负责。acker发现元组树完成了,就知道向哪个spout任务发送完成的消息。

acker不显式地追踪元组。如果有数十万的节点,追踪所有的元组会有耗尽acker内存的风险。acker使用一个定长的空间20字节做这个工作。这个是storm的主要创新。

acker将spout发射的元组id和一个64bit的数字ack val)相关联。ack val代表了该元组树的状态,不管spout发射的元组及其衍生的元组有多少,它仅仅是对所有创建的元组以及确认的元组id求异或xor操作如果最终这个值ack val变成0,表示元组树已经被完全处理。某个元组的id和该64bit的数字异或结果是0的情况极其少见,比如每秒处理10k的元组,需要5000万年才会产生一个错误,造成数据丢失。

元组都会产生和消亡,元组不会凭空产生,也不会凭空消失

元组守恒定理

acker为当前根元组预留一个000二进制数字

messageId

000

100

100

010

110

001

111

100

011

010

001

001

000

如果这个值不是0,则过30s,要求spout重发根元组。

acker

异常情况:
  1. 由于任务死掉,没有确认元组:元组树根节点的元组和丢掉的元组确认会超时,重新发送根元组。
  2. acker死掉:所有的元组确认都会超时,根节点元组重发
  3. spout死掉:spout获取数据的数据源负责重新发送消息。例如:MQ等会将所有打开的消息放回到队列中,之后重新处理。

由此可见,strom的可靠性机制完全是分布式的,可扩展的以及容错的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:/a/104799.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

特约|数码转型思考:Web3.0与银行

日前&#xff0c;欧科云链研究院发布重磅报告&#xff0c;引发银行界及金融监管机构广泛关注。通过拆解全球70余家银行的加密布局&#xff0c;报告认为&#xff0c;随着全球采用率的提升与相关技术的成熟&#xff0c;加密资产已成为银行业不容忽视也不能错过的创新领域。 作为…

机器学习实验一:KNN算法,手写数字数据集(使用汉明距离)(2)

KNN-手写数字数据集&#xff1a; 使用sklearn中的KNN算法工具包&#xff08; KNeighborsClassifier)替换实现分类器的构建&#xff0c;注意使用的是汉明距离&#xff1b; 运行结果&#xff1a;&#xff08;大概要运行4分钟左右&#xff09; 代码&#xff1a; import pandas as…

记录一次时序数据库的实战测试

0x1.前言 ​ 本文章仅用于信息安全防御技术分享&#xff0c;因用于其他用途而产生不良后果&#xff0c;作者不承担任何法律责任&#xff0c;请严格遵循中华人民共和国相关法律法规&#xff0c;禁止做一切违法犯罪行为。文中涉及漏洞均以提交至教育漏洞平台。 0x2.背景 ​ 在某…

023-第三代软件开发-自定义Button

第三代软件开发-自定义Button 文章目录 第三代软件开发-自定义Button项目介绍自定义Button第一类型-加声音第二类型-加样式 第三类型-减声音总结一下存在一点小问题 关键字&#xff1a; Qt、 Qml、 Button、 关键字4、 关键字5 项目介绍 欢迎来到我们的 QML & C 项目&…

Redis缓存(缓存预热,缓存穿透,缓存雪崩,缓存击穿)

目录 一, 缓存 1, 什么是缓存 2, 什么是热点数据(热词) 3, 缓存更新策略 3.1 定期生成 3.2 实时生成 二, Redis缓存可能出现的问题 1, 缓存预热 1.1 什么是缓存预热 1.2 缓存预热的过程 2, 缓存穿透 2.1 什么是缓存穿透 2.2 缓存穿透产生的原因 2.3 缓存穿透的解…

优思学院|中质协六西格玛考试形式是什么样的?

中质协的考试形式主要为单选和多选题&#xff0c;近年也有加了一小部分填空题&#xff0c;和国际认证考试有很大区别&#xff0c;因为美质协&#xff08;ASQ&#xff09;、国际精益六西格玛研究所&#xff08;ILSSI&#xff09;&#xff0c;又或者著名的PMP项目管理认证等都是采…

Java性能分析工具

Arthas 官网&#xff1a;简介 | arthas Arthas 是 Alibaba 开源的 Java 诊断工具&#xff0c;深受开发者喜爱。Arthas 是一款线上监控诊断产品&#xff0c;通过全局视角实时查看应用 load、内存、gc、线程的状态信息&#xff0c;并能在不修改应用代码的情况下&#xff0c;对业…

Spark SQL概述与基本操作

目录 一、Spark SQL概述 &#xff08;1&#xff09;概念 &#xff08;2&#xff09;特点 &#xff08;3&#xff09;Spark SQL与Hive异同 &#xff08;4&#xff09;Spark的数据抽象 二、Spark Session对象执行环境构建 (1)Spark Session对象 &#xff08;2&#xff09;代码演…

【深度学习实验】循环神经网络(五):基于GRU的语言模型训练(包括自定义门控循环单元GRU)

文章目录 一、实验介绍二、实验环境1. 配置虚拟环境2. 库版本介绍 三、实验内容&#xff08;一&#xff09;自定义门控循环单元&#xff08;GRU&#xff0c;Gated Recurrent Unit&#xff09;1. get_params2. init_gru_state3. gru &#xff08;二&#xff09;创建模型0. 超参数…

3. 博弈树 (15分)

下棋属于一种博弈游戏&#xff0c;博弈过程可以用树&#xff08;博弈树&#xff09;来表示。假设游戏由两个人&#xff08; A 和 B &#xff09;玩&#xff0c;开始由某个人从根结点开始走&#xff0c;两个人轮流走棋&#xff0c;每次只能走一步&#xff0c; 下一步棋只能选择当…

OTA语音芯片NV040C在智能电动牙刷的应用

以往我们对牙齿的清洁是使用的是手动方式进行&#xff0c;用柔软的牙刷刷毛去进行牙齿的清洁。但现在我们拥有了一种新颖的刷牙方式&#xff0c;靠电力去驱动、清洁我们的牙齿。电动牙刷的刷头通过快速旋转&#xff0c;产生高频振动&#xff0c;将牙膏迅速分解为细小的泡沫&…

支付宝支付接入流程

一、 接入准备 支付宝支付流程没有微信那么复杂&#xff0c;而且支付宝支持沙箱。登录支付宝开放平台控制台 点击开发工具中的沙箱 接口加密方式&#xff0c;我这里使用的是自定义密钥。生成密钥的方式 使用支付宝官方提供的密钥工具&#xff0c;唯一要注意的是支付宝密钥工具…

idea + Docker-Compose 实现自动化打包部署(仅限测试环境)

一、修改docker.service文件&#xff0c;添加监听端口 vi /usr/lib/systemd/system/docker.service ExecStart/usr/bin/dockerd -H fd:// --containerd/run/containerd/containerd.sock -H tcp://0.0.0.0:2375 -H unix://var/run/docker.sock重启docker服务 systemctl daemo…

线性代数3:矢量方程

一、前言 欢迎回到系列文章的第三篇文章&#xff0c;内容是线性代数的基础知识&#xff0c;线性代数是机器学习背后的基础数学。在我之前的文章中&#xff0c;我介绍了梯队矩阵形式。本文将介绍向量、跨度和线性组合&#xff0c;并将这些新想法与我们已经学到的内容联系起来。本…

基于 Redis + Lua 脚本实现分布式锁,确保操作的原子性

1.加锁的Lua脚本&#xff1a; lock.lua --- -1 failed --- 1 success--- getLock key local result redis.call(setnx , KEYS[1] , ARGV[1]) if result 1 then--PEXPIRE:以毫秒的形式指定过期时间redis.call(pexpire , KEYS[1] , 3600000) elseresult -1;-- 如果value相同&…

数据清洗(data clean)

整理了下数据清洗的基本流程&#xff0c;异常值部分整理在了EDA中。

什么是简单网络管理协议(SNMP)

简单网络管理协议&#xff08;SNMP&#xff1a;Simple Network Management Protocol&#xff09;是由互联网工程任务组&#xff08;IETF&#xff1a;Internet Engineering Task Force &#xff09;定义的一套网络管理协议。该协议基于简单网关监视协议&#xff08;SGMP&#xf…

React中的Virtual DOM(看这一篇就够了)

文章目录 前言了解Virtual DOMreact创建虚拟dom的方式React Element虚拟dom的流程虚拟dom和真实dom的对比后言 前言 hello world欢迎来到前端的新世界 &#x1f61c;当前文章系列专栏&#xff1a;react合集 &#x1f431;‍&#x1f453;博主在前端领域还有很多知识和技术需要掌…

DevOps持续集成-Jenkins(3)

文章目录 DevOpsDevOps概述Jenkins实战3&#xff1a;实战1和实战2的加强版&#xff08;新增SonarQube和Harbor&#xff09;⭐环境准备⭐项目架构图对比Jenkins实战1和实战2&#xff0c;新增内容有哪些&#xff1f;SonarQube教程采用Docker安装SonarQube &#xff08;在Jenkins所…

生成树协议:监控 STP 端口和交换机

什么是生成树协议 生成树协议 &#xff08;STP&#xff09; 用于网络交换机&#xff0c;以防止循环和广播风暴。在局域网 &#xff08;LAN&#xff09; 中&#xff0c;两条或多条冗余路径可以连接到同一网段。当交换机或网桥从所有可用端口传输帧时&#xff0c;这些帧开始在网…