Kafka设计解析(八)- Kafka事务机制与Exactly Once语义实现原理
http://www.infoq.com/cn/articles/kafka-analysis-part-8#
寫在前面的話
本文所有Kafka原理性的描述除特殊說明外均基于Kafka 1.0.0版本。
為什么要提供事務機制
Kafka事務機制的實現主要是為了支持
- Exactly Once即正好一次語義
- 操作的原子性
- 有狀態操作的可恢復性
Exactly Once
《Kafka背景及架構介紹》一文中有說明Kafka在0.11.0.0之前的版本中只支持At Least Once和At Most Once語義,尚不支持Exactly Once語義。
但是在很多要求嚴格的場景下,如使用Kafka處理交易數據,Exactly Once語義是必須的。我們可以通過讓下游系統具有冪等性來配合Kafka的At Least Once語義來間接實現Exactly Once。但是:
- 該方案要求下游系統支持冪等操作,限制了Kafka的適用場景
- 實現門檻相對較高,需要用戶對Kafka的工作機制非常了解
- 對于Kafka Stream而言,Kafka本身即是自己的下游系統,但Kafka在0.11.0.0版本之前不具有冪等發送能力
因此,Kafka本身對Exactly Once語義的支持就非常必要。
操作原子性
操作的原子性是指,多個操作要么全部成功要么全部失敗,不存在部分成功部分失敗的可能。
實現原子性操作的意義在于:
- 操作結果更可控,有助于提升數據一致性
- 便于故障恢復。因為操作是原子的,從故障中恢復時只需要重試該操作(如果原操作失敗)或者直接跳過該操作(如果原操作成功),而不需要記錄中間狀態,更不需要針對中間狀態作特殊處理
實現事務機制的幾個階段
冪等性發送
上文提到,實現Exactly Once的一種方法是讓下游系統具有冪等處理特性,而在Kafka Stream中,Kafka Producer本身就是“下游”系統,因此如果能讓Producer具有冪等處理特性,那就可以讓Kafka Stream在一定程度上支持Exactly once語義。
為了實現Producer的冪等語義,Kafka引入了Producer ID(即PID)和Sequence Number。每個新的Producer在初始化的時候會被分配一個唯一的PID,該PID對用戶完全透明而不會暴露給用戶。
對于每個PID,該Producer發送數據的每個<Topic, Partition>都對應一個從0開始單調遞增的Sequence Number。
類似地,Broker端也會為每個<PID, Topic, Partition>維護一個序號,并且每次Commit一條消息時將其對應序號遞增。對于接收的每條消息,如果其序號比Broker維護的序號(即最后一次Commit的消息的序號)大一,則Broker會接受它,否則將其丟棄:
- 如果消息序號比Broker維護的序號大一以上,說明中間有數據尚未寫入,也即亂序,此時Broker拒絕該消息,Producer拋出InvalidSequenceNumber
- 如果消息序號小于等于Broker維護的序號,說明該消息已被保存,即為重復消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
上述設計解決了0.11.0.0之前版本中的兩個問題:
- Broker保存消息后,發送ACK前宕機,Producer認為消息未發送成功并重試,造成數據重復
- 前一條消息發送失敗,后一條消息發送成功,前一條消息重試后成功,造成數據亂序
事務性保證
上述冪等設計只能保證單個Producer對于同一個<Topic, Partition>的Exactly Once語義。
另外,它并不能保證寫操作的原子性——即多個寫操作,要么全部被Commit要么全部不被Commit。
更不能保證多個讀寫操作的的原子性。尤其對于Kafka Stream應用而言,典型的操作即是從某個Topic消費數據,經過一系列轉換后寫回另一個Topic,保證從源Topic的讀取與向目標Topic的寫入的原子性有助于從故障中恢復。
事務保證可使得應用程序將生產數據和消費數據當作一個原子單元來處理,要么全部成功,要么全部失敗,即使該生產或消費跨多個<Topic, Partition>。
另外,有狀態的應用也可以保證重啟后從斷點處繼續處理,也即事務恢復。
為了實現這種效果,應用程序必須提供一個穩定的(重啟后不變)唯一的ID,也即Transaction ID。Transactin ID與PID可能一一對應。區別在于Transaction ID由用戶提供,而PID是內部的實現對用戶透明。
另外,為了保證新的Producer啟動后,舊的具有相同Transaction ID的Producer即失效,每次Producer通過Transaction ID拿到PID的同時,還會獲取一個單調遞增的epoch。由于舊的Producer的epoch比新Producer的epoch小,Kafka可以很容易識別出該Producer是老的Producer并拒絕其請求。
有了Transaction ID后,Kafka可保證:
- 跨Session的數據冪等發送。當具有相同Transaction ID的新的Producer實例被創建且工作時,舊的且擁有相同Transaction ID的Producer將不再工作。
- 跨Session的事務恢復。如果某個應用實例宕機,新的實例可以保證任何未完成的舊的事務要么Commit要么Abort,使得新實例從一個正常狀態開始工作。
需要注意的是,上述的事務保證是從Producer的角度去考慮的。從Consumer的角度來看,該保證會相對弱一些。尤其是不能保證所有被某事務Commit過的所有消息都被一起消費,因為:
- 對于壓縮的Topic而言,同一事務的某些消息可能被其它版本覆蓋
- 事務包含的消息可能分布在多個Segment中(即使在同一個Partition內),當老的Segment被刪除時,該事務的部分數據可能會丟失
- Consumer在一個事務內可能通過seek方法訪問任意Offset的消息,從而可能丟失部分消息
- Consumer可能并不需要消費某一事務內的所有Partition,因此它將永遠不會讀取組成該事務的所有消息
事務機制原理
事務性消息傳遞
這一節所說的事務主要指原子性,也即Producer將多條消息作為一個事務批量發送,要么全部成功要么全部失敗。
為了實現這一點,Kafka 0.11.0.0引入了一個服務器端的模塊,名為Transaction Coordinator,用于管理Producer發送的消息的事務性。
該Transaction Coordinator維護Transaction Log,該log存于一個內部的Topic內。由于Topic數據具有持久性,因此事務的狀態也具有持久性。
Producer并不直接讀寫Transaction Log,它與Transaction Coordinator通信,然后由Transaction Coordinator將該事務的狀態插入相應的Transaction Log。
Transaction Log的設計與Offset Log用于保存Consumer的Offset類似。
事務中Offset的提交
許多基于Kafka的應用,尤其是Kafka Stream應用中同時包含Consumer和Producer,前者負責從Kafka中獲取消息,后者負責將處理完的數據寫回Kafka的其它Topic中。
為了實現該場景下的事務的原子性,Kafka需要保證對Consumer Offset的Commit與Producer對發送消息的Commit包含在同一個事務中。否則,如果在二者Commit中間發生異常,根據二者Commit的順序可能會造成數據丟失和數據重復:
- 如果先Commit Producer發送數據的事務再Commit Consumer的Offset,即At Least Once語義,可能造成數據重復。
- 如果先Commit Consumer的Offset,再Commit Producer數據發送事務,即At Most Once語義,可能造成數據丟失。
用于事務特性的控制型消息
為了區分寫入Partition的消息被Commit還是Abort,Kafka引入了一種特殊類型的消息,即Control Message。該類消息的Value內不包含任何應用相關的數據,并且不會暴露給應用程序。它只用于Broker與Client間的內部通信。
對于Producer端事務,Kafka以Control Message的形式引入一系列的Transaction Marker。Consumer即可通過該標記判定對應的消息被Commit了還是Abort了,然后結合該Consumer配置的隔離級別決定是否應該將該消息返回給應用程序。
事務處理樣例代碼
Producer<String, String> producer = new KafkaProducer<String, String>(props); // 初始化事務,包括結束該Transaction ID對應的未完成的事務(如果有) // 保證新的事務在一個正確的狀態下啟動 producer.initTransactions(); // 開始事務 producer.beginTransaction(); // 消費數據 ConsumerRecords<String, String> records = consumer.poll(100); try{ // 發送數據 producer.send(new ProducerRecord<String, String>("Topic", "Key", "Value")); // 發送消費數據的Offset,將上述數據消費與數據發送納入同一個Transaction內 producer.sendOffsetsToTransaction(offsets, "group1"); // 數據發送及Offset發送均成功的情況下,提交事務 producer.commitTransaction(); } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) { // 數據發送或者Offset發送出現異常時,終止事務 producer.abortTransaction(); } finally { // 關閉Producer和Consumer producer.close(); consumer.close(); }完整事務過程
找到Transaction Coordinator
由于Transaction Coordinator是分配PID和管理事務的核心,因此Producer要做的第一件事情就是通過向任意一個Broker發送FindCoordinator請求找到Transaction Coordinator的位置。
注意:只有應用程序為Producer配置了Transaction ID時才可使用事務特性,也才需要這一步。另外,由于事務性要求Producer開啟冪等特性,因此通過將transactional.id設置為非空從而開啟事務特性的同時也需要通過將enable.idempotence設置為true來開啟冪等特性。
獲取PID
找到Transaction Coordinator后,具有冪等特性的Producer必須發起InitPidRequest請求以獲取PID。
注意:只要開啟了冪等特性即必須執行該操作,而無須考慮該Producer是否開啟了事務特性。
* 如果事務特性被開啟 *
InitPidRequest會發送給Transaction Coordinator。如果Transaction Coordinator是第一次收到包含有該Transaction ID的InitPidRequest請求,它將會把該<TransactionID, PID>存入Transaction Log,如上圖中步驟2.1所示。這樣可保證該對應關系被持久化,從而保證即使Transaction Coordinator宕機該對應關系也不會丟失。
除了返回PID外,InitPidRequest還會執行如下任務:
- 增加該PID對應的epoch。具有相同PID但epoch小于該epoch的其它Producer(如果有)新開啟的事務將被拒絕。
- 恢復(Commit或Abort)之前的Producer未完成的事務(如果有)。
注意:InitPidRequest的處理過程是同步阻塞的。一旦該調用正確返回,Producer即可開始新的事務。
另外,如果事務特性未開啟,InitPidRequest可發送至任意Broker,并且會得到一個全新的唯一的PID。該Producer將只能使用冪等特性以及單一Session內的事務特性,而不能使用跨Session的事務特性。
開啟事務
Kafka從0.11.0.0版本開始,提供beginTransaction()方法用于開啟一個事務。調用該方法后,Producer本地會記錄已經開啟了事務,但Transaction Coordinator只有在Producer發送第一條消息后才認為事務已經開啟。
Consume-Transform-Produce
這一階段,包含了整個事務的數據處理過程,并且包含了多種請求。
AddPartitionsToTxnRequest
一個Producer可能會給多個<Topic, Partition>發送數據,給一個新的<Topic, Partition>發送數據前,它需要先向Transaction Coordinator發送AddPartitionsToTxnRequest。
Transaction Coordinator會將該<Transaction, Topic, Partition>存于Transaction Log內,并將其狀態置為BEGIN,如上圖中步驟4.1所示。有了該信息后,我們才可以在后續步驟中為每個Topic, Partition>設置COMMIT或者ABORT標記(如上圖中步驟5.2所示)。
另外,如果該<Topic, Partition>為該事務中第一個<Topic, Partition>,Transaction Coordinator還會啟動對該事務的計時(每個事務都有自己的超時時間)。
ProduceRequest
Producer通過一個或多個ProduceRequest發送一系列消息。除了應用數據外,該請求還包含了PID,epoch,和Sequence Number。該過程如上圖中步驟4.2所示。
AddOffsetsToTxnRequest
為了提供事務性,Producer新增了sendOffsetsToTransaction方法,該方法將多組消息的發送和消費放入同一批處理內。
該方法先判斷在當前事務中該方法是否已經被調用并傳入了相同的Group ID。若是,直接跳到下一步;若不是,則向Transaction Coordinator發送AddOffsetsToTxnRequests請求,Transaction Coordinator將對應的所有<Topic, Partition>存于Transaction Log中,并將其狀態記為BEGIN,如上圖中步驟4.3所示。該方法會阻塞直到收到響應。
TxnOffsetCommitRequest
作為sendOffsetsToTransaction方法的一部分,在處理完AddOffsetsToTxnRequest后,Producer也會發送TxnOffsetCommit請求給Consumer Coordinator從而將本事務包含的與讀操作相關的各<Topic, Partition>的Offset持久化到內部的__consumer_offsets中,如上圖步驟4.4所示。
在此過程中,Consumer Coordinator會通過PID和對應的epoch來驗證是否應該允許該Producer的該請求。
這里需要注意:
- 寫入__consumer_offsets的Offset信息在當前事務Commit前對外是不可見的。也即在當前事務被Commit前,可認為該Offset尚未Commit,也即對應的消息尚未被完成處理。
- Consumer Coordinator并不會立即更新緩存中相應<Topic, Partition>的Offset,因為此時這些更新操作尚未被COMMIT或ABORT。
Commit或Abort事務
一旦上述數據寫入操作完成,應用程序必須調用KafkaProducer的commitTransaction方法或者abortTransaction方法以結束當前事務。
EndTxnRequest
commitTransaction方法使得Producer寫入的數據對下游Consumer可見。abortTransaction方法通過Transaction Marker將Producer寫入的數據標記為Aborted狀態。下游的Consumer如果將isolation.level設置為READ_COMMITTED,則它讀到被Abort的消息后直接將其丟棄而不會返回給客戶程序,也即被Abort的消息對應用程序不可見。
無論是Commit還是Abort,Producer都會發送EndTxnRequest請求給Transaction Coordinator,并通過標志位標識是應該Commit還是Abort。
收到該請求后,Transaction Coordinator會進行如下操作
補充說明:對于commitTransaction方法,它會在發送EndTxnRequest之前先調用flush方法以確保所有發送出去的數據都得到相應的ACK。對于abortTransaction方法,在發送EndTxnRequest之前直接將當前Buffer中的事務性消息(如果有)全部丟棄,但必須等待所有被發送但尚未收到ACK的消息發送完成。
上述第二步是實現將一組讀操作與寫操作作為一個事務處理的關鍵。因為Producer寫入的數據Topic以及記錄Comsumer Offset的Topic會被寫入相同的Transactin Marker,所以這一組讀操作與寫操作要么全部COMMIT要么全部ABORT。
WriteTxnMarkerRequest
上面提到的WriteTxnMarkerRequest由Transaction Coordinator發送給當前事務涉及到的每個<Topic, Partition>的Leader。收到該請求后,對應的Leader會將對應的COMMIT(PID)或者ABORT(PID)控制信息寫入日志,如上圖中步驟5.2所示。
該控制消息向Broker以及Consumer表明對應PID的消息被Commit了還是被Abort了。
這里要注意,如果事務也涉及到__consumer_offsets,即該事務中有消費數據的操作且將該消費的Offset存于__consumer_offsets中,Transaction Coordinator也需要向該內部Topic的各Partition的Leader發送WriteTxnMarkerRequest從而寫入COMMIT(PID)或COMMIT(PID)控制信息。
寫入最終的COMPLETE_COMMIT或COMPLETE_ABORT消息
寫完所有的Transaction Marker后,Transaction Coordinator會將最終的COMPLETE_COMMIT或COMPLETE_ABORT消息寫入Transaction Log中以標明該事務結束,如上圖中步驟5.3所示。
此時,Transaction Log中所有關于該事務的消息全部可以移除。當然,由于Kafka內數據是Append Only的,不可直接更新和刪除,這里說的移除只是將其標記為null從而在Log Compact時不再保留。
另外,COMPLETE_COMMIT或COMPLETE_ABORT的寫入并不需要得到所有Rreplica的ACK,因為如果該消息丟失,可以根據事務協議重發。
補充說明,如果參與該事務的某些<Topic, Partition>在被寫入Transaction Marker前不可用,它對READ_COMMITTED的Consumer不可見,但不影響其它可用<Topic, Partition>的COMMIT或ABORT。在該<Topic, Partition>恢復可用后,Transaction Coordinator會重新根據PREPARE_COMMIT或PREPARE_ABORT向該<Topic, Partition>發送Transaction Marker。
總結
- PID與Sequence Number的引入實現了寫操作的冪等性
- 寫操作的冪等性結合At Least Once語義實現了單一Session內的Exactly Once語義
- Transaction Marker與PID提供了識別消息是否應該被讀取的能力,從而實現了事務的隔離性
- Offset的更新標記了消息是否被讀取,從而將對讀操作的事務處理轉換成了對寫(Offset)操作的事務處理
- Kafka事務的本質是,將一組寫操作(如果有)對應的消息與一組讀操作(如果有)對應的Offset的更新進行同樣的標記(即Transaction Marker)來實現事務中涉及的所有讀寫操作同時對外可見或同時對外不可見
- Kafka只提供對Kafka本身的讀寫操作的事務性,不提供包含外部系統的事務性
異常處理
Exception處理
InvalidProducerEpoch
這是一種Fatal Error,它說明當前Producer是一個過期的實例,有Transaction ID相同但epoch更新的Producer實例被創建并使用。此時Producer會停止并拋出Exception。
InvalidPidMapping
Transaction Coordinator沒有與該Transaction ID對應的PID。此時Producer會通過包含有Transaction ID的InitPidRequest請求創建一個新的PID。
NotCorrdinatorForGTransactionalId
該Transaction Coordinator不負責該當前事務。Producer會通過FindCoordinatorRequest請求重新尋找對應的Transaction Coordinator。
InvalidTxnRequest
違反了事務協議。正確的Client實現不應該出現這種Exception。如果該異常發生了,用戶需要檢查自己的客戶端實現是否有問題。
CoordinatorNotAvailable
Transaction Coordinator仍在初始化中。Producer只需要重試即可。
DuplicateSequenceNumber
發送的消息的序號低于Broker預期。該異常說明該消息已經被成功處理過,Producer可以直接忽略該異常并處理下一條消息
InvalidSequenceNumber
這是一個Fatal Error,它說明發送的消息中的序號大于Broker預期。此時有兩種可能
- 數據亂序。比如前面的消息發送失敗后重試期間,新的消息被接收。正常情況下不應該出現該問題,因為當冪等發送啟用時,max.inflight.requests.per.connection被強制設置為1,而acks被強制設置為all。故前面消息重試期間,后續消息不會被發送,也即不會發生亂序。并且只有ISR中所有Replica都ACK,Producer才會認為消息已經被發送,也即不存在Broker端數據丟失問題。
- 服務器由于日志被Truncate而造成數據丟失。此時應該停止Producer并將此Fatal Error報告給用戶。
InvalidTransactionTimeout
InitPidRequest調用出現的Fatal Error。它表明Producer傳入的timeout時間不在可接受范圍內,應該停止Producer并報告給用戶。
處理Transaction Coordinator失敗
寫PREPARE_COMMIT/PREPARE_ABORT前失敗
Producer通過FindCoordinatorRequest找到新的Transaction Coordinator,并通過EndTxnRequest請求發起COMMIT或ABORT流程,新的Transaction Coordinator繼續處理EndTxnRequest請求——寫PREPARE_COMMIT或PREPARE_ABORT,寫Transaction Marker,寫COMPLETE_COMMIT或COMPLETE_ABORT。
寫完PREPARE_COMMIT/PREPARE_ABORT后失敗
此時舊的Transaction Coordinator可能已經成功寫入部分Transaction Marker。新的Transaction Coordinator會重復這些操作,所以部分Partition中可能會存在重復的COMMIT或ABORT,但只要該Producer在此期間沒有發起新的事務,這些重復的Transaction Marker就不是問題。
寫完COMPLETE_COMMIT/ABORT后失敗
舊的Transaction Coordinator可能已經寫完了COMPLETE_COMMIT或COMPLETE_ABORT但在返回EndTxnRequest之前失敗。該場景下,新的Transaction Coordinator會直接給Producer返回成功。
事務過期機制
事務超時
transaction.timeout.ms
終止過期事務
當Producer失敗時,Transaction Coordinator必須能夠主動的讓某些進行中的事務過期。否則沒有Producer的參與,Transaction Coordinator無法判斷這些事務應該如何處理,這會造成:
- 如果這種進行中事務太多,會造成Transaction Coordinator需要維護大量的事務狀態,大量占用內存
- Transaction Log內也會存在大量數據,造成新的Transaction Coordinator啟動緩慢
- READ_COMMITTED的Consumer需要緩存大量的消息,造成不必要的內存浪費甚至是OOM
- 如果多個Transaction ID不同的Producer交叉寫同一個Partition,當一個Producer的事務狀態不更新時,READ_COMMITTED的Consumer為了保證順序消費而被阻塞
為了避免上述問題,Transaction Coordinator會周期性遍歷內存中的事務狀態Map,并執行如下操作
- 如果狀態是BEGIN并且其最后更新時間與當前時間差大于transaction.remove.expired.transaction.cleanup.interval.ms(默認值為1小時),則主動將其終止:1)未避免原Producer臨時恢復與當前終止流程沖突,增加該Producer對應的PID的epoch,并確保將該更新的信息寫入Transaction Log;2)以更新后的epoch回滾事務,從而使得該事務相關的所有Broker都更新其緩存的該PID的epoch從而拒絕舊Producer的寫操作
- 如果狀態是PREPARE_COMMIT,完成后續的COMMIT流程————向各<Topic, Partition>寫入Transaction Marker,在Transaction Log內寫入COMPLETE_COMMIT
- 如果狀態是PREPARE_ABORT,完成后續ABORT流程
終止Transaction ID
某Transaction ID的Producer可能很長時間不再發送數據,Transaction Coordinator沒必要再保存該Transaction ID與PID等的映射,否則可能會造成大量的資源浪費。因此需要有一個機制探測不再活躍的Transaction ID并將其信息刪除。
Transaction Coordinator會周期性遍歷內存中的Transaction ID與PID映射,如果某Transaction ID沒有對應的正在進行中的事務并且它對應的最后一個事務的結束時間與當前時間差大于transactional.id.expiration.ms(默認值是7天),則將其從內存中刪除并在Transaction Log中將其對應的日志的值設置為null從而使得Log Compact可將其記錄刪除。
與其它系統事務機制對比
PostgreSQL MVCC
Kafka的事務機制與《MVCC PostgreSQL實現事務和多版本并發控制的精華》一文中介紹的PostgreSQL通過MVCC實現事務的機制非常類似,對于事務的回滾,并不需要刪除已寫入的數據,都是將寫入數據的事務標記為Rollback/Abort從而在讀數據時過濾該數據。
兩階段提交
Kafka的事務機制與《分布式事務(一)兩階段提交及JTA》一文中所介紹的兩階段提交機制看似相似,都分PREPARE階段和最終COMMIT階段,但又有很大不同。
- Kafka事務機制中,PREPARE時即要指明是PREPARE_COMMIT還是PREPARE_ABORT,并且只須在Transaction Log中標記即可,無須其它組件參與。而兩階段提交的PREPARE需要發送給所有的分布式事務參與方,并且事務參與方需要盡可能準備好,并根據準備情況返回Prepared或Non-Prepared狀態給事務管理器。
- Kafka事務中,一但發起PREPARE_COMMIT或PREPARE_ABORT,則確定該事務最終的結果應該是被COMMIT或ABORT。而分布式事務中,PREPARE后由各事務參與方返回狀態,只有所有參與方均返回Prepared狀態才會真正執行COMMIT,否則執行ROLLBACK
- Kafka事務機制中,某幾個Partition在COMMIT或ABORT過程中變為不可用,只影響該Partition不影響其它Partition。兩階段提交中,若唯一收到COMMIT命令參與者Crash,其它事務參與方無法判斷事務狀態從而使得整個事務阻塞
- Kafka事務機制引入事務超時機制,有效避免了掛起的事務影響其它事務的問題
- Kafka事務機制中存在多個Transaction Coordinator實例,而分布式事務中只有一個事務管理器
Zookeeper
Zookeeper的原子廣播協議與兩階段提交以及Kafka事務機制有相似之處,但又有各自的特點
- Kafka事務可COMMIT也可ABORT。而Zookeeper原子廣播協議只有COMMIT沒有ABORT。當然,Zookeeper不COMMIT某消息也即等效于ABORT該消息的更新。
- Kafka存在多個Transaction Coordinator實例,擴展性較好。而Zookeeper寫操作只能在Leader節點進行,所以其寫性能遠低于讀性能。
- Kafka事務是COMMIT還是ABORT完全取決于Producer即客戶端。而Zookeeper原子廣播協議中某條消息是否被COMMIT取決于是否有一大半FOLLOWER ACK該消息。
轉載于:https://www.cnblogs.com/davidwang456/articles/7884705.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Kafka设计解析(八)- Kafka事务机制与Exactly Once语义实现原理的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kafka设计解析(七)- 流式计算的新
- 下一篇: 从 Zero 到 Hero ,一文掌握