How Well Do You Really Understand the "Porcupine"? [What Exactly Is Hystrix]
1. What Is Hystrix
Hystrix is an open-source framework from Netflix, available at: https://github.com/Netflix/Hystrix
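Its Chinese name is "豪猪", the porcupine: usually docile, it defends itself with its quills when it senses danger, and once the danger has passed it goes back to being a gentle little ball.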
So the framework's core job comes down to two questions:
- When protection is needed
- How to protect
2. When Protection Is Needed
A system usually plays two roles: service provider and service consumer. For a service consumer, the biggest headache is keeping itself out of harm's way; anyone who has worked on a gateway project will know the feeling.
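Above is a typical system dependency structure. There are usually many underlying dependencies, using protocols such as socket, HTTP, Dubbo, WebService and so on. When the communication layer suffers network jitter, or a dependent system responds abnormally, the service capacity of our own business is directly affected.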
As this effect propagates, it can easily trigger an avalanche (a cascading failure) in which the whole business call chain breaks down, for example with end-to-end timeouts or inconsistent order data.
That brings us to the core question: how do we detect that the business is in an abnormal state?
The success rate! It directly reflects how data is flowing through the business and is the most direct indicator of business health.
Of course, you can also judge by response time, as Sentinel does. Conceptually, though, this converts to the same thing: if time is used for timeout control and a timeout counts as a failure, it is still a success-rate measure.
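To make "timeout = failure" concrete, here is a minimal sketch of a success-rate counter that records a call as failed if it either threw or exceeded a latency threshold. `SuccessRateCounter` and its `timeoutMillis` parameter are hypothetical names for illustration, not part of Hystrix or Sentinel, and real frameworks compute the rate over a rolling time window rather than cumulatively as done here.

```java
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical illustration: counts outcomes and treats slow calls as failures. */
class SuccessRateCounter {
    private final long timeoutMillis;
    private final LongAdder successes = new LongAdder();
    private final LongAdder failures = new LongAdder();

    SuccessRateCounter(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** A call counts as a success only if it did not throw AND finished in time. */
    void record(boolean threw, long latencyMillis) {
        if (threw || latencyMillis > timeoutMillis) {
            failures.increment(); // timeout = failure
        } else {
            successes.increment();
        }
    }

    /** Success rate in [0, 1]; 1.0 when nothing has been recorded yet. */
    double successRate() {
        long ok = successes.sum();
        long total = ok + failures.sum();
        return total == 0 ? 1.0 : (double) ok / total;
    }

    public static void main(String[] args) {
        SuccessRateCounter counter = new SuccessRateCounter(200);
        counter.record(false, 50);  // fast and OK -> success
        counter.record(false, 500); // no exception, but too slow -> failure
        counter.record(true, 10);   // threw -> failure
        counter.record(false, 120); // success
        System.out.println(counter.successRate()); // prints 0.5
    }
}
```

The slow call is counted exactly like the one that threw, which is the sense in which a response-time rule is still a success-rate rule.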
3. How to Protect
Just as with the porcupine, the "quills" are the protective tool: every attack gets mercilessly pushed back.
In Hystrix, this is where the concept of a "circuit breaker" comes in: it indicates whether the system is currently in a state that needs protection.
When the circuit breaker is open, requests no longer run the original business logic; instead they immediately return a pre-agreed response, the Fallback. This fail-fast approach protects our system.
But the system shouldn't stay "quilled" forever; once the danger has passed, it needs to return to normal.
于是對(duì)熔斷器的核心操作就是如下幾個(gè)功能:
?
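- If the success rate is too low, open the circuit breaker and block normal traffic
- After a period of time, move the circuit breaker to the half-open state and tentatively let a single request through
- If that trial request succeeds, close the circuit breaker; if it fails, open it again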
The core API of the circuit breaker is shown in the figure below:
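The breaker behaviour described in this section can be sketched as a small state machine. This is a minimal sketch under stated assumptions, not Hystrix's `HystrixCircuitBreakerImpl`: the class `SimpleCircuitBreaker`, its `minSuccessRate`/`minRequests` thresholds and the injected `LongSupplier` clock are hypothetical; only the method names `attemptExecution`, `markSuccess` and `markNonSuccess` are borrowed from the Hystrix source analyzed in section 5. Real Hystrix evaluates an error percentage over a rolling statistics window rather than the cumulative counts used here.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/** Hypothetical, simplified breaker for illustration; not Hystrix's implementation. */
class SimpleCircuitBreaker {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    private final double minSuccessRate;  // open when the success rate drops below this
    private final int minRequests;        // don't judge on too few samples
    private final long sleepWindowMillis; // how long to stay OPEN before allowing a trial
    private final LongSupplier clock;     // injected so the example is testable

    private final AtomicReference<Status> status = new AtomicReference<>(Status.CLOSED);
    private volatile long openedAt = -1;  // -1 means "not open"
    private int successes;                // guarded by this
    private int failures;                 // guarded by this

    SimpleCircuitBreaker(double minSuccessRate, int minRequests,
                         long sleepWindowMillis, LongSupplier clock) {
        this.minSuccessRate = minSuccessRate;
        this.minRequests = minRequests;
        this.sleepWindowMillis = sleepWindowMillis;
        this.clock = clock;
    }

    /** May this request run the real logic? If not, the caller should use the Fallback. */
    boolean attemptExecution() {
        if (status.get() == Status.CLOSED) {
            return true;
        }
        boolean afterSleepWindow = clock.getAsLong() > openedAt + sleepWindowMillis;
        // Only one caller can win OPEN -> HALF_OPEN; it becomes the single trial request.
        return afterSleepWindow && status.compareAndSet(Status.OPEN, Status.HALF_OPEN);
    }

    synchronized void markSuccess() {
        if (status.get() == Status.HALF_OPEN) {
            // Trial request succeeded: close the breaker and start counting afresh.
            successes = 0;
            failures = 0;
            openedAt = -1;
            status.set(Status.CLOSED);
        } else {
            successes++;
        }
    }

    synchronized void markNonSuccess() {
        if (status.get() == Status.HALF_OPEN) {
            // Trial request failed: reopen and restart the sleep window.
            openedAt = clock.getAsLong();
            status.set(Status.OPEN);
            return;
        }
        failures++;
        int total = successes + failures;
        if (status.get() == Status.CLOSED && total >= minRequests
                && (double) successes / total < minSuccessRate) {
            openedAt = clock.getAsLong(); // set before publishing OPEN
            status.set(Status.OPEN);
        }
    }

    Status status() {
        return status.get();
    }
}
```

Injecting the clock keeps the sleep-window logic deterministic: a test can advance time past `openedAt + sleepWindowMillis` and check that exactly one trial request is let through.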
4. Rate Limiting, Circuit Breaking, Isolation, Degradation
These four concepts come up constantly whenever microservices are discussed; here we look at how Hystrix implements them.
Rate limiting
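- Rate limiting here is quite different from Guava's RateLimiter: the former is about "protecting yourself", the latter about "protecting downstream"
- When Hystrix limits a service, excess traffic goes straight to the Fallback, the same path taken when the circuit is broken. RateLimiter, by contrast, is really about "traffic shaping": smoothing irregular traffic to a steady rate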
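The "reject and fall back" behaviour described above can be sketched with a plain `java.util.concurrent.Semaphore`. The class `ConcurrencyLimiter` is a hypothetical illustration, not Hystrix's `TryableSemaphore`; the key point is that `tryAcquire()` never waits, so excess calls are not smoothed out the way a rate limiter would do it, they go straight to the fallback.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical sketch: calls over the concurrency limit fail fast into a fallback. */
class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    <T> T call(Supplier<T> action, Supplier<T> fallback) {
        if (!permits.tryAcquire()) { // non-blocking: no queueing, no smoothing
            return fallback.get();   // over the limit -> fail fast
        }
        try {
            return action.get();
        } finally {
            permits.release();       // always give the permit back
        }
    }
}
```

With a limit of 1, a call made from inside an action that already holds the only permit returns the fallback value, because the nested call finds no permit left.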
Circuit breaking
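- When my application cannot serve requests, I break the circuit on upstream requests so that the upstream doesn't crush me
- When the success rate of my downstream dependency is too low, I break the circuit on downstream calls so that the downstream doesn't drag me down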
Degradation
- Degradation is closely tied to circuit breaking: how the business behaves once the circuit is broken, namely the pre-agreed fail-fast Fallback, is what service degradation means
Isolation
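- Businesses must not affect one another; each needs its own independent runtime space
- The most thorough option is physical isolation: deploy on different machines
- Next comes process isolation: several Tomcat instances on one machine
- Next comes request isolation
- Since Hystrix works at the code level, it implements request isolation, using thread pools or semaphores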
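Thread-pool isolation can be sketched as a small, dedicated pool per dependency whose rejections are turned into a fallback. This is an illustration under assumptions, not Hystrix's `HystrixThreadPool`: `ThreadPoolBulkhead` is a hypothetical name, and the `SynchronousQueue` (no waiting queue at all) is chosen so that a task is rejected as soon as every thread in the pool is busy.

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

/** Hypothetical per-dependency "bulkhead": a dedicated pool whose rejections fall back. */
class ThreadPoolBulkhead {
    private final ThreadPoolExecutor pool;

    ThreadPoolBulkhead(int threads) {
        // SynchronousQueue holds nothing, so a task is rejected when all threads are busy
        this.pool = new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.AbortPolicy());
    }

    <T> CompletableFuture<T> submit(Supplier<T> task, T fallbackValue) {
        try {
            // supplyAsync hands the task to pool.execute(), which throws if the pool rejects it
            return CompletableFuture.supplyAsync(task, pool);
        } catch (RejectedExecutionException e) {
            return CompletableFuture.completedFuture(fallbackValue); // pool full -> Fallback
        }
    }

    void shutdown() {
        pool.shutdownNow();
    }
}
```

A slow dependency can then exhaust only its own pool: once its threads are all blocked, further calls to it fall back immediately while other dependencies, each with their own bulkhead, keep working.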
5. Source Code Analysis
First, here is a flowchart of how Hystrix processes a request:
As you can see, every Hystrix request is wrapped in a HystrixCommand, and the core logic lives in the AbstractCommand.java class.
The source below is built on RxJava, so it's best to get familiar with RxJava's common usage and concepts first; otherwise it can be quite confusing.
Simply put, RxJava is callback-based functional programming. In plain terms, it is comparable to implementing the strategy pattern with anonymous inner classes.
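As a rough analogy only (this uses the JDK's `CompletableFuture`, not RxJava), the chain below registers callbacks in the same spirit as the Hystrix code that follows: `exceptionally` plays the role of `onErrorResumeNext` by turning an error into a fallback value, and `whenComplete` plays the role of `doOnTerminate` by running cleanup on either outcome. The lambdas are shorthand for the anonymous inner classes that the Hystrix source writes out in full; `CallbackChainDemo` is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical analogy for RxJava-style callback chains, using CompletableFuture. */
class CallbackChainDemo {
    static String run(boolean fail, AtomicBoolean cleanedUp) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("dependency failed");
                    }
                    return "real result";
                })
                // like onErrorResumeNext: replace the error with a fallback value
                .exceptionally(t -> "fallback")
                // like doOnTerminate: runs after the chain completes, whatever happened upstream
                .whenComplete((value, error) -> cleanedUp.set(true))
                .join();
    }
}
```

Nothing in the chain runs "in order" in the imperative sense; each step is a callback the library invokes later, which is exactly why the Hystrix code reads as callbacks registered on callbacks.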
5.1 Circuit Breaker
First, let's see how the circuit breaker and the semaphore gate a request:
private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    // custom extension hook
    executionHook.onStart(_cmd);
    // ask the circuit breaker whether this request may proceed
    if (circuitBreaker.attemptExecution()) {
        // get the group's semaphore; if semaphore isolation is not used, a default always-permit implementation is returned
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        // callback run when the call terminates: release the semaphore exactly once
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };
        // callback run when the call throws
        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };
        // try to acquire the semaphore
        if (executionSemaphore.tryAcquire()) {
            try {
                // acquired: record the invocation start time and run the command
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            // failed to acquire the semaphore: go to the fallback
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        // circuit breaker is open: go to the fallback
        return handleShortCircuitViaFallback();
    }
}
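`singleSemaphoreRelease` is registered on both `doOnTerminate` and `doOnUnsubscribe`, and both may fire for the same execution, which is why the `semaphoreHasBeenReleased` flag guards the release. A minimal stand-alone sketch of that release-once idea, assuming a plain `java.util.concurrent.Semaphore` in place of Hystrix's `TryableSemaphore` (`ReleaseOnce` is a hypothetical name):

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: returns a permit at most once, however many callbacks invoke it. */
class ReleaseOnce implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    ReleaseOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        // Only the first caller flips false -> true, so the permit is returned exactly once
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }
}
```

Without the flag, calling `run()` from both a "terminate" and an "unsubscribe" callback would release two permits for one acquisition and silently raise the concurrency limit.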
When does the circuit breaker let a request in?
@Override
public boolean attemptExecution() {
    // dynamic property: if the breaker is forced open, reject the request
    if (properties.circuitBreakerForceOpen().get()) {
        return false;
    }
    // if it is forced closed, allow the request
    if (properties.circuitBreakerForceClosed().get()) {
        return true;
    }
    // if it is currently closed, allow the request
    if (circuitOpened.get() == -1) {
        return true;
    } else {
        // if it is currently open, check whether the sleep window has elapsed: allow if so, reject otherwise
        if (isAfterSleepWindow()) {
            //only the first request after sleep window should execute
            //if the executing command succeeds, the status will transition to CLOSED
            //if the executing command fails, the status will transition to OPEN
            //if the executing command gets unsubscribed, the status will transition to OPEN
            // CAS lets only one request through: this is the "half-open" state
            if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                return true;
            } else {
                return false;
            }
        } else {
            return false;
        }
    }
}
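The `status.compareAndSet(Status.OPEN, Status.HALF_OPEN)` call is what guarantees a single trial request once the sleep window has passed. The sketch below (a hypothetical `HalfOpenRace` helper built on `AtomicReference`, not Hystrix code) releases many threads at once against an OPEN status and counts how many win the transition; compare-and-set lets exactly one succeed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical demo: many concurrent callers race for OPEN -> HALF_OPEN. */
class HalfOpenRace {
    enum Status { CLOSED, OPEN, HALF_OPEN }

    /** Starts `threads` callers at once and returns how many won the transition. */
    static int countTrialRequests(int threads) {
        AtomicReference<Status> status = new AtomicReference<>(Status.OPEN);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            new Thread(() -> {
                try {
                    start.await(); // line everyone up to maximise contention
                    if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
                        winners.incrementAndGet(); // this caller becomes the trial request
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return winners.get();
    }
}
```

Every loser sees HALF_OPEN instead of OPEN, so its CAS fails and, in the Hystrix code, `attemptExecution()` returns false for it.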
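The key concept here is the "sleep window", configured by circuitBreakerSleepWindowInMilliseconds. It is distinct from the rolling statistics window Hystrix uses to compute success rates: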
private boolean isAfterSleepWindow() {
    final long circuitOpenTime = circuitOpened.get();
    final long currentTime = System.currentTimeMillis();
    final long sleepWindowTime = properties.circuitBreakerSleepWindowInMilliseconds().get();
    // the check is simply whether the time since the breaker opened exceeds the configured sleep window
    return currentTime > circuitOpenTime + sleepWindowTime;
}
5.2 Isolation
How are business requests isolated?
private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    // which isolation strategy is configured: thread pool or semaphore?
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        // thread-pool isolation runs as follows
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                // record metrics according to the configuration
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        // run the extension hooks
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.empty();
                }
            }
        // register callbacks for each scenario
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        // run all of the above on the thread pool's scheduler, i.e. inside the thread pool
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        // semaphore isolation: runs on the current thread, with no scheduler
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }
                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}
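The difference between the two branches comes down to which thread runs the user code: the THREAD branch hands the work to the pool via `subscribeOn(threadPool.getScheduler(...))`, while the SEMAPHORE branch runs it on the caller's own thread. A hedged sketch of just that difference, using a hypothetical `IsolationDemo` class with `CompletableFuture.supplyAsync` standing in for the RxJava scheduler:

```java
import java.util.concurrent.*;
import java.util.function.Supplier;

/** Hypothetical demo of where the work runs under each isolation strategy. */
class IsolationDemo {
    enum Strategy { THREAD, SEMAPHORE }

    static <T> T execute(Strategy strategy, ExecutorService pool, Supplier<T> work) {
        if (strategy == Strategy.THREAD) {
            // like subscribeOn(threadPool.getScheduler(...)): a pool thread runs the work
            return CompletableFuture.supplyAsync(work, pool).join();
        }
        // semaphore isolation: no scheduler, the caller's own thread runs the work
        return work.get();
    }
}
```

Running on a separate thread is what lets THREAD isolation time out and abandon (or interrupt) a stuck call; under SEMAPHORE isolation the caller's thread is the one that would be stuck.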
5.3 Core Execution Flow
private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    // callback invoked on each emitted value
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };
    // callback on successful completion: record the state, which the circuit breaker uses to maintain its logic
    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                circuitBreaker.markSuccess();
            }
        }
    };
    // callback on failure
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            circuitBreaker.markNonSuccess();
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            // route each kind of failure to its own fallback
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /**
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }
                return handleFailureViaFallback(e);
            }
        }
    };
    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };
    Observable<R> execution;
    if (properties.executionTimeoutEnabled().get()) {
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }
    // register all the callbacks
    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}
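`handleFallback` routes each kind of failure to its own handler: thread-pool rejection, timeout, bad request (re-thrown without a fallback) and ordinary failure. A minimal sketch of the same routing with a plain `ExecutorService` and `Future`, assuming hypothetical names (`FallbackRouter`, its nested `BadRequestException`, and the string fallback values); here the timeout comes from `Future.get(timeout, unit)` rather than Hystrix's `HystrixObservableTimeoutOperator`.

```java
import java.util.concurrent.*;

/** Hypothetical sketch of routing failures to different fallbacks, loosely following handleFallback. */
class FallbackRouter {
    /** Stand-in for HystrixBadRequestException: a caller error that should not trigger a fallback. */
    static class BadRequestException extends RuntimeException {
        BadRequestException(String message) {
            super(message);
        }
    }

    static String execute(ExecutorService pool, Callable<String> work, long timeoutMillis) {
        Future<String> future;
        try {
            future = pool.submit(work);
        } catch (RejectedExecutionException e) {
            return "fallback:rejected";             // like handleThreadPoolRejectionViaFallback
        }
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);                    // interrupt the worker thread on timeout
            return "fallback:timeout";              // like handleTimeoutViaFallback
        } catch (ExecutionException e) {
            if (e.getCause() instanceof BadRequestException) {
                throw (BadRequestException) e.getCause(); // like handleBadRequestByEmittingError
            }
            return "fallback:failure";              // like handleFailureViaFallback
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "fallback:failure";
        }
    }
}
```

Bad requests are deliberately excluded from the fallback path: they signal a problem with the caller's input, not with the dependency, so masking them with a default value would hide a real bug.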
6. Summary
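- Hystrix is a circuit-breaking and rate-limiting framework for a single application instance
- Whether a request may execute is decided by the circuit breaker's state and its sleep window
- Threads race via CAS for the "half-open" state, so a single request is sent to test whether the breaker can be closed
- Thread-pool isolation runs requests on a thread pool; rate limiting relies on the pool rejecting work
- Semaphore isolation runs requests on the current thread; rate limiting relies on the number of concurrent requests
- When semaphore acquisition fails or the thread pool's queue is full, rate limiting kicks in and the Fallback runs
- When the circuit breaker is open, requests are short-circuited and the Fallback runs
- The whole framework uses RxJava's programming model, with callbacks everywhere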