【SpringCloud】Hystrix源码解析

e246a1dda09849a5a89535a62441565d.png

hystrix是一个微服务容错组件,提供了资源隔离、服务降级、服务熔断的功能。这一章重点分析hystrix的实现原理

1、服务降级

当服务实例所在服务器承受的压力过大或者受到网络因素影响没法及时响应请求时,请求会阻塞堆积,情况严重的话整个系统都会瘫痪,hystrix为此提供了一种容错机制:当服务实例没法及时响应请求,可以采用服务降级的方式快速失败,维持系统的稳定性

服务降级和@HystrixCommand注解绑定,查看它的源码

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {

    ...

    String fallbackMethod() default "";

}

源码提供的信息很少,要分析注解的功能得找到处理注解的类:HystrixCommandAspect

@Aspect
public class HystrixCommandAspect {
    ...
    
    // 环绕通知
    @Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
    public Object methodsAnnotatedWithHystrixCommand(ProceedingJoinPoint joinPoint) throws Throwable {
        Method method = AopUtils.getMethodFromTarget(joinPoint);
        Validate.notNull(method, "failed to get method from joinPoint: %s", new Object[]{joinPoint});
        if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
            throw new IllegalStateException("method cannot be annotated with HystrixCommand and HystrixCollapser annotations at the same time");
        } else {
            MetaHolderFactory metaHolderFactory = (MetaHolderFactory)META_HOLDER_FACTORY_MAP.get(HystrixCommandAspect.HystrixPointcutType.of(method));
            MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
            HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
            ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ? metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

            try {
                Object result;
                if (!metaHolder.isObservable()) {
                    // 代理执行方法
                    result = CommandExecutor.execute(invokable, executionType, metaHolder);
                } else {
                    result = this.executeObservable(invokable, executionType, metaHolder);
                }

                return result;
            } catch (HystrixBadRequestException var9) {
                throw var9.getCause();
            } catch (HystrixRuntimeException var10) {
                throw this.hystrixRuntimeExceptionToThrowable(metaHolder, var10);
            }
        }
    }
}

从命名上我们能看出这是一个切面,说明服务降级是通过aop代理实现的,跟踪CommandExecutor的execute方法

调用链:
-> CommandExecutor.execute
-> castToExecutable(invokable, executionType).execute()
-> HystrixCommand.execute
-> this.queue().get()
public Future<R> queue() {

	// 获取Future对象
	final Future<R> delegate = this.toObservable().toBlocking().toFuture();
	Future<R> f = new Future<R>() {
		...

		public R get() throws InterruptedException, ExecutionException {
			return delegate.get();
		}

		public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
			return delegate.get(timeout, unit);
		}
	};
	...
}

HystrixCommand类的queue方法返回了一个Future对象,在线程任务中常用Future对象来获取任务执行完成后返回的结果。这里的Future对象是通过this.toObservable().toBlocking().toFuture()创建的,点击查看toObservable方法,它返回一个Observable对象

public Observable<R> toObservable() {
   ...
   final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        public Observable<R> call() {
              return 
((CommandState)AbstractCommand.this.commandState.get()).equals(AbstractCommand.CommandState.UNSUBSCRIBED) ? Observable.never() : 
              // 传入指令执行任务
              AbstractCommand.this.applyHystrixSemantics(AbstractCommand.this);
        }
   };
   ...
	return Observable.defer(new Func0<Observable<R>>() {
		public Observable<R> call() {
                ...
                // 有订阅者订阅了才创建Observable对象
				Observable<R> hystrixObservable = 
                Observable.defer(applyHystrixSemantics).map(wrapWithAllOnNextHooks);
				Observable afterCache;
				if (requestCacheEnabled && cacheKey != null) {
					HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, AbstractCommand.this);
					HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache)AbstractCommand.this.requestCache.putIfAbsent(cacheKey, toCache);
					if (fromCache != null) {
						toCache.unsubscribe();
						AbstractCommand.this.isResponseFromCache = true;
						return AbstractCommand.this.handleRequestCacheHitAndEmitValues(fromCache, AbstractCommand.this);
					}

					afterCache = toCache.toObservable();
				} else {
					afterCache = hystrixObservable;
				}

				return afterCache.doOnTerminate(terminateCommandCleanup).doOnUnsubscribe(unsubscribeCommandCleanup).doOnCompleted(fireOnCompletedHook);
			...
		}
	});        
}

