流式计算新贵Kafka Stream设计详解--转
原文地址:https://mp.weixin.qq.com/s?__biz=MzA5NzkxMzg1Nw==&mid=2653162822&idx=1&sn=8c46114360b98b621b166d41d8e01d74&chksm=8b493028bc3eb93e8376d85c7d1f9b2a699888b7f0f52e4556bb8543ebebd5e102e91ea23355#rd
本文介紹了 Kafka Stream 的背景,如 Kafka Stream 是什么,什么是流式計算,以及為什么要有 Kafka Stream。接著介紹了 Kafka Stream 的整體架構、并行模型、狀態存儲以及主要的兩種數據集 KStream 和 KTable。然后分析了 Kafka Stream 如何解決流式系統中的關鍵問題,如時間定義、窗口操作、Join 操作、聚合操作,以及如何處理亂序和提供容錯能力。最后結合示例講解了如何使用 Kafka Stream。Kafka Stream 背景??Kafka Stream 是什么??
Kafka Stream 是 Apache Kafka 從 0.10 版本引入的一個新 Feature。它提供了對存儲于 Kafka 內的數據進行流式處理和分析的功能。
Kafka Stream 的特點如下:
-
Kafka Stream 提供了一個非常簡單而輕量的 Library,它可以非常方便地嵌入任意 Java 應用中,也可以任意方式打包和部署
-
除了 Kafka 外,無任何外部依賴
-
充分利用 Kafka 分區機制實現水平擴展和順序性保證
-
通過可容錯的 state store 實現高效的狀態操作(如 windowed join 和 aggregation)
-
支持正好一次處理語義
-
提供記錄級的處理能力,從而實現毫秒級的低延遲
-
支持基于事件時間的窗口操作,并且可處理晚到的數據(late arrival of records)
-
同時提供底層的處理原語 Processor(類似于 Storm 的 spout 和 bolt),以及高層抽象的 DSL(類似于 Spark 的 map/group/reduce)
什么是流式計算??
一般流式計算會與批量計算相比較。在流式計算模型中,輸入是持續的,可以認為在時間上是無界的,也就意味著,永遠拿不到全量數據去做計算。同時,計算結果是持續輸出的,也即計算結果在時間上也是無界的。流式計算一般對實時性要求較高,同時一般是先定義目標計算,然后數據到來之后將計算邏輯應用于數據。同時為了提高計算效率,往往盡可能采用增量計算代替全量計算。
批量處理模型中,一般先有全量數據集,然后定義計算邏輯,并將計算應用于全量數據。特點是全量計算,并且計算結果一次性全量輸出。
?
為什么要有 Kafka Stream??
當前已經有非常多的流式處理系統,最知名且應用最多的開源流式處理系統有 Spark Streaming 和 Apache Storm。Apache Storm 發展多年,應用廣泛,提供記錄級別的處理能力,當前也支持 SQL on Stream。而 Spark Streaming 基于 Apache Spark,可以非常方便與圖計算,SQL 處理等集成,功能強大,對于熟悉其它 Spark 應用開發的用戶而言使用門檻低。另外,目前主流的 Hadoop 發行版,如 MapR,Cloudera 和 Hortonworks,都集成了 Apache Storm 和 Apache Spark,使得部署更容易。
既然 Apache Spark 與 Apache Storm 擁用如此多的優勢,那為何還需要 Kafka Stream 呢?筆者認為主要有如下原因。
第一,Spark 和 Storm 都是流式處理框架,而 Kafka Stream 提供的是一個基于 Kafka 的流式處理類庫??蚣芤箝_發者按照特定的方式去開發邏輯部分,供框架調用。開發者很難了解框架的具體運行方式,從而使得調試成本高,并且使用受限。而 Kafka Stream 作為流式處理類庫,直接提供具體的類給開發者調用,整個應用的運行方式主要由開發者控制,方便使用和調試。
第二,雖然 Cloudera 與 Hortonworks 方便了 Storm 和 Spark 的部署,但是這些框架的部署仍然相對復雜。而 Kafka Stream 作為類庫,可以非常方便的嵌入應用程序中,它對應用的打包和部署基本沒有任何要求。更為重要的是,Kafka Stream 充分利用了 Kafka 的分區機制和 Consumer 的 Rebalance 機制,使得 Kafka Stream 可以非常方便的水平擴展,并且各個實例可以使用不同的部署方式。
具體來說,每個運行 Kafka Stream 的應用程序實例都包含了 Kafka Consumer 實例,多個同一應用的實例之間并行處理數據集。而不同實例之間的部署方式并不要求一致,比如部分實例可以運行在 Web 容器中,部分實例可運行在 Docker 或 Kubernetes 中。
第三,就流式處理系統而言,基本都支持 Kafka 作為數據源。例如 Storm 具有專門的 kafka-spout,而 Spark 也提供專門的 spark-streaming-kafka 模塊。事實上,Kafka 基本上是主流的流式處理系統的標準數據源。換言之,大部分流式系統中都已部署了 Kafka,此時使用 Kafka Stream 的成本非常低。
第四,使用 Storm 或 Spark Streaming 時,需要為框架本身的進程預留資源,如 Storm 的 supervisor 和 Spark on YARN 的 node manager。即使對于應用實例而言,框架本身也會占用部分資源,如 Spark Streaming 需要為 shuffle 和 storage 預留內存。
第五,由于 Kafka 本身提供數據持久化,因此 Kafka Stream 提供滾動部署和滾動升級以及重新計算的能力。
第六,由于 Kafka Consumer Rebalance 機制,Kafka Stream 可以在線動態調整并行度。
Kafka Stream 架構??
?
Kafka Stream 整體架構??
Kafka Stream 的整體架構圖如下。
?
目前(Kafka 0.11.0.0)Kafka Stream 的數據源只能如上圖所示是 Kafka。但是處理結果并不一定要如上圖所示輸出到 Kafka。實際上 KStream 和 Ktable 的實例化都需要指定 Topic。
KStream<String, String> stream = builder.stream("words-stream");KTable<String, String> table = builder.table("words-table", "words-store");另外,上圖中的 Consumer 和 Producer 并不需要開發者在應用中顯示實例化,而是由 Kafka Stream 根據參數隱式實例化和管理,從而降低了使用門檻。開發者只需要專注于開發核心業務邏輯,也即上圖中 Task 內的部分。
Processor Topology??
基于 Kafka Stream 的流式應用的業務邏輯全部通過一個被稱為 Processor Topology 的地方執行。它與 Storm 的 Topology 和 Spark 的 DAG 類似,都定義了數據在各個處理單元(在 Kafka Stream 中被稱作 Processor)間的流動方式,或者說定義了數據的處理邏輯。
下面是一個 Processor 的示例,它實現了 Word Count 功能,并且每秒輸出一次結果。
public class WordCountProcessor implements Processor<String, String> {private ProcessorContext context;private KeyValueStore<String, Integer> kvStore;@SuppressWarnings("unchecked")@Overridepublic void init(ProcessorContext context) {this.context = context;this.context.schedule(1000);this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts");}@Overridepublic void process(String key, String value) {Stream.of(value.toLowerCase().split(" ")).forEach((String word) -> {Optional<Integer> counts = Optional.ofNullable(kvStore.get(word));int count = counts.map(wordcount -> wordcount + 1).orElse(1);kvStore.put(word, count);});}@Overridepublic void punctuate(long timestamp) {KeyValueIterator<String, Integer> iterator = this.kvStore.all();iterator.forEachRemaining(entry -> {context.forward(entry.key, entry.value);this.kvStore.delete(entry.key);});context.commit();}@Overridepublic void close() {this.kvStore.close();}}從上述代碼中可見
-
process 定義了對每條記錄的處理邏輯,也印證了 Kafka 可具有記錄級的數據處理能力。
-
context.scheduler 定義了 punctuate 被執行的周期,從而提供了實現窗口操作的能力。
-
context.getStateStore 提供的狀態存儲為有狀態計算(如窗口,聚合)提供了可能。
Kafka Stream 并行模型??
Kafka Stream 的并行模型中,最小粒度為 Task,而每個 Task 包含一個特定子 Topology 的所有 Processor。因此每個 Task 所執行的代碼完全一樣,唯一的不同在于所處理的數據集互補。這一點跟 Storm 的 Topology 完全不一樣。Storm 的 Topology 的每一個 Task 只包含一個 Spout 或 Bolt 的實例。因此 Storm 的一個 Topology 內的不同 Task 之間需要通過網絡通信傳遞數據,而 Kafka Stream 的 Task 包含了完整的子 Topology,所以 Task 之間不需要傳遞數據,也就不需要網絡通信。這一點降低了系統復雜度,也提高了處理效率。
如果某個 Stream 的輸入 Topic 有多個 (比如 2 個 Topic,1 個 Partition 數為 4,另一個 Partition 數為 3),則總的 Task 數等于 Partition 數最多的那個 Topic 的 Partition 數(max(4,3)=4)。這是因為 Kafka Stream 使用了 Consumer 的 Rebalance 機制,每個 Partition 對應一個 Task。
下圖展示了在一個進程(Instance)中以 2 個 Topic(Partition 數均為 4)為數據源的 Kafka Stream 應用的并行模型。從圖中可以看到,由于 Kafka Stream 應用的默認線程數為 1,所以 4 個 Task 全部在一個線程中運行。
為了充分利用多線程的優勢,可以設置 Kafka Stream 的線程數。下圖展示了線程數為 2 時的并行模型。
前文有提到,Kafka Stream 可被嵌入任意 Java 應用(理論上基于 JVM 的應用都可以)中,下圖展示了在同一臺機器的不同進程中同時啟動同一 Kafka Stream 應用時的并行模型。注意,這里要保證兩個進程的 StreamsConfig.APPLICATION_ID_CONFIG 完全一樣。因為 Kafka Stream 將 APPLICATION_ID_CONFI 作為隱式啟動的 Consumer 的 Group ID。只有保證 APPLICATION_ID_CONFI 相同,才能保證這兩個進程的 Consumer 屬于同一個 Group,從而可以通過 Consumer Rebalance 機制拿到互補的數據集。
既然實現了多進程部署,可以以同樣的方式實現多機器部署。該部署方式也要求所有進程的 APPLICATION_ID_CONFIG 完全一樣。從圖上也可以看到,每個實例中的線程數并不要求一樣。但是無論如何部署,Task 總數總會保證一致。
注意:Kafka Stream 的并行模型,非常依賴于《Kafka 設計解析(一)- Kafka 背景及架構介紹》一文中介紹的 Kafka 分區機制和《Kafka 設計解析(四)- Kafka Consumer 設計解析》中介紹的 Consumer 的 Rebalance 機制。強烈建議不太熟悉這兩種機制的朋友,先行閱讀這兩篇文章。
這里對比一下 Kafka Stream 的 Processor Topology 與 Storm 的 Topology。
-
Storm 的 Topology 由 Spout 和 Bolt 組成,Spout 提供數據源,而 Bolt 提供計算和數據導出。Kafka Stream 的 Processor Topology 完全由 Processor 組成,因為它的數據固定由 Kafka 的 Topic 提供。
-
Storm 的不同 Bolt 運行在不同的 Executor 中,很可能位于不同的機器,需要通過網絡通信傳輸數據。而 Kafka Stream 的 Processor Topology 的不同 Processor 完全運行于同一個 Task 中,也就完全處于同一個線程,無需網絡通信。
-
Storm 的 Topology 可以同時包含 Shuffle 部分和非 Shuffle 部分,并且往往一個 Topology 就是一個完整的應用。而 Kafka Stream 的一個物理 Topology 只包含非 Shuffle 部分,而 Shuffle 部分需要通過 through 操作顯示完成,該操作將一個大的 Topology 分成了 2 個子 Topology。
-
Storm 的 Topology 內,不同 Bolt/Spout 的并行度可以不一樣,而 Kafka Stream 的子 Topology 內,所有 Processor 的并行度完全一樣。
-
Storm 的一個 Task 只包含一個 Spout 或者 Bolt 的實例,而 Kafka Stream 的一個 Task 包含了一個子 Topology 的所有 Processor。
KTable vs. KStream??
KTable 和 KStream 是 Kafka Stream 中非常重要的兩個概念,它們是 Kafka 實現各種語義的基礎。因此這里有必要分析下二者的區別。
KStream 是一個數據流,可以認為所有記錄都通過 Insert only 的方式插入進這個數據流里。而 KTable 代表一個完整的數據集,可以理解為數據庫中的表。
由于每條記錄都是 Key-Value 對,這里可以將 Key 理解為數據庫中的 Primary Key,而 Value 可以理解為一行記錄??梢哉J為 KTable 中的數據都是通過 Update only 的方式進入的。也就意味著,如果 KTable 對應的 Topic 中新進入的數據的 Key 已經存在,那么從 KTable 只會取出同一 Key 對應的最后一條數據,相當于新的數據更新了舊的數據。
以下圖為例,假設有一個 KStream 和 KTable,基于同一個 Topic 創建,并且該 Topic 中包含如下圖所示 5 條數據。此時遍歷 KStream 將得到與 Topic 內數據完全一樣的所有 5 條數據,且順序不變。而此時遍歷 KTable 時,因為這 5 條記錄中有 3 個不同的 Key,所以將得到 3 條記錄,每個 Key 對應最新的值,并且這三條數據之間的順序與原來在 Topic 中的順序保持一致。這一點與 Kafka 的日志 compact 相同。
此時如果對該 KStream 和 KTable 分別基于 key 做 Group,對 Value 進行 Sum,得到的結果將會不同。對 KStream 的計算結果是<jack,4><Jack,4>,<Lily,7>,<Mike,4><lily,7><mike,4>。而對 Ktable 的計算結果是<mike,4><Mike,4>,<Jack,3>,<Lily,5><jack,3><lily,5>。
State store??
流式處理中,部分操作是無狀態的,例如過濾操作(Kafka Stream DSL 中用 filer 方法實現)。而部分操作是有狀態的,需要記錄中間狀態,如 Window 操作和聚合計算。State store 被用來存儲中間狀態。它可以是一個持久化的 Key-Value 存儲,也可以是內存中的 HashMap,或者是數據庫。Kafka 提供了基于 Topic 的狀態存儲。
Topic 中存儲的數據記錄本身是 Key-Value 形式的,同時 Kafka 的 log compaction 機制可對歷史數據做 compact 操作,保留每個 Key 對應的最后一個 Value,從而在保證 Key 不丟失的前提下,減少總數據量,從而提高查詢效率。
構造 KTable 時,需要指定其 state store name。默認情況下,該名字也即用于存儲該 KTable 的狀態的 Topic 的名字,遍歷 KTable 的過程,實際就是遍歷它對應的 state store,或者說遍歷 Topic 的所有 key,并取每個 Key 最新值的過程。為了使得該過程更加高效,默認情況下會對該 Topic 進行 compact 操作。
另外,除了 KTable,所有狀態計算,都需要指定 state store name,從而記錄中間狀態。
Kafka Stream 如何解決流式系統中關鍵問題??時間??
在流式數據處理中,時間是數據的一個非常重要的屬性。從 Kafka 0.10 開始,每條記錄除了 Key 和 Value 外,還增加了 timestamp 屬性。目前 Kafka Stream 支持三種時間
-
事件發生時間。事件發生的時間,包含在數據記錄中。發生時間由 Producer 在構造 ProducerRecord 時指定。并且需要 Broker 或者 Topic 將 message.timestamp.type 設置為 CreateTime(默認值)才能生效。
-
消息接收時間,也即消息存入 Broker 的時間。當 Broker 或 Topic 將 message.timestamp.type 設置為 LogAppendTime 時生效。此時 Broker 會在接收到消息后,存入磁盤前,將其 timestamp 屬性值設置為當前機器時間。一般消息接收時間比較接近于事件發生時間,部分場景下可代替事件發生時間。
-
消息處理時間,也即 Kafka Stream 處理消息時的時間。
注:Kafka Stream 允許通過實現 org.apache.kafka.streams.processor.TimestampExtractor 接口自定義記錄時間。
窗口??
前文提到,流式數據是在時間上無界的數據。而聚合操作只能作用在特定的數據集,也即有界的數據集上。因此需要通過某種方式從無界的數據集上按特定的語義選取出有界的數據。窗口是一種非常常用的設定計算邊界的方式。不同的流式處理系統支持的窗口類似,但不盡相同。
Kafka Stream 支持的窗口如下。
1、Hopping Time Window 該窗口定義如下圖所示。它有兩個屬性,一個是 Window size,一個是 Advance interval。Window size 指定了窗口的大小,也即每次計算的數據集的大小。而 Advance interval 定義輸出的時間間隔。一個典型的應用場景是,每隔 5 秒鐘輸出一次過去 1 個小時內網站的 PV 或者 UV。
2、Tumbling Time Window 該窗口定義如下圖所示。可以認為它是 Hopping Time Window 的一種特例,也即 Window size 和 Advance interval 相等。它的特點是各個 Window 之間完全不相交。
3、Sliding Window 該窗口只用于 2 個 KStream 進行 Join 計算時。該窗口的大小定義了 Join 兩側 KStream 的數據記錄被認為在同一個窗口的最大時間差。假設該窗口的大小為 5 秒,則參與 Join 的 2 個 KStream 中,記錄時間差小于 5 的記錄被認為在同一個窗口中,可以進行 Join 計算。
4、Session Window 該窗口用于對 Key 做 Group 后的聚合操作中。它需要對 Key 做分組,然后對組內的數據根據業務需求定義一個窗口的起始點和結束點。一個典型的案例是,希望通過 Session Window 計算某個用戶訪問網站的時間。對于一個特定的用戶(用 Key 表示)而言,當發生登錄操作時,該用戶(Key)的窗口即開始,當發生退出操作或者超時時,該用戶(Key)的窗口即結束。窗口結束時,可計算該用戶的訪問時間或者點擊次數等。
Join??
Kafka Stream 由于包含 KStream 和 Ktable 兩種數據集,因此提供如下 Join 計算
-
KTable Join KTable 結果仍為 KTable。任意一邊有更新,結果 KTable 都會更新。
-
KStream Join KStream 結果為 KStream。必須帶窗口操作,否則會造成 Join 操作一直不結束。
-
KStream Join KTable / GlobakKTable 結果為 KStream。只有當 KStream 中有新數據時,才會觸發 Join 計算并輸出結果。KStream 無新數據時,KTable 的更新并不會觸發 Join 計算,也不會輸出數據。并且該更新只對下次 Join 生效。一個典型的使用場景是,KStream 中的訂單信息與 KTable 中的用戶信息做關聯計算。
對于 Join 操作,如果要得到正確的計算結果,需要保證參與 Join 的 KTable 或 KStream 中 Key 相同的數據被分配到同一個 Task。具體方法是
-
參與 Join 的 KTable 或 KStream 的 Key 類型相同(實際上,業務含意也應該相同)
-
參與 Join 的 KTable 或 KStream 對應的 Topic 的 Partition 數相同Partitioner 策略的最終結果等效(實現不需要完全一樣,只要效果一樣即可),也即 Key 相同的情況下,被分配到 ID 相同的 Partition 內
-
如果上述條件不滿足,可通過調用如下方法使得它滿足上述條件。
KStream<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<K, V> partitioner, String topic)
聚合與亂序處理??
聚合操作可應用于 KStream 和 KTable。當聚合發生在 KStream 上時必須指定窗口,從而限定計算的目標數據集。
需要說明的是,聚合操作的結果肯定是 KTable。因為 KTable 是可更新的,可以在晚到的數據到來時(也即發生數據亂序時)更新結果 KTable。
這里舉例說明。假設對 KStream 以 5 秒為窗口大小,進行 Tumbling Time Window 上的 Count 操作。并且 KStream 先后出現時間為 1 秒, 3 秒, 5 秒的數據,此時 5 秒的窗口已達上限,Kafka Stream 關閉該窗口,觸發 Count 操作并將結果 3 輸出到 KTable 中(假設該結果表示為<1-5,3>)。若 1 秒后,又收到了時間為 2 秒的記錄,由于 1-5 秒的窗口已關閉,若直接拋棄該數據,則可認為之前的結果<1-5,3>不準確。
而如果直接將完整的結果<1-5,4>輸出到 KStream 中,則 KStream 中將會包含該窗口的 2 條記錄,<1-5,3>, <1-5,4>,也會存在骯數據。因此 Kafka Stream 選擇將聚合結果存于 KTable 中,此時新的結果<1-5,4>會替代舊的結果<1-5,3>。用戶可得到完整的正確的結果。
這種方式保證了數據準確性,同時也提高了容錯性。
但需要說明的是,Kafka Stream 并不會對所有晚到的數據都重新計算并更新結果集,而是讓用戶設置一個 retention period,將每個窗口的結果集在內存中保留一定時間,該窗口內的數據晚到時,直接合并計算,并更新結果 KTable。超過 retention period 后,該窗口結果將從內存中刪除,并且晚到的數據即使落入窗口,也會被直接丟棄。
容錯??
Kafka Stream 從如下幾個方面進行容錯
-
高可用的 Partition 保證無數據丟失。每個 Task 計算一個 Partition,而 Kafka 數據復制機制保證了 Partition 內數據的高可用性,故無數據丟失風險。同時由于數據是持久化的,即使任務失敗,依然可以重新計算。
-
狀態存儲實現快速故障恢復和從故障點繼續處理。對于 Join 和聚合及窗口等有狀態計算,狀態存儲可保存中間狀態。即使發生 Failover 或 Consumer Rebalance,仍然可以通過狀態存儲恢復中間狀態,從而可以繼續從 Failover 或 Consumer Rebalance 前的點繼續計算。
-
KTable 與 retention period 提供了對亂序數據的處理能力。
Kafka Stream 應用示例??
?
下面結合一個案例來講解如何開發 Kafka Stream 應用。本例完整代碼可從作者 Github 獲取。
訂單 KStream(名為 orderStream),底層 Topic 的 Partition 數為 3,Key 為用戶名,Value 包含用戶名,商品名,訂單時間,數量。用戶 KTable(名為 userTable),底層 Topic 的 Partition 數為 3,Key 為用戶名,Value 包含性別,地址和年齡。商品 KTable(名為 itemTable),底層 Topic 的 Partition 數為 6,Key 為商品名,價格,種類和產地?,F在希望計算每小時購買產地與自己所在地相同的用戶總數。
首先由于希望使用訂單時間,而它包含在 orderStream 的 Value 中,需要通過提供一個實現 TimestampExtractor 接口的類從 orderStream 對應的 Topic 中抽取出訂單時間。
public class OrderTimestampExtractor implements TimestampExtractor {@Overridepublic long extract(ConsumerRecord<Object, Object> record) {if(record instanceof Order) {return ((Order)record).getTS();} else {return 0;}} }接著通過將 orderStream 與 userTable 進行 Join,來獲取訂單用戶所在地。由于二者對應的 Topic 的 Partition 數相同,且 Key 都為用戶名,再假設 Producer 往這兩個 Topic 寫數據時所用的 Partitioner 實現相同,則此時上文所述 Join 條件滿足,可直接進行 Join。
orderUserStream = orderStream.leftJoin(userTable, // 該 lamda 表達式定義了如何從 orderStream 與 userTable 生成結果集的 Value(Order order, User user) -> OrderUser.fromOrderUser(order, user), // 結果集 Key 序列化方式Serdes.String(),// 結果集 Value 序列化方式SerdesFactory.serdFrom(Order.class)).filter((String userName, OrderUser orderUser) -> orderUser.userAddress != null)從上述代碼中,可以看到,Join 時需要指定如何從參與 Join 雙方的記錄生成結果記錄的 Value。Key 不需要指定,因為結果記錄的 Key 與 Join Key 相同,故無須指定。Join 結果存于名為 orderUserStream 的 KStream 中。
接下來需要將 orderUserStream 與 itemTable 進行 Join,從而獲取商品產地。此時 orderUserStream 的 Key 仍為用戶名,而 itemTable 對應的 Topic 的 Key 為產品名,并且二者的 Partition 數不一樣,因此無法直接 Join。此時需要通過 through 方法,對其中一方或雙方進行重新分區,使得二者滿足 Join 條件。這一過程相當于 Spark 的 Shuffle 過程和 Storm 的 FieldGrouping。
orderUserStrea.through(// Key 的序列化方式Serdes.String(),// Value 的序列化方式 SerdesFactory.serdFrom(OrderUser.class), // 重新按照商品名進行分區,具體取商品名的哈希值,然后對分區數取模(String key, OrderUser orderUser, int numPartitions) -> (orderUser.getItemName().hashCode() & 0x7FFFFFFF) % numPartitions, "orderuser-repartition-by-item").leftJoin(itemTable, (OrderUser orderUser, Item item) -> OrderUserItem.fromOrderUser(orderUser, item), Serdes.String(), SerdesFactory.serdFrom(OrderUser.class))從上述代碼可見,through 時需要指定 Key 的序列化器,Value 的序列化器,以及分區方式和結果集所在的 Topic。這里要注意,該 Topic(orderuser-repartition-by-item)的 Partition 數必須與 itemTable 對應 Topic 的 Partition 數相同,并且 through 使用的分區方法必須與 iteamTable 對應 Topic 的分區方式一樣。經過這種 through 操作,orderUserStream 與 itemTable 滿足了 Join 條件,可直接進行 Join。
總結??
?
-
Kafka Stream 的并行模型完全基于 Kafka 的分區機制和 Rebalance 機制,實現了在線動態調整并行度
-
同一 Task 包含了一個子 Topology 的所有 Processor,使得所有處理邏輯都在同一線程內完成,避免了不必的網絡通信開銷,從而提高了效率。
-
through 方法提供了類似 Spark 的 Shuffle 機制,為使用不同分區策略的數據提供了 Join 的可能
-
log compact 提高了基于 Kafka 的 state store 的加載效率
-
state store 為狀態計算提供了可能
-
基于 offset 的計算進度管理以及基于 state store 的中間狀態管理為發生 Consumer rebalance 或 Failover 時從斷點處繼續處理提供了可能,并為系統容錯性提供了保障
-
KTable 的引入,使得聚合計算擁用了處理亂序問題的能力
轉載于:https://www.cnblogs.com/davidwang456/p/7449471.html
總結
以上是生活随笔為你收集整理的流式计算新贵Kafka Stream设计详解--转的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kettle、Oozie、camus、g
- 下一篇: HBASE+Solr实现详单查询--转