kata_小规模流处理kata。 第2部分:RxJava 1.x / 2.x
kata
在第1部分:線程池中,我們設計并實現了相對簡單的系統,用于實時處理事件。 確保您閱讀了上一部分,因為它包含一些我們將重用的類。 以防萬一這是要求:
一個系統每秒發送大約一千個事件。 每個Event至少具有兩個屬性:
- clientId –我們期望一個客戶端每秒最多可以處理幾個事件
- UUID –全球唯一
消耗一個事件大約需要10毫秒。 設計此類流的使用者:
到目前為止,我們提出的是線程池和共享緩存的組合。 這次我們將使用RxJava實現解決方案。 首先,我沒有透露EventStream的實現方式,僅提供了API:
interface EventStream {void consume(EventConsumer consumer);}實際上,對于手動測試,我構建了一個簡單的RxJava流,其行為與系統的要求類似:
@Slf4j class EventStream {void consume(EventConsumer consumer) {observe().subscribe(consumer::consume,e -> log.error("Error emitting event", e));}Observable<Event> observe() {return Observable.interval(1, TimeUnit.MILLISECONDS).delay(x -> Observable.timer(RandomUtils.nextInt(0, 1_000), TimeUnit.MICROSECONDS)).map(x -> new Event(RandomUtils.nextInt(1_000, 1_100), UUID.randomUUID())).flatMap(this::occasionallyDuplicate, 100).observeOn(Schedulers.io());}private Observable<Event> occasionallyDuplicate(Event x) {final Observable<Event> event = Observable.just(x);if (Math.random() >= 0.01) {return event;}final Observable<Event> duplicated =event.delay(RandomUtils.nextInt(10, 5_000), TimeUnit.MILLISECONDS);return event.concatWith(duplicated);}}了解此模擬器的工作原理不是必不可少的,但很有趣。 首先,我們產生的源源不斷的Long值( 0 , 1 , 2 ...)每毫秒使用(每秒千個事件) interval()操作。 然后,我們使用delay()運算符將每個事件延遲0到1_000微秒之間的隨機時間。 這樣,事件將在難以預測的時刻出現,而情況會更加現實。 最后,我們將每個Long值映射(使用ekhem, map()運算符) map()到一個隨機Event其中clientId介于1_000到1_100 (包含在內)之間。
最后一點很有趣。 我們想模擬偶爾的重復。 為此,我們將每個事件(使用flatMap() )映射到自身(在99%的情況下)。 但是,在1%的情況下,我們兩次返回此事件,第二次發生在10毫秒至5秒后。 在實踐中,該事件的重復實例將在其他數百個事件之后出現,這使流的行為逼真。
與EventStream交互的方式有兩種-通過consume()回調和通過observe()流。 我們可以利用Observable<Event>來快速建立功能與第1部分非常相似但更簡單的處理管道。
缺少背壓
利用RxJava的第一個幼稚方法很快就失敗了:
EventStream es = new EventStream(); EventConsumer clientProjection = new ClientProjection(new ProjectionMetrics(new MetricRegistry()));es.observe().subscribe(clientProjection::consume,e -> log.error("Fatal error", e));( ClientProjection , ProjectionMetrics等人來自第1部分 )。 我們幾乎立即獲得MissingBackpressureException ,這是預期的。 還記得我們的第一個解決方案是如何通過處理越來越多的延遲來滯后嗎? RxJava嘗試避免這種情況,并避免隊列溢出。 由于使用者( ClientProjection )無法實時處理事件,因此拋出MissingBackpressureException 。 這是快速失敗的行為。 最快的解決方案是像以前一樣使用RxJava的功能將消耗轉移到單獨的線程池中:
EventStream es = new EventStream(); EventConsumer clientProjection = new FailOnConcurrentModification(new ClientProjection(new ProjectionMetrics(new MetricRegistry())));es.observe().flatMap(e -> clientProjection.consume(e, Schedulers.io())).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));EventConsumer接口具有一個輔助方法,該方法可以在提供的Scheduler上異步使用事件:
@FunctionalInterface interface EventConsumer {Event consume(Event event);default Observable<Event> consume(Event event, Scheduler scheduler) {return Observable.fromCallable(() -> this.consume(event)).subscribeOn(scheduler);}}通過在單獨的Scheduler.io()使用flatMap()使用事件,可以異步調用每個使用。 這次事件幾乎是實時處理的,但是存在更大的問題。 由于某種原因,我用FailOnConcurrentModification裝飾了ClientProjection 。 事件彼此獨立使用,因此可能會同時處理同一clientId兩個事件。 不好。 幸運的是,在RxJava中解決此問題比使用普通線程要容易得多:
es.observe().groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));有點改變了。 首先,我們將事件按clientId分組。 這將單個Observable流拆分為流 。 每個名為byClient子流代表與同一clientId相關的所有事件。 現在,如果我們映射到此子流,我們可以確保與同一個clientId相關的事件不會同時處理。 外部流是惰性的,因此我們必須訂閱它。 與其單獨訂閱每個事件,我們不每秒收集事件并進行計數。 這樣,我們每秒就會收到一個Integer類型的單個事件,該事件表示每秒消耗的事件數。
使用全局狀態的不純,非慣常,容易出錯,不安全的重復數據刪除解決方案
現在我們必須刪除重復的UUID 。 丟棄重復項的最簡單但非常愚蠢的方法是利用全局狀態。 我們可以通過在filter()運算符之外可用的緩存中查找重復項來簡單地過濾掉重復項:
final Cache<UUID, UUID> seenUuids = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();es.observe().filter(e -> seenUuids.getIfPresent(e.getUuid()) == null).doOnNext(e -> seenUuids.put(e.getUuid(), e.getUuid())).subscribe(clientProjection::consume,e -> log.error("Fatal error", e));如果要監視此機制的使用,只需添加指標:
Meter duplicates = metricRegistry.meter("duplicates");es.observe().filter(e -> {if (seenUuids.getIfPresent(e.getUuid()) != null) {duplicates.mark();return false;} else {return true;}})從操作員內部訪問全局狀態,尤其是可變狀態是非常危險的,并且破壞了RxJava的唯一目的–簡化并發。 顯然,我們使用了Guava的線程安全Cache ,但是在許多情況下,很容易錯過從多個線程訪問共享全局可變狀態的地方。 如果您發現自己在運算符鏈之外對某些變量進行了變異,請非常小心。
RxJava 1.x中的自定義
RxJava 1.x有一個distinct()運算符,大概可以完成此工作:
es.observe().distinct(Event::getUuid).groupBy(Event::getClientId)不幸的是, distinct()在內部將所有密鑰( UUID distinct()存儲在不斷增長的HashSet 。 但是我們只關心最近10秒鐘內的重復! 通過復制粘貼DistinctOperator的實現,我創建了DistinctEvent運算符,該運算符利用Guava的緩存僅存儲了最后10秒鐘的UUID值。 我故意在此運算符中對Event進行硬編碼,而不是使其通用性更強,以使代碼更易于理解:
class DistinctEvent implements Observable.Operator<Event, Event> {private final Duration duration;DistinctEvent(Duration duration) {this.duration = duration;}@Overridepublic Subscriber<? super Event> call(Subscriber<? super Event> child) {return new Subscriber<Event>(child) {final Map<UUID, Boolean> keyMemory = CacheBuilder.newBuilder().expireAfterWrite(duration.toMillis(), TimeUnit.MILLISECONDS).<UUID, Boolean>build().asMap();@Overridepublic void onNext(Event event) {if (keyMemory.put(event.getUuid(), true) == null) {child.onNext(event);} else {request(1);}}@Overridepublic void onError(Throwable e) {child.onError(e);}@Overridepublic void onCompleted() {child.onCompleted();}};} }用法非常簡單,整個實現(加上自定義運算符)如下:
es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).window(1, TimeUnit.SECONDS).flatMap(Observable::count).subscribe(c -> log.info("Processed {} events/s", c),e -> log.error("Fatal error", e));實際上,如果您跳過每秒的日志記錄,它甚至可以更短:
es.observe().lift(new DistinctEvent(Duration.ofSeconds(10))).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));該解決方案比以前的基于線程池和裝飾器的解決方案要短得多。 唯一尷尬的部分是自定義運算符,它在存儲太多歷史UUID時避免內存泄漏。 幸運的是RxJava 2得以解救!
RxJava 2.x和更強大的內置
實際上,我是從提交公關RxJava具有更強大的執行這種緊密distinct()操作。 但是在我檢查2.x分支之前,它是: distinct()允許提供自定義Collection ,而不是硬編碼的HashSet 。 信不信由你,依賴倒置不僅涉及Spring框架或Java EE。 當庫允許您提供其內部數據結構的自定義實現時,這也是DI。 首先,我創建一個輔助方法,該方法可以構建由Map<UUID, Boolean>支持,由Cache<UUID, Boolean>支持的Set<UUID> Cache<UUID, Boolean> 。 我們一定喜歡代表團!
private Set<UUID> recentUuids() {return Collections.newSetFromMap(CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).<UUID, Boolean>build().asMap()); }有了這種方法,我們可以使用以下表達式實現整個任務:
es.observe().distinct(Event::getUuid, this::recentUuids).groupBy(Event::getClientId).flatMap(byClient -> byClient.observeOn(Schedulers.io()).map(clientProjection::consume)).subscribe(e -> {},e -> log.error("Fatal error", e));優雅,簡潔,清晰! 它看起來像是一個問題:
- 觀察事件流
- 僅考慮不同的UUID
- 客戶分組活動
- 為每個客戶消耗(順序)
希望您喜歡所有這些解決方案,并發現它們對您的日常工作很有用。
也可以看看:
- 小規模流處理kata。 第1部分:線程池
- 小規模流處理kata。 第2部分:RxJava 1.x / 2.x
翻譯自: https://www.javacodegeeks.com/2016/10/small-scale-stream-processing-kata-part-2-rxjava-1-x2-x.html
kata
總結
以上是生活随笔為你收集整理的kata_小规模流处理kata。 第2部分:RxJava 1.x / 2.x的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓还原微信聊天记录简单方法(安卓还原微
- 下一篇: apache ignite_使用Apac