Observable对象的创建任务委托了给了AbstractCommand.this.applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(AbstractCommand<R> _cmd) {
	this.executionHook.onStart(_cmd);
	// 是否允许请求,判断熔断状态
	if (this.circuitBreaker.allowRequest()) {
		final TryableSemaphore executionSemaphore = this.getExecutionSemaphore();
		final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
		Action0 singleSemaphoreRelease = new Action0() {
			public void call() {
				if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
					executionSemaphore.release();
				}

			}
		};
		Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
			public void call(Throwable t) {
				AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, AbstractCommand.this.commandKey);
			}
		};
		if (executionSemaphore.tryAcquire()) {
			try {
				this.executionResult = this.executionResult.setInvocationStartTime(System.currentTimeMillis());
				// 执行任务
				return this.executeCommandAndObserve(_cmd).doOnError(markExceptionThrown).doOnTerminate(singleSemaphoreRelease).doOnUnsubscribe(singleSemaphoreRelease);
			} catch (RuntimeException var7) {
				return Observable.error(var7);
			}
		} else {
			return this.handleSemaphoreRejectionViaFallback();
		}
	} else {
		// 处于熔断状态,执行备用任务
		return this.handleShortCircuitViaFallback();
	}
}

this.circuitBreaker.allowReques返回true表示没有熔断,走executeCommandAndObserve方法

private Observable<R> executeCommandAndObserve(AbstractCommand<R> _cmd) {
	...
	Observable execution;
	if ((Boolean)this.properties.executionTimeoutEnabled().get()) {
		// 添加了超时监控
		execution = this.executeCommandWithSpecifiedIsolation(_cmd).lift(new HystrixObservableTimeoutOperator(_cmd));
	} else {
		execution = this.executeCommandWithSpecifiedIsolation(_cmd);
	}

   ...
	// handleFallback:不同异常状况下使用不同的处理方法
	Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
		public Observable<R> call(Throwable t) {
			Exception e = AbstractCommand.this.getExceptionFromThrowable(t);
			AbstractCommand.this.executionResult = AbstractCommand.this.executionResult.setExecutionException(e);
			if (e instanceof RejectedExecutionException) {
				return AbstractCommand.this.handleThreadPoolRejectionViaFallback(e);
			} else if (t instanceof HystrixTimeoutException) {
				// 抛出超时异常时,做超时处理
				return AbstractCommand.this.handleTimeoutViaFallback();
			} else if (t instanceof HystrixBadRequestException) {
				return AbstractCommand.this.handleBadRequestByEmittingError(e);
			} else if (e instanceof HystrixBadRequestException) {
				AbstractCommand.this.eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, AbstractCommand.this.commandKey);
				return Observable.error(e);
			} else {
				return AbstractCommand.this.handleFailureViaFallback(e);
			}
		}
	};
   ...

	return execution.doOnNext(markEmits).doOnCompleted(markOnCompleted)
		  // 调用handleFallback处理异常
		  .onErrorResumeNext(handleFallback).doOnEach(setRequestContext);
}
private static class HystrixObservableTimeoutOperator<R> implements Observable.Operator<R, R> {
        final AbstractCommand<R> originalCommand;

        public HystrixObservableTimeoutOperator(AbstractCommand<R> originalCommand) {
            this.originalCommand = originalCommand;
        }

