kata_小规模流处理kata。 第1部分:线程池
kata
我再次為我的公司在GeeCON 2016上舉辦了編程競賽。 這次分配需要設(shè)計(jì)并根據(jù)以下要求選擇實(shí)施系統(tǒng):
一個(gè)系統(tǒng)每秒發(fā)送大約一千個(gè)事件。 每個(gè)Event至少具有兩個(gè)屬性:
- clientId –我們期望一個(gè)客戶端每秒最多可以處理幾個(gè)事件
- UUID –全球唯一
消耗一個(gè)事件大約需要10毫秒。 設(shè)計(jì)此類流的使用者:
這些要求中沒有幾個(gè)重要的細(xì)節(jié):
在本文中,我將指導(dǎo)您完成一些正確的解決方案,并嘗試一些失敗的嘗試。 您還將學(xué)習(xí)如何使用少量精確定位的指標(biāo)來解決問題。
天真的順序處理
讓我們通過迭代解決這個(gè)問題。 首先,我們必須對API進(jìn)行一些假設(shè)。 想象一下:
interface EventStream {void consume(EventConsumer consumer);}@FunctionalInterface interface EventConsumer {Event consume(Event event); }@Value class Event {private final Instant created = Instant.now();private final int clientId;private final UUID uuid;}典型的基于推送的API,類似于JMS。 一個(gè)重要的注意事項(xiàng)是EventConsumer正在阻止,這意味著直到EventConsumer消耗了前一個(gè)Event ,它才交付新的Event 。 這只是我所做的一個(gè)假設(shè),并沒有徹底改變需求。 這也是JMS中消息偵聽器的工作方式。 天真的實(shí)現(xiàn)只附加了一個(gè)偵聽器,該偵聽器需要大約10毫秒才能完成:
class ClientProjection implements EventConsumer {@Overridepublic Event consume(Event event) {Sleeper.randSleep(10, 1);return event;}}當(dāng)然,在現(xiàn)實(shí)生活中,這個(gè)使用者會(huì)在數(shù)據(jù)庫中存儲(chǔ)一些東西,進(jìn)行遠(yuǎn)程調(diào)用等。我在睡眠時(shí)間分配中添加了一些隨機(jī)性,以使手動(dòng)測試更加實(shí)際:
class Sleeper {private static final Random RANDOM = new Random();static void randSleep(double mean, double stdDev) {final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);try {TimeUnit.MICROSECONDS.sleep((long) micros);} catch (InterruptedException e) {throw new RuntimeException(e);}}}//...EventStream es = new EventStream(); //some real implementation here es.consume(new ClientProjection());它可以編譯并運(yùn)行,但是為了確定未滿足要求,我們必須插入少量指標(biāo)。 最重要的指標(biāo)是消息消耗的延遲,以消息創(chuàng)建到開始處理之間的時(shí)間來衡量。 我們將為此使用Dropwizard指標(biāo) :
class ClientProjection implements EventConsumer {private final ProjectionMetrics metrics;ClientProjection(ProjectionMetrics metrics) {this.metrics = metrics;}@Overridepublic Event consume(Event event) {metrics.latency(Duration.between(event.getCreated(), Instant.now()));Sleeper.randSleep(10, 1);return event;}}提取ProjectionMetrics類以分離職責(zé):
import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Slf4jReporter; import lombok.extern.slf4j.Slf4j;import java.time.Duration; import java.util.concurrent.TimeUnit;@Slf4j class ProjectionMetrics {private final Histogram latencyHist;ProjectionMetrics(MetricRegistry metricRegistry) {final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();reporter.start(1, TimeUnit.SECONDS);latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));}void latency(Duration duration) {latencyHist.update(duration.toMillis());} }現(xiàn)在,當(dāng)您運(yùn)行樸素的解決方案時(shí),您會(huì)Swift發(fā)現(xiàn)中值延遲以及99.9%的百分?jǐn)?shù)無限增長:
type=HISTOGRAM, [...] count=84, min=0, max=795, mean=404.88540608274104, [...]median=414.0, p75=602.0, p95=753.0, p98=783.0, p99=795.0, p999=795.0 type=HISTOGRAM, [...] count=182, min=0, max=1688, mean=861.1706371990878, [...]median=869.0, p75=1285.0, p95=1614.0, p98=1659.0, p99=1678.0, p999=1688.0[...30 seconds later...]type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.030秒后,我們的應(yīng)用程序平均會(huì)延遲15秒處理事件。 并非完全實(shí)時(shí) 。 顯然,缺少并發(fā)是任何原因。 我們的ClientProjection事件使用者大約需要10毫秒才能完成,因此每秒可以處理多達(dá)100個(gè)事件,而我們還需要一個(gè)數(shù)量級(jí)。 我們必須以某種方式擴(kuò)展ClientProjection 。 而且我們甚至都沒有觸及其他要求!
天真線程池
最明顯的解決方案是從多個(gè)線程調(diào)用EventConsumer 。 最簡單的方法是利用ExecutorService :
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream) {this.executorService = Executors.newFixedThreadPool(size);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }我們在這里使用裝飾器模式 。 實(shí)現(xiàn)EventConsumer的原始ClientProjection是正確的。 但是,我們將其與EventConsumer另一個(gè)實(shí)現(xiàn)并發(fā)包裝。 這將使我們能夠編寫復(fù)雜的行為而無需更改ClientProjection本身。 這樣的設(shè)計(jì)促進(jìn):
- 松散耦合:各種EventConsumer彼此都不了解,可以自由組合
- 單一責(zé)任:每個(gè)人都做一份工作,然后委派給下一個(gè)組成部分
- 開放/封閉原則 :我們可以在不修改現(xiàn)有實(shí)現(xiàn)的情況下更改系統(tǒng)的行為。
打開/關(guān)閉原理通常通過注入策略和模板方法模式來實(shí)現(xiàn)。 在這里,它甚至更簡單。 整體接線如下:
MetricRegistry metricRegistry =new MetricRegistry(); ProjectionMetrics metrics =new ProjectionMetrics(metricRegistry); ClientProjection clientProjection =new ClientProjection(metrics); NaivePool naivePool =new NaivePool(10, clientProjection); EventStream es = new EventStream(); es.consume(naivePool);我們精心設(shè)計(jì)的指標(biāo)表明情況確實(shí)好得多:
type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0 type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0[...30 seconds later...]type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0然而,我們?nèi)匀豢吹窖舆t的規(guī)模越來越小,在30秒后,延遲達(dá)到了364毫秒。 它一直在增長,所以問題是系統(tǒng)的。 我們……需要……更多……指標(biāo)。 請注意, NaivePool (您很快就會(huì)知道為什么它是naive )有正好有10個(gè)線程N(yùn)aivePool 。 這應(yīng)該足以處理數(shù)千個(gè)事件,每個(gè)事件需要10毫秒來處理。 實(shí)際上,我們需要一點(diǎn)額外的處理能力,以避免垃圾收集后或負(fù)載高峰時(shí)出現(xiàn)問題。 為了證明線程池實(shí)際上是我們的瓶頸,最好監(jiān)視其內(nèi)部隊(duì)列。 這需要一些工作:
class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();String name = MetricRegistry.name(ProjectionMetrics.class, "queue");Gauge<Integer> gauge = queue::size;metricRegistry.register(name, gauge);this.executorService = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }這里的想法是手動(dòng)創(chuàng)建ThreadPoolExecutor ,以提供自定義的LinkedBlockingQueue實(shí)例。 我們稍后可以使用該隊(duì)列來監(jiān)視其長度(請參閱: ExecutorService – 10個(gè)技巧 )。 Gauge將定期調(diào)用queue::size并將其報(bào)告給您需要的地方。 度量標(biāo)準(zhǔn)確認(rèn)線程池大小確實(shí)是一個(gè)問題:
type=GAUGE, name=[...].queue, value=35 type=GAUGE, name=[...].queue, value=52[...30 seconds later...]type=GAUGE, name=[...].queue, value=601容納待處理任務(wù)的隊(duì)列的大小不斷增加,這會(huì)損害延遲。 線程池大小從10增加到20最終報(bào)告了不錯(cuò)的結(jié)果,并且沒有停頓。 但是,我們?nèi)匀粵]有解決重復(fù)項(xiàng),也沒有針對同一clientId防止事件的同時(shí)修改。
晦澀的鎖定
讓我們從避免對同一clientId的事件進(jìn)行并發(fā)處理開始。 如果兩個(gè)事件接連發(fā)生,并且都與同一個(gè)clientId相關(guān),那么NaivePool將選擇它們并開始同時(shí)處理它們。 首先,我們至少通過為每個(gè)clientId設(shè)置一個(gè)Lock來發(fā)現(xiàn)這種情況:
@Slf4j class FailOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;FailOnConcurrentModification(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {Lock lock = findClientLock(event);if (lock.tryLock()) {try {downstream.consume(event);} finally {lock.unlock();}} else {log.error("Client {} already being modified by another thread", event.getClientId());}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}這肯定是朝錯(cuò)誤的方向前進(jìn)。 復(fù)雜程度不堪重負(fù),但是運(yùn)行此代碼至少表明存在問題。 事件處理管道如下所示,一個(gè)裝飾器包裝了另一個(gè)裝飾器:
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification failOnConcurrentModification =new FailOnConcurrentModification(clientProjection); NaivePool naivePool =new NaivePool(10, failOnConcurrentModification, metricRegistry); EventStream es = new EventStream();es.consume(naivePool);偶爾會(huì)彈出錯(cuò)誤消息,告訴我們其他一些線程已經(jīng)在處理同一clientId事件。 對于每個(gè)clientId我們關(guān)聯(lián)一個(gè)檢查的Lock ,以便確定當(dāng)前是否有另一個(gè)線程不在處理該客戶端。 盡管丑陋,但實(shí)際上我們已經(jīng)接近殘酷的解決方案。 而不是因?yàn)榱硪粋€(gè)線程已經(jīng)在處理某個(gè)事件而無法獲取Lock時(shí)失敗,讓我們稍等一下,希望Lock可以被釋放:
@Slf4j class WaitOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;private final Timer lockWait;WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));}@Overridepublic Event consume(Event event) {try {final Lock lock = findClientLock(event);final Timer.Context time = lockWait.time();try {final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);time.stop();if(locked) {downstream.consume(event);}} finally {lock.unlock();}} catch (InterruptedException e) {log.warn("Interrupted", e);}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}這個(gè)想法非常相似。 但是, tryLock()失敗,它最多等待1秒,以希望釋放給定客戶端的Lock 。 如果兩個(gè)事件很快相繼發(fā)生,一個(gè)事件將獲得一個(gè)Lock并繼續(xù)執(zhí)行,而另一個(gè)事件將阻止等待unlock()發(fā)生。
不僅這些代碼確實(shí)令人費(fèi)解,而且還可能以許多微妙的方式被破壞。 例如,如果幾乎同一時(shí)間發(fā)生同一客戶clientId兩個(gè)事件,但是顯然是第一個(gè)事件呢? 這兩個(gè)事件將同時(shí)請求Lock ,并且我們無法保證哪個(gè)事件會(huì)首先獲得不公平的Lock ,從而可能會(huì)亂序使用事件。 肯定有更好的辦法…
專用線程
讓我們退后一步,深吸一口氣。 您如何確保事情不會(huì)同時(shí)發(fā)生? 好吧,只需使用一個(gè)線程! 事實(shí)上,這是我們一開始所做的,但吞吐量并不令人滿意。 但是我們不關(guān)心不同的clientId的并發(fā)性,我們只需要確保具有相同clientId事件始終由同一線程處理即可!
也許您會(huì)想到創(chuàng)建從clientId到Thread的映射? 好吧,這將過于簡單化。 我們將創(chuàng)建數(shù)千個(gè)線程,每個(gè)線程在大多數(shù)時(shí)間都根據(jù)需求空閑(對于給定的clientId每秒只有很少的事件)。 一個(gè)不錯(cuò)的折衷方案是固定大小的線程池,每個(gè)線程負(fù)責(zé)clientId的眾所周知的子集。 這樣,兩個(gè)不同的clientId可能會(huì)終止在同一線程上,但是同一clientId將始終由同一線程處理。 如果出現(xiàn)同一clientId兩個(gè)事件,則它們都將被路由到同一線程,從而避免了并發(fā)處理。 實(shí)現(xiàn)非常簡單:
class SmartPool implements EventConsumer, Closeable {private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;List<ExecutorService> list = IntStream.range(0, size).mapToObj(i -> Executors.newSingleThreadExecutor()).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);}@Overridepublic void close() throws IOException {threadPools.forEach(ExecutorService::shutdown);}@Overridepublic Event consume(Event event) {final int threadIdx = event.getClientId() % threadPools.size();final ExecutorService executor = threadPools.get(threadIdx);executor.submit(() -> downstream.consume(event));return event;} }關(guān)鍵部分就在最后:
int threadIdx = event.getClientId() % threadPools.size(); ExecutorService executor = threadPools.get(threadIdx);這個(gè)簡單的算法將始終對相同的clientId使用相同的單線程ExecutorService 。 不同的ID可在同一池中結(jié)束,例如,當(dāng)池大小是20 ,客戶機(jī)7 , 27 , 47等,將使用相同的線程。 但這可以,只要一個(gè)clientId始終使用同一線程即可。 此時(shí),不需要鎖定,并且可以保證順序調(diào)用,因?yàn)橥豢蛻舳说氖录冀K由同一線程執(zhí)行。 旁注:每個(gè)clientId一個(gè)線程無法擴(kuò)展,但是每個(gè)clientId一個(gè)角色(例如,在Akka中)是一個(gè)很好的主意,它可以簡化很多工作。
為了更加安全,我在每個(gè)線程池中插入了平均隊(duì)列大小的指標(biāo),從而使實(shí)現(xiàn)更長:
class SmartPool implements EventConsumer, Closeable {private final List<LinkedBlockingQueue<Runnable>> queues;private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;this.queues = IntStream.range(0, size).mapToObj(i -> new LinkedBlockingQueue<Runnable>()).collect(Collectors.toList());List<ThreadPoolExecutor> list = queues.stream().map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);}private double averageQueueLength() {double totalLength =queues.stream().mapToDouble(LinkedBlockingQueue::size).sum();return totalLength / queues.size();}//...}如果您偏執(zhí)狂,您甚至可以為每個(gè)隊(duì)列創(chuàng)建一個(gè)指標(biāo)。
重復(fù)數(shù)據(jù)刪除和冪等
在分布式環(huán)境中,當(dāng)生產(chǎn)者至少有一次保證時(shí),接收重復(fù)事件是很常見的。 這種行為的原因不在本文討論范圍之內(nèi),但我們必須學(xué)習(xí)如何解決該問題。 一種方法是將全局唯一標(biāo)識(shí)符( UUID )附加到每條消息,并確保在消費(fèi)者方面不會(huì)對具有相同標(biāo)識(shí)符的消息進(jìn)行兩次處理。 每個(gè)Event都有這樣的UUID 。 根據(jù)我們的要求,最直接的解決方案是簡單地存儲(chǔ)所有可見的UUID并在到達(dá)時(shí)驗(yàn)證接收到的UUID從未見過。 按原樣使用ConcurrentHashMap<UUID, UUID> (JDK中沒有ConcurrentHashSet )會(huì)導(dǎo)致內(nèi)存泄漏,因?yàn)殡S著時(shí)間的推移,我們將不斷積累越來越多的ID。 這就是為什么我們僅在最近10秒內(nèi)查找重復(fù)項(xiàng)。 從技術(shù)上講,您可以擁有ConcurrentHashMap<UUID, Instant> ,當(dāng)遇到該問題時(shí),它會(huì)從UUID映射到時(shí)間戳。 通過使用后臺(tái)線程,我們可以刪除10秒鐘以上的元素。 但是,如果您是快樂的Guava用戶,則具有聲明性驅(qū)逐策略的Cache<UUID, UUID>將達(dá)到目的:
import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder;import java.util.UUID; import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {return event;}} }為了保證生產(chǎn)安全,我至少可以想到兩個(gè)指標(biāo)可能會(huì)有用:緩存大小和發(fā)現(xiàn)的重復(fù)項(xiàng)數(shù)量。 讓我們也插入以下指標(biāo):
class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private final Meter duplicates;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {duplicates.mark();return event;}} }最終,我們擁有了構(gòu)建解決方案的所有要素。 這個(gè)想法是由彼此包裝的EventConsumer實(shí)例組成管道:
您可以選擇在SmartPool和ClientProjection之間放置FailOnConcurrentModification步驟,以提高安全性(設(shè)計(jì)上不應(yīng)進(jìn)行并發(fā)修改):
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification concurrentModification =new FailOnConcurrentModification(clientProjection); SmartPool smartPool =new SmartPool(12, concurrentModification, metricRegistry); IgnoreDuplicates withoutDuplicates =new IgnoreDuplicates(smartPool, metricRegistry); EventStream es = new EventStream(); es.consume(withoutDuplicates);我們花了很多工作才能提出相對簡單且結(jié)構(gòu)合理的解決方案(我希望您同意)。 最后,解決并發(fā)問題的最佳方法是……避免并發(fā),并在一個(gè)線程中運(yùn)行受競爭條件約束的代碼。 這也是Akka actor(每個(gè)actor處理單個(gè)消息)和RxJava( Subscriber處理的一條消息)背后的思想。 在下一部分中,我們將在RxJava中看到聲明式解決方案。
翻譯自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-1-thread-pools.html
kata
總結(jié)
以上是生活随笔為你收集整理的kata_小规模流处理kata。 第1部分:线程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 沙白瓜的功效与作用 沙白瓜的功效有哪些
- 下一篇: 驶怎么组词 驶如何组词