小规模流处理kata。 第1部分:线程池
我再次為我的公司在GeeCON 2016上舉辦了編程競賽。 這次分配需要設計并根據(jù)以下要求選擇實施系統(tǒng):
一個系統(tǒng)每秒傳送約一千個事件。 每個Event至少具有兩個屬性:
- clientId –我們期望一個客戶端每秒最多可以處理幾個事件
- UUID –全球唯一
消耗一個事件大約需要10毫秒。 設計此類流的使用者:
這些要求中沒有幾個重要的細節(jié):
在本文中,我將指導您完成幾個正確的解決方案,并進行一些嘗試。 您還將學習如何使用少量精確定位的指標來解決問題。
天真的順序處理
讓我們通過迭代解決這個問題。 首先,我們必須對API進行一些假設。 想象一下:
interface EventStream {void consume(EventConsumer consumer);}@FunctionalInterface interface EventConsumer {Event consume(Event event); }@Value class Event {private final Instant created = Instant.now();private final int clientId;private final UUID uuid;}典型的基于推送的API,類似于JMS。 一個重要的注意事項是EventConsumer正在阻止,這意味著直到EventConsumer消耗了前一個Event ,它才交付新的Event 。 這只是我所做的一個假設,并沒有徹底改變需求。 這也是JMS中消息偵聽器的工作方式。 天真的實現(xiàn)只附加了一個偵聽器,該偵聽器需要大約10毫秒才能完成:
class ClientProjection implements EventConsumer {@Overridepublic Event consume(Event event) {Sleeper.randSleep(10, 1);return event;}}當然,在現(xiàn)實生活中,該消費者會將一些東西存儲在數(shù)據(jù)庫中,進行遠程調(diào)用等。我在睡眠時間分配中添加了一些隨機性,以使手動測試更加實際:
class Sleeper {private static final Random RANDOM = new Random();static void randSleep(double mean, double stdDev) {final double micros = 1_000 * (mean + RANDOM.nextGaussian() * stdDev);try {TimeUnit.MICROSECONDS.sleep((long) micros);} catch (InterruptedException e) {throw new RuntimeException(e);}}}//...EventStream es = new EventStream(); //some real implementation here es.consume(new ClientProjection());它可以編譯并運行,但是為了確定未滿足要求,我們必須插入少量指標。 最重要的度量標準是消息消耗的延遲,以消息創(chuàng)建到開始處理之間的時間來衡量。 我們將為此使用Dropwizard指標 :
class ClientProjection implements EventConsumer {private final ProjectionMetrics metrics;ClientProjection(ProjectionMetrics metrics) {this.metrics = metrics;}@Overridepublic Event consume(Event event) {metrics.latency(Duration.between(event.getCreated(), Instant.now()));Sleeper.randSleep(10, 1);return event;}}提取ProjectionMetrics類以分離職責:
import com.codahale.metrics.Histogram; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Slf4jReporter; import lombok.extern.slf4j.Slf4j;import java.time.Duration; import java.util.concurrent.TimeUnit;@Slf4j class ProjectionMetrics {private final Histogram latencyHist;ProjectionMetrics(MetricRegistry metricRegistry) {final Slf4jReporter reporter = Slf4jReporter.forRegistry(metricRegistry).outputTo(log).convertRatesTo(TimeUnit.SECONDS).convertDurationsTo(TimeUnit.MILLISECONDS).build();reporter.start(1, TimeUnit.SECONDS);latencyHist = metricRegistry.histogram(MetricRegistry.name(ProjectionMetrics.class, "latency"));}void latency(Duration duration) {latencyHist.update(duration.toMillis());} }現(xiàn)在,當您運行幼稚的解決方案時,您會很快發(fā)現(xiàn)中值延遲以及99.9%的百分數(shù)無限增長:
type=HISTOGRAM, [...] count=84, min=0, max=795, mean=404.88540608274104, [...]median=414.0, p75=602.0, p95=753.0, p98=783.0, p99=795.0, p999=795.0 type=HISTOGRAM, [...] count=182, min=0, max=1688, mean=861.1706371990878, [...]median=869.0, p75=1285.0, p95=1614.0, p98=1659.0, p99=1678.0, p999=1688.0[...30 seconds later...]type=HISTOGRAM, [...] count=2947, min=14, max=26945, mean=15308.138585757424, [...]median=16150.0, p75=21915.0, p95=25978.0, p98=26556.0, p99=26670.0, p999=26945.030秒后,我們的應用程序平均會延遲15秒處理事件。 并非完全實時 。 顯然,缺少并發(fā)是任何原因。 我們的ClientProjection事件使用者大約需要10毫秒才能完成,因此它每秒可以處理多達100個事件,而我們還需要一個數(shù)量級。 我們必須以某種方式擴展ClientProjection 。 而且我們甚至都沒有觸及其他要求!
天真線程池
最明顯的解決方案是從多個線程調(diào)用EventConsumer 。 最簡單的方法是利用ExecutorService :
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream) {this.executorService = Executors.newFixedThreadPool(size);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }我們在這里使用裝飾器模式 。 實現(xiàn)EventConsumer的原始ClientProjection是正確的。 但是,我們使用EventConsumer另一個實現(xiàn)來包裝它,該實現(xiàn)增加了并發(fā)性。 這將使我們能夠編寫復雜的行為而無需更改ClientProjection本身。 這樣的設計促進:
- 松散耦合:各種EventConsumer彼此都不了解,可以自由組合
- 單一職責:每個人都做一份工作,然后委派給下一個組成部分
- 開放/封閉原則 :我們可以在不修改現(xiàn)有實現(xiàn)的情況下更改系統(tǒng)的行為。
打開/關(guān)閉原理通常通過注入策略和模板方法模式來實現(xiàn)。 在這里,它甚至更簡單。 整體接線如下:
MetricRegistry metricRegistry =new MetricRegistry(); ProjectionMetrics metrics =new ProjectionMetrics(metricRegistry); ClientProjection clientProjection =new ClientProjection(metrics); NaivePool naivePool =new NaivePool(10, clientProjection); EventStream es = new EventStream(); es.consume(naivePool);我們精心設計的指標表明情況確實好得多:
type=HISToOGRAM, count=838, min=1, max=422, mean=38.80768197277468, [...]median=37.0, p75=45.0, p95=51.0, p98=52.0, p99=52.0, p999=422.0 type=HISTOGRAM, count=1814, min=1, max=281, mean=47.82642776789085, [...]median=51.0, p75=57.0, p95=61.0, p98=62.0, p99=63.0, p999=65.0[...30 seconds later...]type=HISTOGRAM, count=30564, min=5, max=3838, mean=364.2904915942238, [...]median=352.0, p75=496.0, p95=568.0, p98=574.0, p99=1251.0, p999=3531.0但是,我們?nèi)匀豢吹窖舆t的規(guī)模越來越小,在30秒后,延遲達到了364毫秒。 它一直在增長,所以問題是系統(tǒng)的。 我們……需要……更多……指標。 請注意, NaivePool (您很快就會知道為什么它是naive )有正好有10個線程NaivePool 。 這應該足以處理數(shù)千個事件,每個事件需要10毫秒來處理。 實際上,我們需要一點額外的處理能力,以避免垃圾收集后或負載高峰時出現(xiàn)問題。 為了證明線程池實際上是我們的瓶頸,最好監(jiān)視其內(nèi)部隊列。 這需要一些工作:
class NaivePool implements EventConsumer, Closeable {private final EventConsumer downstream;private final ExecutorService executorService;NaivePool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();String name = MetricRegistry.name(ProjectionMetrics.class, "queue");Gauge<Integer> gauge = queue::size;metricRegistry.register(name, gauge);this.executorService = new ThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue);this.downstream = downstream;}@Overridepublic Event consume(Event event) {executorService.submit(() -> downstream.consume(event));return event;}@Overridepublic void close() throws IOException {executorService.shutdown();} }這里的想法是手動創(chuàng)建ThreadPoolExecutor ,以提供自定義的LinkedBlockingQueue實例。 我們稍后可以使用該隊列來監(jiān)視其長度(請參閱: ExecutorService – 10個技巧 )。 Gauge將定期調(diào)用queue::size并將其報告給您需要的地方。 度量標準確認線程池大小確實是一個問題:
type=GAUGE, name=[...].queue, value=35 type=GAUGE, name=[...].queue, value=52[...30 seconds later...]type=GAUGE, name=[...].queue, value=601容納待處理任務的隊列的大小不斷增加,這會損害延遲。 將線程池大小從10增加到20最終會報告出不錯的結(jié)果,并且沒有停頓。 但是,我們?nèi)匀粵]有解決重復項,也沒有針對同一clientId防止事件的同時修改。
模糊鎖定
讓我們從避免對同一clientId的事件進行并發(fā)處理開始。 如果兩個事件接連發(fā)生,并且都與同一個clientId相關(guān),那么NaivePool將同時選擇它們并開始同時處理它們。 首先,我們至少通過為每個clientId設置一個Lock來發(fā)現(xiàn)這種情況:
@Slf4j class FailOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;FailOnConcurrentModification(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {Lock lock = findClientLock(event);if (lock.tryLock()) {try {downstream.consume(event);} finally {lock.unlock();}} else {log.error("Client {} already being modified by another thread", event.getClientId());}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}這肯定是朝錯誤的方向前進。 復雜程度不計其數(shù),但運行此代碼至少表明存在問題。 事件處理管道如下所示,一個裝飾器包裝了另一個裝飾器:
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification failOnConcurrentModification =new FailOnConcurrentModification(clientProjection); NaivePool naivePool =new NaivePool(10, failOnConcurrentModification, metricRegistry); EventStream es = new EventStream();es.consume(naivePool);有時會彈出錯誤消息,告訴我們其他一些線程已經(jīng)在處理同一clientId事件。 對于每個clientId我們關(guān)聯(lián)一個我們檢查的Lock ,以便確定當前是否有另一個線程不在處理該客戶端。 盡管丑陋,但實際上我們已經(jīng)接近殘酷的解決方案。 而不是因為另一個線程已經(jīng)在處理某個事件而無法獲得Lock時失敗,讓我們稍等一下,希望Lock可以被釋放:
@Slf4j class WaitOnConcurrentModification implements EventConsumer {private final ConcurrentMap<Integer, Lock> clientLocks = new ConcurrentHashMap<>();private final EventConsumer downstream;private final Timer lockWait;WaitOnConcurrentModification(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;lockWait = metricRegistry.timer(MetricRegistry.name(WaitOnConcurrentModification.class, "lockWait"));}@Overridepublic Event consume(Event event) {try {final Lock lock = findClientLock(event);final Timer.Context time = lockWait.time();try {final boolean locked = lock.tryLock(1, TimeUnit.SECONDS);time.stop();if(locked) {downstream.consume(event);}} finally {lock.unlock();}} catch (InterruptedException e) {log.warn("Interrupted", e);}return event;}private Lock findClientLock(Event event) {return clientLocks.computeIfAbsent(event.getClientId(),clientId -> new ReentrantLock());}}這個想法非常相似。 但是, tryLock()失敗,它最多等待1秒,以希望釋放給定客戶端的Lock 。 如果兩個事件很快相繼發(fā)生,則一個事件將獲得一個Lock并繼續(xù)執(zhí)行,而另一個事件將阻止等待unlock()發(fā)生。
不僅這些代碼確實令人費解,而且還可能以許多微妙的方式被破壞。 例如,如果同一個clientId兩個事件幾乎完全同時發(fā)生,但顯然是第一個事件,該怎么辦? 這兩個事件將同時請求Lock ,并且我們無法保證哪個事件會首先獲得不公平的Lock ,從而可能會亂序使用事件。 肯定有更好的辦法…
專用線程
讓我們退后一步,深吸一口氣。 您如何確保事情不會同時發(fā)生? 好吧,只需使用一個線程! 事實上,這是我們一開始所做的,但是吞吐量并不令人滿意。 但是我們不關(guān)心不同的clientId的并發(fā)性,我們只需要確保具有相同clientId事件始終由同一線程處理即可!
也許您會想到創(chuàng)建從clientId到Thread的映射? 好吧,這將過于簡單化。 我們將創(chuàng)建成千上萬個線程,大部分時間根據(jù)需求空閑(對于給定的clientId每秒只有很少的事件)。 一個不錯的折衷方案是固定大小的線程池,每個線程負責clientId的眾所周知的子集。 這樣,兩個不同的clientId可以結(jié)束在同一線程上,但是同一clientId將始終由同一線程處理。 如果出現(xiàn)同一clientId兩個事件,則它們都將被路由到同一線程,從而避免了并發(fā)處理。 實現(xiàn)非常簡單:
class SmartPool implements EventConsumer, Closeable {private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;List<ExecutorService> list = IntStream.range(0, size).mapToObj(i -> Executors.newSingleThreadExecutor()).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);}@Overridepublic void close() throws IOException {threadPools.forEach(ExecutorService::shutdown);}@Overridepublic Event consume(Event event) {final int threadIdx = event.getClientId() % threadPools.size();final ExecutorService executor = threadPools.get(threadIdx);executor.submit(() -> downstream.consume(event));return event;} }關(guān)鍵部分就在最后:
int threadIdx = event.getClientId() % threadPools.size(); ExecutorService executor = threadPools.get(threadIdx);這個簡單的算法將始終對相同的clientId使用相同的單線程ExecutorService 。 不同的ID可在同一池中結(jié)束,例如,當池大小是20 ,客戶機7 , 27 , 47等,將使用相同的線程。 但這可以,只要一個clientId始終使用同一線程即可。 此時,不需要鎖定,并且可以保證順序調(diào)用,因為同一客戶端的事件始終由同一線程執(zhí)行。 旁注:每個clientId一個線程無法擴展,但是每個clientId一個角色(例如,在Akka中)是一個很好的主意,它可以簡化很多工作。
為了更加安全,我在每個線程池中插入了平均隊列大小的指標,從而使實現(xiàn)更長:
class SmartPool implements EventConsumer, Closeable {private final List<LinkedBlockingQueue<Runnable>> queues;private final List<ExecutorService> threadPools;private final EventConsumer downstream;SmartPool(int size, EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;this.queues = IntStream.range(0, size).mapToObj(i -> new LinkedBlockingQueue<Runnable>()).collect(Collectors.toList());List<ThreadPoolExecutor> list = queues.stream().map(q -> new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, q)).collect(Collectors.toList());this.threadPools = new CopyOnWriteArrayList<>(list);metricRegistry.register(MetricRegistry.name(ProjectionMetrics.class, "queue"), (Gauge<Double>) this::averageQueueLength);}private double averageQueueLength() {double totalLength =queues.stream().mapToDouble(LinkedBlockingQueue::size).sum();return totalLength / queues.size();}//...}如果您偏執(zhí)狂,甚至可以為每個隊列創(chuàng)建一個指標。
重復數(shù)據(jù)刪除和冪等
在分布式環(huán)境中,當生產(chǎn)者至少有一次保證時,接收重復事件是很常見的。 這種行為背后的原因不在本文討論范圍之內(nèi),但我們必須學習如何解決該問題。 一種方法是將全局唯一標識符( UUID )附加到每封郵件,并在使用方確保具有相同標識符的郵件不會被處理兩次。 每個Event都有這樣的UUID 。 根據(jù)我們的要求,最直接的解決方案是簡單地存儲所有可見的UUID并在到達時驗證接收到的UUID從未見過。 按原樣使用ConcurrentHashMap<UUID, UUID> (JDK中沒有ConcurrentHashSet )會導致內(nèi)存泄漏,因為隨著時間的推移,我們將不斷積累越來越多的ID。 這就是為什么我們僅在最近10秒內(nèi)查找重復項。 從技術(shù)上講,您可以擁有ConcurrentHashMap<UUID, Instant> ,在遇到該問題時可以將其從UUID映射到時間戳。 通過使用后臺線程,我們可以刪除10秒鐘以上的元素。 但是,如果您是快樂的Guava用戶,則具有聲明驅(qū)逐策略的Cache<UUID, UUID>可以解決此問題:
import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder;import java.util.UUID; import java.util.concurrent.TimeUnit;class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream) {this.downstream = downstream;}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {return event;}} }為了保證生產(chǎn)安全,我至少認為有兩個指標可能會有用:緩存大小和發(fā)現(xiàn)的重復項數(shù)量。 讓我們也插入以下指標:
class IgnoreDuplicates implements EventConsumer {private final EventConsumer downstream;private final Meter duplicates;private Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();IgnoreDuplicates(EventConsumer downstream, MetricRegistry metricRegistry) {this.downstream = downstream;duplicates = metricRegistry.meter(MetricRegistry.name(IgnoreDuplicates.class, "duplicates"));metricRegistry.register(MetricRegistry.name(IgnoreDuplicates.class, "cacheSize"), (Gauge<Long>) seenUuids::size);}@Overridepublic Event consume(Event event) {final UUID uuid = event.getUuid();if (seenUuids.asMap().putIfAbsent(uuid, uuid) == null) {return downstream.consume(event);} else {duplicates.mark();return event;}} }最終,我們擁有了構(gòu)建解決方案的所有要素。 這個想法是由相互封裝的EventConsumer實例組成管道:
您可以選擇在SmartPool和ClientProjection之間放置FailOnConcurrentModification步驟,以提高安全性(設計時不應進行并發(fā)修改):
ClientProjection clientProjection =new ClientProjection(new ProjectionMetrics(metricRegistry)); FailOnConcurrentModification concurrentModification =new FailOnConcurrentModification(clientProjection); SmartPool smartPool =new SmartPool(12, concurrentModification, metricRegistry); IgnoreDuplicates withoutDuplicates =new IgnoreDuplicates(smartPool, metricRegistry); EventStream es = new EventStream(); es.consume(withoutDuplicates);我們花了很多工作才能提出相對簡單且結(jié)構(gòu)合理的解決方案(我希望您同意)。 最后,解決并發(fā)問題的最佳方法是……避免并發(fā)并在一個線程中運行受競爭條件約束的代碼。 這也是Akka actor(每個actor處理單個消息)和RxJava( Subscriber處理的一條消息)背后的思想。 在下一部分中,我們將在RxJava中看到聲明式解決方案。
翻譯自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-1-thread-pools.html
創(chuàng)作挑戰(zhàn)賽新人創(chuàng)作獎勵來咯,堅持創(chuàng)作打卡瓜分現(xiàn)金大獎總結(jié)
以上是生活随笔為你收集整理的小规模流处理kata。 第1部分:线程池的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java word批注_编写Java批注
- 下一篇: 苹果 iOS 17 拍照引入“HEIF