        public Subscriber<? super R> call(final Subscriber<? super R> child) {
            final CompositeSubscription s = new CompositeSubscription();
            child.add(s);
            final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(this.originalCommand.concurrencyStrategy, new Runnable() {
                public void run() {
                    // 3.抛出超时异常
                    child.onError(new HystrixTimeoutException());
                }
            });
            HystrixTimer.TimerListener listener = new HystrixTimer.TimerListener() {
                // 1.判断是否超时
                public void tick() {
                    if (HystrixObservableTimeoutOperator.this.originalCommand.isCommandTimedOut.compareAndSet(AbstractCommand.TimedOutStatus.NOT_EXECUTED, AbstractCommand.TimedOutStatus.TIMED_OUT)) {
                        HystrixObservableTimeoutOperator.this.originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, HystrixObservableTimeoutOperator.this.originalCommand.commandKey);
                        s.unsubscribe();
                        // 2.执行超时任务
                        timeoutRunnable.run();
                    }
                }
            };
            
        }
    }

executeCommandAndObserve方法添加超时监控,如果任务执行超出限制时间会抛出超时异常,handleTimeoutViaFallback方法负责处理超时异常

private Observable<R> handleTimeoutViaFallback() {
	// 1.根据异常类型处理异常
	return this.getFallbackOrThrowException(this, HystrixEventType.TIMEOUT, FailureType.TIMEOUT, "timed-out", new TimeoutException());
}

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, HystrixEventType eventType, final HystrixRuntimeException.FailureType failureType, final String message, final Exception originalException) {
	...
	// 获取回调观察者
	fallbackExecutionChain = this.getFallbackObservable();
	...
}    

protected final Observable<R> getFallbackObservable() {
	return Observable.defer(new Func0<Observable<R>>() {
		public Observable<R> call() {
			try {
				// 执行备用方法
				return Observable.just(HystrixCommand.this.getFallback());
			} catch (Throwable var2) {
				return Observable.error(var2);
			}
		}
	});
}

到这里终于看到了getFallback方法,它会调用注解中fallback指向的方法,快速失败返回响应结果

protected Object getFallback() {
	// 获取注解中的备用方法信息
	final CommandAction commandAction = this.getFallbackAction();
	if (commandAction != null) {
		try {
			return this.process(new AbstractHystrixCommand<Object>.Action() {
				Object execute() {
					MetaHolder metaHolder = commandAction.getMetaHolder();
					Object[] args = CommonUtils.createArgsForFallback(metaHolder, GenericCommand.this.getExecutionException());
					return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
				}
			});
		} catch (Throwable var3) {
			LOGGER.error(FallbackErrorMessageBuilder.create().append(commandAction, var3).build());
			throw new FallbackInvocationException(ExceptionUtils.unwrapCause(var3));
		}
	} else {
		return super.getFallback();
	}
}

回到AbstractCommand.this.applyHystrixSemantics方法,当this.circuitBreaker.allowReques返回true时会走正常请求,返回false时表示服务进入熔断状态,熔断状态会走getFallback方法

调用链
-> AbstractCommand.handleShortCircuitViaFallback
-> getFallbackOrThrowException
-> this.getFallbackObservable
-> GenericCommand.getFallback

2、服务熔断

服务熔断是hystrix提供的一种保护机制,当一段时间内服务响应的异常的次数过多,hystrix会让服务降级快速返回失败信息,避免累积压力造成服务崩溃。
联系上文找到circuitBreaker.allowRequest方法,该方法判断是否允许请求往下走

public boolean allowRequest() {
	// 是否强制打开熔断
	if ((Boolean)this.properties.circuitBreakerForceOpen().get()) {
		return false;
	// 是否强制关闭熔断
	} else if ((Boolean)this.properties.circuitBreakerForceClosed().get()) {
		this.isOpen();
		return true;
	} else {
		return !this.isOpen() || this.allowSingleTest();
	}
}

public boolean isOpen() {
	if (this.circuitOpen.get()) {
		return true;
	} else {
		HystrixCommandMetrics.HealthCounts health = this.metrics.getHealthCounts();
		// 请求次数是否超过单位时间内请求数阈值
		if (health.getTotalRequests() < (long)(Integer)this.properties.circuitBreakerRequestVolumeThreshold().get()) {
			return false;
		// 请求异常次数占比
		} else if (health.getErrorPercentage() < (Integer)this.properties.circuitBreakerErrorThresholdPercentage().get()) {
			return false;
		} else if (this.circuitOpen.compareAndSet(false, true)) {
			this.circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
			return true;
		} else {
			return true;
		}
	}
}

