Hystrix中的批量(折叠)请求
Hystrix具有折疊(或批處理)請求的高級功能。 如果兩個或多個命令同時運行相似的請求,Hystrix可以將它們組合在一起,運行一個批處理請求,并將拆分結果分派回所有命令。 首先讓我們看看Hystrix如何工作而不會崩潰。 假設我們有一個StockPrice給定Ticker StockPrice的服務:
為了方便起見, StockPriceGateway核心實現必須提供loadAll()批處理方法,而實現load()方法則是為了方便。 因此,我們的網關能夠批量加載多個價格(例如,以減少延遲或網絡協議開銷),但是目前我們不使用此功能,始終一次加載一個股票的價格:
class StockPriceCommand extends HystrixCommand<StockPrice> {private final StockPriceGateway gateway;private final Ticker stock;StockPriceCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stock = stock;}@Overrideprotected StockPrice run() throws Exception {return gateway.load(stock);} }這樣的命令將始終為每個Ticker調用StockPriceGateway.load() ,如以下測試所示:
class StockPriceCommandTest extends Specification {def gateway = Mock(StockPriceGateway)def 'should fetch price from external service'() {given:gateway.load(TickerExamples.any()) >> StockPriceExamples.any()def command = new StockPriceCommand(gateway, TickerExamples.any())when:def price = command.execute()then:price == StockPriceExamples.any()}def 'should call gateway exactly once when running Hystrix command'() {given:def command = new StockPriceCommand(gateway, TickerExamples.any())when:command.execute()then:1 * gateway.load(TickerExamples.any())}def 'should call gateway twice when command executed two times'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:commandOne.execute()commandTwo.execute()then:2 * gateway.load(TickerExamples.any())}def 'should call gateway twice even when executed in parallel'() {given:def commandOne = new StockPriceCommand(gateway, TickerExamples.any())def commandTwo = new StockPriceCommand(gateway, TickerExamples.any())when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:2 * gateway.load(TickerExamples.any())}}如果您不了解Hystrix,則通過將外部調用包裝在命令中可以獲得許多功能,例如超時,斷路器等。但這不是本文的重點。 看一下最后兩個測試:當兩次,順序或并行( queue() )兩次詢問任意行情的價格時,我們的外部gateway也被兩次調用。 上一次測試特別有趣–我們幾乎同時要求相同的報價,但Hystrix不能弄清楚。 這兩個命令是完全獨立的,將在不同的線程中執行,彼此之間一無所知-即使它們幾乎同時運行。
折疊就是找到類似的請求并將其組合。 批處理(我將這個術語與崩潰互換使用)不會自動發生,并且需要一些編碼。 但是首先讓我們看看它的行為:
def 'should collapse two commands executed concurrently for the same stock ticker'() {given:def anyTicker = TickerExamples.any()def tickers = [anyTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, anyTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:0 * gateway.load(_)1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any()) }def 'should collapse two commands executed concurrently for the different stock tickers'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setand:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:futureOne.get()futureTwo.get()then:1 * gateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other()) }def 'should correctly map collapsed response into individual requests'() {given:def anyTicker = TickerExamples.any()def otherTicker = TickerExamples.other()def tickers = [anyTicker, otherTicker] as Setgateway.loadAll(tickers) >> ImmutableMap.of(anyTicker, StockPriceExamples.any(),otherTicker, StockPriceExamples.other())and:def commandOne = new StockTickerPriceCollapsedCommand(gateway, anyTicker)def commandTwo = new StockTickerPriceCollapsedCommand(gateway, otherTicker)when:Future<StockPrice> futureOne = commandOne.queue()Future<StockPrice> futureTwo = commandTwo.queue()and:def anyPrice = futureOne.get()def otherPrice = futureTwo.get()then:anyPrice == StockPriceExamples.any()otherPrice == StockPriceExamples.other() }第一個測試證明,不是兩次調用load() ,而是幾乎一次調用loadAll() 。 還要注意,由于我們要求相同的Ticker (來自兩個不同的線程),因此loadAll()僅請求一個代碼。 第二個測試顯示兩個并發請求,兩個不同的行情收錄器被折疊為一個批處理調用。 第三次測試可確保我們仍能對每個單獨的請求得到正確的響應。 相反,延長HystrixCommand我們必須擴展更加復雜HystrixCollapser 。 現在是時候看到StockTickerPriceCollapsedCommand實現,它無縫替換了StockPriceCommand :
class StockTickerPriceCollapsedCommand extends HystrixCollapser<ImmutableMap<Ticker, StockPrice>, StockPrice, Ticker> {private final StockPriceGateway gateway;private final Ticker stock;StockTickerPriceCollapsedCommand(StockPriceGateway gateway, Ticker stock) {super(HystrixCollapser.Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Stock")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));this.gateway = gateway;this.stock = stock;}@Overridepublic Ticker getRequestArgument() {return stock;}@Overrideprotected HystrixCommand<ImmutableMap<Ticker, StockPrice>> createCommand(Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {final Set<Ticker> stocks = collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(toSet());return new StockPricesBatchCommand(gateway, stocks);}@Overrideprotected void mapResponseToRequests(ImmutableMap<Ticker, StockPrice> batchResponse, Collection<CollapsedRequest<StockPrice, Ticker>> collapsedRequests) {collapsedRequests.forEach(request -> {final Ticker ticker = request.getArgument();final StockPrice price = batchResponse.get(ticker);request.setResponse(price);});}}這里有很多事情要做,所以讓我們逐步回顧StockTickerPriceCollapsedCommand 。 前三種通用類型:
- BatchReturnType (在我們的示例中為ImmutableMap<Ticker, StockPrice> )是批處理命令響應的類型。 如您將在后面看到的那樣,崩潰程序會將多個小命令變成批處理命令。 這是該批處理命令的響應的類型。 請注意,它與StockPriceGateway.loadAll()類型相同。
- ResponseType ( StockPrice )是要折疊的每個命令的類型。 在我們的例子中,我們正在折疊HystrixCommand<StockPrice> 。 稍后,我們將BatchReturnType值BatchReturnType為多個StockPrice 。
- RequestArgumentType ( Ticker )是我們將要折疊(批處理)的每個命令的輸入。 當多個命令一起批處理時,我們最終將所有這些替換為一個批處理命令。 此命令應接收所有單個請求,以便執行一個批處理請求。
withTimerDelayInMilliseconds(100)將在稍后說明。 createCommand()創建一個批處理命令。 此命令應替換所有單個命令并執行批處理邏輯。 在我們的情況下,我們不會進行多次單獨的load()調用:
class StockPricesBatchCommand extends HystrixCommand<ImmutableMap<Ticker, StockPrice>> {private final StockPriceGateway gateway;private final Set<Ticker> stocks;StockPricesBatchCommand(StockPriceGateway gateway, Set<Ticker> stocks) {super(HystrixCommandGroupKey.Factory.asKey("Stock"));this.gateway = gateway;this.stocks = stocks;}@Overrideprotected ImmutableMap<Ticker, StockPrice> run() throws Exception {return gateway.loadAll(stocks);} }此類與StockPriceCommand之間的唯一區別是,它需要一堆Ticker并返回所有價格。 Hystrix將收集幾個StockTickerPriceCollapsedCommand實例,一旦它具有足夠的StockTickerPriceCollapsedCommand (稍后再介紹),它將創建一個StockPriceCommand 。 希望這很清楚,因為mapResponseToRequests()涉及的更多。 折疊后的StockPricesBatchCommand完成后,我們必須以某種方式拆分批處理響應,并將回復傳達回各個命令,而不會崩潰。 從這個角度來看, mapResponseToRequests()實現非常簡單:我們收到批處理響應和包裝CollapsedRequest<StockPrice, Ticker>的集合。 現在,我們必須遍歷所有正在等待的單個請求并完成它們( setResponse() )。 如果我們不完成某些請求,它們將無限期掛起并最終超時。
怎么運行的
這是描述如何實現折疊的正確時機。 我之前說過崩潰是在同時發生兩個請求時發生的。 沒有一樣的時間了 。 實際上,當第一個可折疊請求進入時,Hystrix會啟動一個計時器。 在我們的示例中,我們將其設置為100毫秒。 在此期間,我們的命令被掛起,等待其他命令加入。 在此可配置時間段之后,Hystrix將調用createCommand() ,收集所有請求鍵(通過調用getRequestArgument() )并運行它。 批處理命令完成后,它將使我們將結果分發給所有等待中的單個命令。 如果我們擔心創建龐大的批處理,也可以限制已折疊請求的數量–另一方面,在此短時間內可以容納多少并發請求?
用例和缺點
請求折疊應在承受高負載(請求頻率很高)的系統中使用。 如果每個折疊時間窗口(在示例中為100毫秒)僅收到一個請求,則折疊只會增加開銷。 這是因為每次您調用可折疊命令時,它都必須等待,以防萬一其他命令想要加入并形成批處理。 僅當至少折疊了幾個命令時,這才有意義。 節省網絡等待時間和/或更好地利用合作者中的資源可以平衡浪費的等待時間(與單個呼叫相比,批處理請求通常要快得多)。 但是請記住,折疊是一把雙刃劍,在特定情況下很有用。
最后要記住的一件事–為了使用請求折疊,您需要在try-finally塊中使用HystrixRequestContext.initializeContext()和shutdown() :
HystrixRequestContext context = HystrixRequestContext.initializeContext(); try {//... } finally {context.shutdown(); }崩潰與緩存
您可能會認為可以通過適當的緩存來代替崩潰。 這不是真的。 在以下情況下使用緩存:
另一方面,折疊不會強制數據的局部性(1),它總是命中實際服務,并且永遠不會返回陳舊的數據(2)。 最后,如果我們從多個線程中請求相同的資源,我們將僅調用一次備份服務(3)。 在進行緩存的情況下,除非您的緩存真的很聰明,否則兩個線程將獨立地發現緩存中沒有給定的資源,并兩次請求支持服務。 但是,折疊可以與緩存一起使用-通過在運行可折疊命令之前咨詢緩存。
摘要
請求折疊是一個有用的工具,但是用例非常有限。 它可以顯著提高我們系統的吞吐量,并限制外部服務的負載。 崩潰可以神奇地平抑流量高峰,而不是將其散布到各處。 只要確保將其用于以極高頻率運行的命令即可。
翻譯自: https://www.javacodegeeks.com/2014/11/batching-collapsing-requests-in-hystrix.html
總結
以上是生活随笔為你收集整理的Hystrix中的批量(折叠)请求的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Data JPA教程:简介
- 下一篇: 江南才子唐伯虎是哪里人 唐伯虎是哪里人