hystrix 源码 线程池隔离_基于hystrix的线程池隔离
hystrix進(jìn)行資源隔離,其實是提供了一個抽象,叫做command,就是說,你如果要把對某一個依賴服務(wù)的所有調(diào)用請求,全部隔離在同一份資源池內(nèi)
對這個依賴服務(wù)的所有調(diào)用請求,全部走這個資源池內(nèi)的資源,不會去用其他的資源了,這個就叫做資源隔離
hystrix最最基本的資源隔離的技術(shù),線程池隔離技術(shù)
對某一個依賴服務(wù),商品服務(wù),所有的調(diào)用請求,全部隔離到一個線程池內(nèi),對商品服務(wù)的每次調(diào)用請求都封裝在一個command里面
每個command(每次服務(wù)調(diào)用請求)都是使用線程池內(nèi)的一個線程去執(zhí)行的
所以哪怕是對這個依賴服務(wù),商品服務(wù),現(xiàn)在同時發(fā)起的調(diào)用量已經(jīng)到了1000了,但是線程池內(nèi)就10個線程,最多就只會用這10個線程去執(zhí)行
不會說,對商品服務(wù)的請求,因為接口調(diào)用延遲,將tomcat內(nèi)部所有的線程資源全部耗盡,不會出現(xiàn)了
1.pox
com.netflix.hystrix
hystrix-core
1.5.12
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixObservableCommand;
import com.roncoo.eshop.cache.ha.http.HttpClientUtils;
import com.roncoo.eshop.cache.ha.model.ProductInfo;
public class GetProductInfosCommand extends HystrixObservableCommand {
private String[] productIds;
public GetProductInfosCommand(String[] productIds) {
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
this.productIds = productIds;
}
@Override
protected Observable construct() {
return Observable.create(new Observable.OnSubscribe() {
public void call(Subscriber super ProductInfo> observer) {
try {
for(String productId : productIds) {
String url = "http://127.0.0.1:8082/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
ProductInfo productInfo = JSONObject.parseObject(response, ProductInfo.class);
observer.onNext(productInfo);
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
import com.alibaba.fastjson.JSONObject;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.roncoo.eshop.cache.ha.http.HttpClientUtils;
import com.roncoo.eshop.cache.ha.model.ProductInfo;
public class GetProductInfoCommand extends HystrixCommand {
private Long productId;
public GetProductInfoCommand(Long productId) {
super(HystrixCommandGroupKey.Factory.asKey("GetProductInfoGroup"));
this.productId = productId;
}
@Override
protected ProductInfo run() throws Exception {
String url = "http://127.0.0.1:8082/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
return JSONObject.parseObject(response, ProductInfo.class);
}
}
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import rx.Observable;
import rx.Observer;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixObservableCommand;
import com.roncoo.eshop.cache.ha.http.HttpClientUtils;
import com.roncoo.eshop.cache.ha.hystrix.command.GetProductInfoCommand;
import com.roncoo.eshop.cache.ha.hystrix.command.GetProductInfosCommand;
import com.roncoo.eshop.cache.ha.model.ProductInfo;
@Controller
public class CacheController {
@RequestMapping("/change/product")
@ResponseBody
public String changeProduct(Long productId) {
// 拿到一個商品id
// 調(diào)用商品服務(wù)的接口,獲取商品id對應(yīng)的商品的最新數(shù)據(jù)
// 用HttpClient去調(diào)用商品服務(wù)的http接口
String url = "http://127.0.0.1:8082/getProductInfo?productId=" + productId;
String response = HttpClientUtils.sendGetRequest(url);
//Future future = getProductInfoCommand.queue();
//try {
//Thread.sleep(1000);
//System.out.println(future.get());
//} catch (Exception e) {
//e.printStackTrace();
//}
System.out.println(response);
return "success";
}
/**
* nginx開始,各級緩存都失效了,nginx發(fā)送很多的請求直接到緩存服務(wù)要求拉取最原始的數(shù)據(jù)
*
* @param productId
* @return
*/
@RequestMapping("/getProductInfo")
@ResponseBody
public String getProductInfo(Long productId) {
// 拿到一個商品id
// 調(diào)用商品服務(wù)的接口,獲取商品id對應(yīng)的商品的最新數(shù)據(jù)
// 用HttpClient去調(diào)用商品服務(wù)的http接口
HystrixCommand getProductInfoCommand = new GetProductInfoCommand(productId);
ProductInfo productInfo = getProductInfoCommand.execute();
System.out.println(productInfo);
return "success";
}
/**
* 一次性批量查詢多條商品數(shù)據(jù)的請求
*/
@RequestMapping("/getProductInfos")
@ResponseBody
public String getProductInfos(String productIds) {
HystrixObservableCommand getProductInfosCommand =
new GetProductInfosCommand(productIds.split(","));
Observable observable = getProductInfosCommand.observe();
//observable = getProductInfosCommand.toObservable(); // 還沒有執(zhí)行
observable.subscribe(new Observer() { // 等到調(diào)用subscribe然后才會執(zhí)行
public void onCompleted() {
System.out.println("獲取完了所有的商品數(shù)據(jù)");
}
public void onError(Throwable e) {
e.printStackTrace();
}
public void onNext(ProductInfo productInfo) {
System.out.println(productInfo);
}
});
return "success";
}
}
command的四種調(diào)用方式
同步:new CommandHelloWorld("World").execute(),new ObservableCommandHelloWorld("World").toBlocking().toFuture().get()
如果你認(rèn)為observable command只會返回一條數(shù)據(jù),那么可以調(diào)用上面的模式,去同步執(zhí)行,返回一條數(shù)據(jù)
異步:new CommandHelloWorld("World").queue(),new ObservableCommandHelloWorld("World").toBlocking().toFuture()
對command調(diào)用queue(),僅僅將command放入線程池的一個等待隊列,就立即返回,拿到一個Future對象,后面可以做一些其他的事情,然后過一段時間對future調(diào)用get()方法獲取數(shù)據(jù)。
1.線程池隔離技術(shù)與信號量隔離技術(shù)的區(qū)別
hystrix里面,核心的一項功能,其實就是所謂的資源隔離,要解決的最最核心的問題,就是將多個依賴服務(wù)的調(diào)用分別隔離到各自自己的資源池內(nèi)
避免說對某一個依賴服務(wù)的調(diào)用,因為依賴服務(wù)的接口調(diào)用的延遲或者失敗,導(dǎo)致服務(wù)所有的線程資源全部耗費在這個服務(wù)的接口調(diào)用上
一旦說某個服務(wù)的線程資源全部耗盡的話,可能就導(dǎo)致服務(wù)就會崩潰,甚至說這種故障會不斷蔓延
hystrix,資源隔離,兩種技術(shù),線程池的資源隔離,信號量的資源隔離
信號量,semaphore
信號量:適合,你的訪問不是對外部依賴的訪問,而是對內(nèi)部的一些比較復(fù)雜的業(yè)務(wù)邏輯的訪問,但是像這種訪問,系統(tǒng)內(nèi)部的代碼,其實不涉及任何的網(wǎng)絡(luò)請求,
那么只要做信號量的普通限流就可以了,因為不需要去捕獲timeout類似的問題,算法+數(shù)據(jù)結(jié)構(gòu)的效率不是太高,
并發(fā)量突然太高,因為這里稍微耗時一些,導(dǎo)致很多線程卡在這里的話,不太好,
所以進(jìn)行一個基本的資源隔離和訪問,避免內(nèi)部復(fù)雜的低效率的代碼,導(dǎo)致大量的線程被卡住。
一般我們在獲取到商品數(shù)據(jù)之后,都要去獲取商品是屬于哪個地理位置,省,市,賣家的,可能在自己的純內(nèi)存中,比如就一個Map去獲取
對于這種直接訪問本地內(nèi)存的邏輯,比較適合用信號量做一下簡單的隔離
優(yōu)點在于,不用自己管理線程池,不用care timeout超時了,信號量做隔離的話,性能會相對來說高一些。
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixCommandProperties.ExecutionIsolationStrategy;
import com.roncoo.eshop.cache.ha.local.LocationCache;
public class GetCityNameCommand extends HystrixCommand {
private Long cityId;
public GetCityNameCommand(Long cityId){
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GetCityNameGroup"))
.andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)));
this.cityId = cityId;
}
@Override
protected String run() throws Exception {
return LocationCache.getCityName(cityId);
}
}
execution.isolation.strategy:
指定了HystrixCommand.run()的資源隔離策略,THREAD或者SEMAPHORE,一種是基于線程池,一種是信號量
線程池機(jī)制,每個command運行在一個線程中,限流是通過線程池的大小來控制的
信號量機(jī)制,command是運行在調(diào)用線程中,但是通過信號量的容量來進(jìn)行限流
如何在線程池和信號量之間做選擇?
默認(rèn)的策略就是線程池
而使用信號量的場景,通常是針對超大并發(fā)量的場景下,每個服務(wù)實例每秒都幾百的QPS,那么此時你用線程池的話,線程一般不會太多,
可能撐不住那么高的并發(fā),如果要撐住,可能要耗費大量的線程資源,那么就是用信號量,來進(jìn)行限流保護(hù)
一般用信號量常見于那種基于純內(nèi)存的一些業(yè)務(wù)邏輯服務(wù),而不涉及到任何網(wǎng)絡(luò)訪問請求
netflix有100+的command運行在40+的線程池中,只有少數(shù)command是不運行在線程池中的,就是從純內(nèi)存中獲取一些元數(shù)據(jù),或者是對多個command包裝起來的facacde command,是用信號量限流的.
// to use thread isolation
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.THREAD)
// to use semaphore isolation
HystrixCommandProperties.Setter()
.withExecutionIsolationStrategy(ExecutionIsolationStrategy.SEMAPHORE)
2、command名稱和command組
每個command,都可以設(shè)置一個自己的名稱,同時可以設(shè)置一個自己的組.
command group,是一個非常重要的概念,默認(rèn)情況下,因為就是通過command group來定義一個線程池的,而且還會通過command group來聚合一些監(jiān)控和報警信息
同一個command group中的請求,都會進(jìn)入同一個線程池中
3、command線程池
threadpool key代表了一個HystrixThreadPool,用來進(jìn)行統(tǒng)一監(jiān)控,統(tǒng)計,緩存
默認(rèn)的threadpool key就是command group名稱
每個command都會跟它的threadpool key對應(yīng)的thread pool綁定在一起
如果不想直接用command group,也可以手動設(shè)置thread pool name
public CommandHelloWorld(String name) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
.andCommandKey(HystrixCommandKey.Factory.asKey("HelloWorld"))
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("HelloWorldPool")));
this.name = name;
}
command threadpool -> command group -> command key
command key,代表了一類command,一般來說,代表了底層的依賴服務(wù)的一個接口
command group,代表了某一個底層的依賴服務(wù),合理,一個依賴服務(wù)可能會暴露出來多個接口,每個接口就是一個command key
command group,在邏輯上去組織起來一堆command key的調(diào)用,統(tǒng)計信息,成功次數(shù),timeout超時次數(shù),失敗次數(shù),可以看到某一個服務(wù)整體的一些訪問情況
command group,一般來說,推薦是根據(jù)一個服務(wù)去劃分出一個線程池,command key默認(rèn)都是屬于同一個線程池的
command group,對應(yīng)了一個服務(wù),但是這個服務(wù)暴露出來的幾個接口,訪問量很不一樣,差異非常之大
你可能就希望在這個服務(wù)command group內(nèi)部,包含的對應(yīng)多個接口的command key,做一些細(xì)粒度的資源隔離
對同一個服務(wù)的不同接口,都使用不同的線程池.
邏輯上來說,多個command key屬于一個command group,在做統(tǒng)計的時候,會放在一起統(tǒng)計
每個command key有自己的線程池,每個接口有自己的線程池,去做資源隔離和限流
但是對于thread pool資源隔離來說,可能是希望能夠拆分的更加一致一些,比如在一個功能模塊內(nèi),對不同的請求可以使用不同的thread pool
command group一般來說,可以是對應(yīng)一個服務(wù),多個command key對應(yīng)這個服務(wù)的多個接口,多個接口的調(diào)用共享同一個線程池
如果說你的command key,要用自己的線程池,可以定義自己的threadpool key.
4、coreSize
設(shè)置線程池的大小,默認(rèn)是10
HystrixThreadPoolProperties.Setter()
.withCoreSize(int value)
一般來說,用這個默認(rèn)的10個線程大小就夠了
5、queueSizeRejectionThreshold
控制queue滿后reject的threshold,因為maxQueueSize不允許熱修改,因此提供這個參數(shù)可以熱修改,控制隊列的最大大小
HystrixCommand在提交到線程池之前,其實會先進(jìn)入一個隊列中,這個隊列滿了之后,才會reject
默認(rèn)值是5
HystrixThreadPoolProperties.Setter()
.withQueueSizeRejectionThreshold(int value)
6、execution.isolation.semaphore.maxConcurrentRequests
設(shè)置使用SEMAPHORE隔離策略的時候,允許訪問的最大并發(fā)量,超過這個最大并發(fā)量,請求直接被reject
這個并發(fā)量的設(shè)置,跟線程池大小的設(shè)置,應(yīng)該是類似的,但是基于信號量的話,性能會好很多,而且hystrix框架本身的開銷會小很多
默認(rèn)值是10,設(shè)置的小一些,否則因為信號量是基于調(diào)用線程去執(zhí)行command的,而且不能從timeout中抽離,因此一旦設(shè)置的太大,而且有延時發(fā)生,可能瞬間導(dǎo)致tomcat本身的線程資源本占滿
HystrixCommandProperties.Setter()
.withExecutionIsolationSemaphoreMaxConcurrentRequests(int value)
創(chuàng)建command,執(zhí)行這個command,配置這個command對應(yīng)的group和線程池,以及線程池/信號量的容量和大小
畫圖分析整個8大步驟的流程,然后再對每個步驟進(jìn)行細(xì)致的講解
1、構(gòu)建一個HystrixCommand或者HystrixObservableCommand
一個HystrixCommand或一個HystrixObservableCommand對象,代表了對某個依賴服務(wù)發(fā)起的一次請求或者調(diào)用
構(gòu)造的時候,可以在構(gòu)造函數(shù)中傳入任何需要的參數(shù)
HystrixCommand主要用于僅僅會返回一個結(jié)果的調(diào)用
HystrixObservableCommand主要用于可能會返回多條結(jié)果的調(diào)用
HystrixCommand command = new HystrixCommand(arg1, arg2);
HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);
2、調(diào)用command的執(zhí)行方法
執(zhí)行Command就可以發(fā)起一次對依賴服務(wù)的調(diào)用
要執(zhí)行Command,需要在4個方法中選擇其中的一個:execute(),queue(),observe(),toObservable()
其中execute()和queue()僅僅對HystrixCommand適用
execute():調(diào)用后直接block住,屬于同步調(diào)用,直到依賴服務(wù)返回單條結(jié)果,或者拋出異常
queue():返回一個Future,屬于異步調(diào)用,后面可以通過Future獲取單條結(jié)果
observe():訂閱一個Observable對象,Observable代表的是依賴服務(wù)返回的結(jié)果,獲取到一個那個代表結(jié)果的Observable對象的拷貝對象
toObservable():返回一個Observable對象,如果我們訂閱這個對象,就會執(zhí)行command并且獲取返回結(jié)果
K value = command.execute();
Future fValue = command.queue();
Observable ohValue = command.observe();
Observable ocValue = command.toObservable();
execute()實際上會調(diào)用queue().get().queue(),接著會調(diào)用toObservable().toBlocking().toFuture()
也就是說,無論是哪種執(zhí)行command的方式,最終都是依賴toObservable()去執(zhí)行的
3、檢查是否開啟緩存
如果這個command開啟了請求緩存,request cache,而且這個調(diào)用的結(jié)果在緩存中存在,那么直接從緩存中返回結(jié)果
4、檢查是否開啟了短路器
檢查這個command對應(yīng)的依賴服務(wù)是否開啟了短路器
如果斷路器被打開了,那么hystrix就不會執(zhí)行這個command,而是直接去執(zhí)行fallback降級機(jī)制
5、檢查線程池/隊列/semaphore是否已經(jīng)滿了
如果command對應(yīng)的線程池/隊列/semaphore已經(jīng)滿了,那么也不會執(zhí)行command,而是直接去調(diào)用fallback降級機(jī)制
6、執(zhí)行command
調(diào)用HystrixObservableCommand.construct()或HystrixCommand.run()來實際執(zhí)行這個command
HystrixCommand.run()是返回一個單條結(jié)果,或者拋出一個異常
HystrixObservableCommand.construct()是返回一個Observable對象,可以獲取多條結(jié)果
如果HystrixCommand.run()或HystrixObservableCommand.construct()的執(zhí)行,超過了timeout時長的話,那么command所在的線程就會拋出一個TimeoutException
如果timeout了,也會去執(zhí)行fallback降級機(jī)制,而且就不會管run()或construct()返回的值了
這里要注意的一點是,我們是不可能終止掉一個調(diào)用嚴(yán)重延遲的依賴服務(wù)的線程的,只能說給你拋出來一個TimeoutException,但是還是可能會因為嚴(yán)重延遲的調(diào)用線程占滿整個線程池的
即使這個時候新來的流量都被限流了。。。
如果沒有timeout的話,那么就會拿到一些調(diào)用依賴服務(wù)獲取到的結(jié)果,然后hystrix會做一些logging記錄和metric統(tǒng)計
7、短路健康檢查
Hystrix會將每一個依賴服務(wù)的調(diào)用成功,失敗,拒絕,超時,等事件,都會發(fā)送給circuit breaker斷路器
短路器就會對調(diào)用成功/失敗/拒絕/超時等事件的次數(shù)進(jìn)行統(tǒng)計
短路器會根據(jù)這些統(tǒng)計次數(shù)來決定,是否要進(jìn)行短路,如果打開了短路器,那么在一段時間內(nèi)就會直接短路,然后如果在之后第一次檢查發(fā)現(xiàn)調(diào)用成功了,就關(guān)閉斷路器
8、調(diào)用fallback降級機(jī)制
在以下幾種情況中,hystrix會調(diào)用fallback降級機(jī)制:run()或construct()拋出一個異常,短路器打開,線程池/隊列/semaphore滿了,command執(zhí)行超時了
一般在降級機(jī)制中,都建議給出一些默認(rèn)的返回值,比如靜態(tài)的一些代碼邏輯,或者從內(nèi)存中的緩存中提取一些數(shù)據(jù),盡量在這里不要再進(jìn)行網(wǎng)絡(luò)請求了
即使在降級中,一定要進(jìn)行網(wǎng)絡(luò)調(diào)用,也應(yīng)該將那個調(diào)用放在一個HystrixCommand中,進(jìn)行隔離
在HystrixCommand中,上線getFallback()方法,可以提供降級機(jī)制
在HystirxObservableCommand中,實現(xiàn)一個resumeWithFallback()方法,返回一個Observable對象,可以提供降級結(jié)果
如果fallback返回了結(jié)果,那么hystrix就會返回這個結(jié)果
對于HystrixCommand,會返回一個Observable對象,其中會發(fā)返回對應(yīng)的結(jié)果
對于HystrixObservableCommand,會返回一個原始的Observable對象
如果沒有實現(xiàn)fallback,或者是fallback拋出了異常,Hystrix會返回一個Observable,但是不會返回任何數(shù)據(jù)
不同的command執(zhí)行方式,其fallback為空或者異常時的返回結(jié)果不同
對于execute(),直接拋出異常
對于queue(),返回一個Future,調(diào)用get()時拋出異常
對于observe(),返回一個Observable對象,但是調(diào)用subscribe()方法訂閱它時,理解拋出調(diào)用者的onError方法
對于toObservable(),返回一個Observable對象,但是調(diào)用subscribe()方法訂閱它時,理解拋出調(diào)用者的onError方法
9、不同的執(zhí)行方式
execute(),獲取一個Future.get(),然后拿到單個結(jié)果
queue(),返回一個Future
observer(),立即訂閱Observable,然后啟動8大執(zhí)行步驟,返回一個拷貝的Observable,訂閱時理解回調(diào)給你結(jié)果
toObservable(),返回一個原始的Observable,必須手動訂閱才會去執(zhí)行8大步驟
總結(jié)
以上是生活随笔為你收集整理的hystrix 源码 线程池隔离_基于hystrix的线程池隔离的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: cimage和gdi绘图效率比较_GDI
- 下一篇: php带截切图片上传_PHP大文件切割上