在Kafka中发布订阅模型
這是第四個柱中的一系列關于同步客戶端集成與異步系統( 1, 2, 3 )。 在這里,我們將嘗試了解Kafka的工作方式,以便正確利用其發布-訂閱實現。
卡夫卡概念
根據官方文件 :
Kafka是一種分布式的,分區的,復制的提交日志服務。 它提供消息傳遞系統的功能,但具有獨特的設計。
Kafka作為集群運行,這些節點稱為代理。 代理可以是領導者或副本,以提供高可用性和容錯能力。 代理負責分區,分區是存儲消息的分發單元。 這些消息是有序的,可以通過名為offset的索引進行訪問。 一組分區構成一個主題,是消息的提要。 一個分區可以有不同的使用者,它們使用自己的偏移量訪問消息。 生產者將消息發布到Kafka主題中。 Kafka文檔中的以下圖表可以幫助您理解以下內容:
排隊與發布-訂閱
消費者群體是另一個關鍵概念,有助于解釋為什么Kafka比RabbitMQ等其他消息傳遞解決方案更靈活,功能更強大。 消費者與消費者群體相關聯。 如果每個使用者都屬于同一個使用者組,則主題的消息將在各個使用者之間平均負載均衡; 這就是所謂的“排隊模型”。 相反,如果每個使用者都屬于不同的使用者組,則所有消息都將在每個客戶端中使用。 這就是所謂的“發布-訂閱”模型。
您可以混合使用這兩種方法,分別針對不同的需求使用不同的邏輯使用者組,并在每個組中有多個使用者以通過并行提高吞吐量。 同樣, Kafka文檔中的另一個圖表:
了解我們的需求
正如我們在以前的文章(見1, 2, 3 )該項目服務發布消息到卡夫卡的話題叫item_deleted 。 此消息將位于該主題的一個分區中。 為了定義消息將駐留在哪個分區中,Kafka提供了三種選擇 :
- 如果記錄中指定了分區,請使用它
- 如果未指定分區但存在鍵,則根據鍵的哈希值選擇一個分區
- 如果沒有分區或密鑰,則以循環方式選擇一個分區
我們將使用item_id作為密鑰。 執法服務的不同實例中包含的消費者僅對特定分區感興趣,因為他們保留某些商品的內部狀態。 讓我們檢查不同的Kafka使用者實現,以了解哪種使用最方便。
卡夫卡消費者
卡夫卡共有三個消費者: 高級消費者 , 簡單消費者和新消費者
在這三個消費者中, 簡單消費者在最低級別上運行。 它滿足我們的要求,因為它允許消費者“在流程中僅使用主題中分區的子集”。 但是,正如文檔所述:
SimpleConsumer確實需要使用者組中不需要的大量工作:
- 您必須跟蹤應用程序中的偏移量,才能知道從何處停止消費
- 您必須確定哪個Broker是主題和分區的主要Broker。
- 您必須處理經紀人負責人變更
如果您閱讀了建議的用于處理這些問題的代碼,則將不鼓勵您使用此使用者。
新使用者提供正確的抽象級別,并允許我們訂閱特定的分區。 他們在文檔中建議以下用例:
第一種情況是,如果進程正在維護與該分區關聯的某種本地狀態(例如本地磁盤上的鍵值存儲),因此該進程應僅獲取其在磁盤上維護的分區的記錄。
不幸的是,我們的系統使用的是Kafka 0.8,而該使用者僅從0.9開始可用。 我們沒有足夠的資源來遷移到該版本,因此我們需要堅持使用高級消費者 。
該使用者提供了一個不錯的API,但不允許我們訂閱特定的分區。 這意味著,執法服務的每個實例都將使用每條消息,即使那些無關的消息也是如此。 我們可以通過為每個實例定義不同的消費者組來實現。
利用Akka Event Bus
在上一篇文章中,我們定義了一些等待ItemDeleted消息的有限狀態機ItemDeleted 。
when(Active) {case Event(ItemDeleted(item), currentItemsToBeDeleted@ItemsToBeDeleted(items)) =>val newItemsToBeDeleted = items.filterNot(_ == item)newItemsToBeDeleted.size match {case 0 => finishWorkWith(CensorResult(Right()))case _ => stay using currentItemsToBeDeleted.copy(items = newItemsToBeDeleted)}}我們的卡夫卡消費者可以將所有消息轉發給那些演員,并讓他們丟棄/過濾不相關的物品。 但是,我們不想讓參與者浪費大量的冗余和低效的工作,因此我們將添加一層抽象,使他們能夠以一種非常有效的方式丟棄適當的消息。
final case class MsgEnvelope(partitionKey: String, payload: ItemDeleted)class ItemDeletedBus extends EventBus with LookupClassification {override type Event = MsgEnvelopeoverride type Classifier = Stringoverride type Subscriber = ActorRefoverride protected def mapSize(): Int = 128override protected def publish(event: Event, subscriber: Subscriber): Unit = subscriber ! event.payloadoverride protected def classify(event: Event): Classifier = event.partitionKeyoverride protected def compareSubscribers(a: Subscriber, b: Subscriber): Int = a.compareTo(b) }Akka Event Bus按分區為我們提供訂閱,而我們的Kafka高級消費者中缺少該分區。 我們將從卡夫卡消費者處發布每條消息到公交車上:
itemDeletedBus.publish(MsgEnvelope(item.partitionKey, ItemDeleted(item)))在上一篇文章中,我們展示了如何使用該分區鍵訂閱消息:
itemDeletedBus.subscribe(self, item.partitionKey)LookupClassification將過濾不需要的消息,因此我們的參與者不會過載。
摘要
由于Kafka提供的靈活性,我們能夠設計我們的系統以了解不同的折衷方案。 在接下來的文章中,我們將看到如何協調這些FSM的結果以向客戶端提供同步響應。
第一部分 | 第2部分 | 第三部分
翻譯自: https://www.javacodegeeks.com/2016/05/publish-subscribe-model-kafka.html
總結
以上是生活随笔為你收集整理的在Kafka中发布订阅模型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 安卓volte在哪里设置(安卓volte
- 下一篇: 实现userdetails_Spring