isOpen方法内有针对请求的各种量化计算,当请求异常情况过多,就会触发熔断,走服务降级

3、总结

hystrix组件会根据请求状态判断是否执行请求,当请求超时或者存在其他异常会走备用方法,当异常次数过多会进入熔断状态快速失败,避免服务累积过多压力

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/775997.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

【算法笔记自学】入门篇(2)——算法初步

4.1排序 自己写的题解 #include <stdio.h> #include <stdlib.h>void selectSort(int A[], int n) {for(int i 0; i < n - 1; i) { // 修正索引范围int k i;for(int j i 1; j < n; j) { // 修正索引范围if(A[j] < A[k]) {k j;}}if (k ! i) { // 仅在…

取证与数据恢复:冷系统分析,实时系统分析与镜像分析之间的过渡办法

天津鸿萌科贸发展有限公司是 ElcomSoft 系列取证软件的授权代理商。 ElcomSoft 系列取证软件 ElcomSoft 系列取证软件支持从计算机和移动设备进行数据提取、解锁文档、解密压缩文件、破解加密容器、查看和分析证据。 计算机和手机取证的完整集合硬件加速解密最多支持10,000计…

面向对象案例:电影院

TOC 思路 代码 结构 具体代码 Movie.java public class Movie {//一共七个private int id;private String name;private double price;private double score;private String director;private String actors;private String info;//get和setpublic int getId() {return id;…

2024年【湖北省安全员-C证】考试资料及湖北省安全员-C证考试试卷

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 湖北省安全员-C证考试资料是安全生产模拟考试一点通生成的&#xff0c;湖北省安全员-C证证模拟考试题库是根据湖北省安全员-C证最新版教材汇编出湖北省安全员-C证仿真模拟考试。2024年【湖北省安全员-C证】考试资料及…

解决@Autowired 注入service 到 static接口方法的问题

1 对类进行 Component 定义 2 定义service及 static service Component public class OperationalJudgment {private static MemberService memberService;Resourceprivate MemberService service;PostConstructpublic void init() {memberServicethis.service;}3 static方法中…

PTrade常见问题系列3

量化允许同时运行回测和交易的策略个数配置。 量化允许同时运行回测和交易的策略个数在哪里查看&#xff1f; 在量化服务器/home/fly/config/custom_config_conf文件中&#xff0c;其中运行回测的策略个数由backtest_switch&#xff08;是否限制普通回测个数&#xff09;及ba…

AutoCAD 2022 for Mac/Win版 安装包下载

AutoCAD 2022 是由 Autodesk 开发的一款计算机辅助设计&#xff08;CAD&#xff09;软件。它广泛应用于工程、建筑、制造、动画和媒体娱乐等多个领域。 系统要求&#xff1a; 操作系统&#xff1a;Windows 10 或更高版本。 处理器&#xff1a;Intel 或 AMD 处理器&#xff0c…

算法库应用--寻找最长麦穗

学习贺利坚老师算法库 数据结构例程——串的顺序存储应用_使用顺序串存储身份证号-CSDN博客 本人详细解析博客 串的顺序存储的应用实例二_串的顺序存储应用-CSDN博客 版本更新日志 V1.0: 在原有的基础上, 进行优化名字, 并且有了相应的算法库作为支撑, 我使用了for循环来代替老…

第N7周:seq2seq翻译实战-pytorch复现-小白版

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客&#x1f356; 原作者&#xff1a;K同学啊 理论基础 seq2seq&#xff08;Sequence-to-Sequence&#xff09;模型是一种用于机器翻译、文本摘要等序列转换任务的框架。它由两个主要的递归神经网络&#…

HTML【详解】超链接 a 标签的四大功能(页面跳转、页内滚动【锚点】、页面刷新、文件下载)

超链接 a 标签主要有以下功能&#xff1a; 跳转到其他页面 <a href"https://www.baidu.com/" target"_blank" >百度</a>href&#xff1a;目标页面的 url 地址或同网站的其他页面地址&#xff0c;如 detail.htmltarget&#xff1a;打开目标页面…

全面助力巴西slot游戏包推广本土网盟dsp流量广告优势

全面助力巴西slot游戏包推广本土网盟dsp流量广告优势 在巴西这片充满活力的土地上&#xff0c;电子游戏市场蓬勃发展&#xff0c;成为娱乐产业的重要组成部分。随着网络技术的不断进步和移动互联网的普及&#xff0c;巴西玩家对于电子游戏的热情愈发高涨&#xff0c;游戏市场呈…

Streaming local LLM with FastAPI, Llama.cpp and Langchain

题意&#xff1a; 使用FastAPI、Llama.cpp和Langchain流式传输本地大型语言模型 问题背景&#xff1a; I have setup FastAPI with Llama.cpp and Langchain. Now I want to enable streaming in the FastAPI responses. Streaming works with Llama.cpp in my terminal, but…

google 邮件信息收集

主要介绍通过google和fofax对目标进行邮件信息收集 chrome插件 email-whatsapp-extractor link-klipper-extract-all bulk-url-opener-extension email-whatsapp-extractor 使用正则表达式&#xff0c;获取访问页面内所有的email邮箱和whatsapp号码&#xff0c;以表格的形式导…

C电池 和 D 电池的作用和类型详解及其之间的区别

C 和 D 电池是我们日常生活中必不可少的部件。它们通常用于高功率设备。例如手电筒和玩具。 D 型电池和 C 型电池是两种常见的电池类型。它们是一次性圆柱形电池。您可以在很多设备上使用它们。虽然它们有很多相似之处&#xff0c;但它们也有不同的特点。这些特点使它们适合某…

设置和取消Excel“打开密码”的3种方法

在日常工作中&#xff0c;Excel文件中常常包含敏感数据。为了防止未经授权的访问&#xff0c;给Excel文件设置打开密码是一个非常有效的方法。下面分享3种设置Excel打开密码的方法&#xff0c;以及如何取消这些密码。 先来看看设置Excel打开密码的3种方法。 方法一&#xff1…

CSRF漏洞攻击

05-CSRF 1 CSRF概述 1.1 概述 CSRF (Cross-Site Request Forgery) 跨站请求伪造&#xff0c;也可称为一键式攻击 (one-click-attack)&#xff0c;通常缩写为 CSRF 或者 XSRF。 CSRF 攻击是一种挟持用户在当前已登录的浏览器上发送恶意请求的攻击方法。相对于XSS利用用户对指…

对FPGA开发流程系统的学习

FPGA 开发流程&#xff1a; HDL&#xff08;Hardware Design Language&#xff09;和原理图是两种最常用的数字硬件电路描述方法&#xff0c;HDL 设计法具有更好的可移植性、通用性和模块划分与重用性的特点&#xff0c;在目前的工程设计中被广泛使用。所以&#xff0c;我们在…

JDK新特性之协程

在 JVM 中&#xff0c;java 线程直接映射内核线程&#xff0c;因此 java 线程的创建、销毁和调度都要依赖内核态的操作&#xff08;系统调用&#xff09;。而协程是真正的用户线程&#xff0c;如上图所示很多的协程可以映射很少的几个内核线程&#xff0c;并且协程的创建、销毁…

【kubectl详解】最全的kubectl命令用法

文章目录 简介一.命令帮助翻译1.1.基本命令&#xff08;初学者&#xff09;&#xff1a;1.2.基本命令&#xff08;中级&#xff09;&#xff1a;1.3.部署命令&#xff1a;1.4.群集管理命令&#xff1a;1.5.疑难解答和调试命令&#xff1a;1.6.高级命令&#xff1a;1.7.设置命令…

腾讯混元文生图开源模型推出小显存版本,仅需 6G 显存即可运行

腾讯宣布开源小显存版本的混元文生图模型&#xff0c;降低至 6G 显存即可运行&#xff0c;方便个人电脑本地部署。同时&#xff0c;混元 DiT 模型升级至 1.2 版本&#xff0c;图片质感与构图提升。混元 Captioner 打标模型也正式开源&#xff0c;支持中英文双语&#xff0c;优化…