Storm 的可靠性保证测试
Storm 是一個分布式的實(shí)時計算框架,可以很方便地對流式數(shù)據(jù)進(jìn)行實(shí)時處理和分析,能運(yùn)用在實(shí)時分析、在線數(shù)據(jù)挖掘、持續(xù)計算以及分布式 RPC 等場景下。Storm 的實(shí)時性可以使得數(shù)據(jù)從收集到處理展示在秒級別內(nèi)完成,從而為業(yè)務(wù)方?jīng)Q策提供實(shí)時的數(shù)據(jù)支持。
在美團(tuán)點(diǎn)評公司內(nèi)部,實(shí)時計算主要應(yīng)用場景包括實(shí)時日志解析、用戶行為分析、實(shí)時消息推送、消費(fèi)趨勢展示、實(shí)時新客判斷、實(shí)時活躍用戶數(shù)統(tǒng)計等。這些數(shù)據(jù)提供給各事業(yè)群,并作為他們實(shí)時決策的有力依據(jù),彌補(bǔ)了離線計算“T+1”的不足。
在實(shí)時計算中,用戶不僅僅關(guān)心時效性的問題,同時也關(guān)心消息處理的成功率。本文將通過實(shí)驗(yàn)驗(yàn)證 Storm 的消息可靠性保證機(jī)制,文章分為消息保證機(jī)制、測試目的、測試環(huán)境、測試場景以及總結(jié)等五節(jié)。
Storm 的消息保證機(jī)制
Storm 提供了三種不同層次的消息保證機(jī)制,分別是 At Most Once、At Least Once 以及 Exactly Once。消息保證機(jī)制依賴于消息是否被完全處理。
消息完全處理
每個從 Spout(Storm 中數(shù)據(jù)源節(jié)點(diǎn))發(fā)出的 Tuple(Storm 中的最小消息單元)可能會生成成千上萬個新的 Tuple,形成一棵 Tuple 樹,當(dāng)整棵 Tuple 樹的節(jié)點(diǎn)都被成功處理了,我們就說從 Spout 發(fā)出的 Tuple 被完全處理了。 我們可以通過下面的例子來更好地詮釋消息被完全處理這個概念:
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("sentences", new KafkaSpout(spoutConfig), spoutNum); builder.setBolt("split", new SplitSentence(), 10).shuffleGrouping("sentences"); builder.setBolt("count", new WordCount(), 20).fieldsGrouping("split", new Fields("word"));這個 Topology 從 Kafka(一個開源的分布式消息隊列)讀取信息發(fā)往下游,下游的 Bolt 將收到的句子分割成單獨(dú)的單詞,并進(jìn)行計數(shù)。每一個從 Spout 發(fā)送出來的 Tuple 會衍生出多個新的 Tuple,從 Spout 發(fā)送出來的 Tuple 以及后續(xù)衍生出來的 Tuple 形成一棵 Tuple 樹,下圖是一棵 Tuple 樹示例:
上圖中所有的 Tuple 都被成功處理了,我們才認(rèn)為 Spout 發(fā)出的 Tuple 被完全處理。如果在一個固定的時間內(nèi)(這個時間可以配置,默認(rèn)為 30 秒),有至少一個 Tuple 處理失敗或超時,則認(rèn)為整棵 Tuple 樹處理失敗,即從 Spout 發(fā)出的 Tuple 處理失敗。
如何實(shí)現(xiàn)不同層次的消息保證機(jī)制
Tuple 的完全處理需要 Spout、Bolt 以及 Acker(Storm 中用來記錄某棵 Tuple 樹是否被完全處理的節(jié)點(diǎn))協(xié)同完成,如上圖所示。從 Spout 發(fā)送 Tuple 到下游,并把相應(yīng)信息通知給 Acker,整棵 Tuple 樹中某個 Tuple 被成功處理了都會通知 Acker,待整棵 Tuple 樹都被處理完成之后,Acker 將成功處理信息返回給 Spout;如果某個 Tuple 處理失敗,或者超時,Acker 將會給 Spout 發(fā)送一個處理失敗的消息,Spout 根據(jù) Acker 的返回信息以及用戶對消息保證機(jī)制的選擇判斷是否需要進(jìn)行消息重傳。
Storm 提供的三種不同消息保證機(jī)制中。利用 Spout、Bolt 以及 Acker 的組合我們可以實(shí)現(xiàn) At Most Once 以及 At Least Once 語義,Storm 在 At Least Once 的基礎(chǔ)上進(jìn)行了一次封裝(Trident),從而實(shí)現(xiàn) Exactly Once 語義。
Storm 的消息保證機(jī)制中,如果需要實(shí)現(xiàn) At Most Once 語義,只需要滿足下面任何一條即可:
- 關(guān)閉 ACK 機(jī)制,即 Acker 數(shù)目設(shè)置為 0
- Spout 不實(shí)現(xiàn)可靠性傳輸
- Spout 發(fā)送消息是使用不帶 message ID 的 API
- 不實(shí)現(xiàn) fail 函數(shù)
- Bolt 不把處理成功或失敗的消息發(fā)送給 Acker
如果需要實(shí)現(xiàn) At Least Once 語義,則需要同時保證如下幾條:
- 開啟 ACK 機(jī)制,即 Acker 數(shù)目大于 0
- Spout 實(shí)現(xiàn)可靠性傳輸保證
- Spout 發(fā)送消息時附帶 message 的 ID
- 如果收到 Acker 的處理失敗反饋,需要進(jìn)行消息重傳,即實(shí)現(xiàn) fail 函數(shù)
- Bolt 在處理成功或失敗后需要調(diào)用相應(yīng)的方法通知 Acker
實(shí)現(xiàn) Exactly Once 語義,則需要在 At Least Once 的基礎(chǔ)上進(jìn)行狀態(tài)的存儲,用來防止重復(fù)發(fā)送的數(shù)據(jù)被重復(fù)處理,在 Storm 中使用 Trident API 實(shí)現(xiàn)。
下圖中,每種消息保證機(jī)制中左邊的字母表示上游發(fā)送的消息,右邊的字母表示下游接收到的消息。從圖中可以知道,At Most Once 中,消息可能會丟失(上游發(fā)送了兩個 A,下游只收到一個 A);At Least Once 中,消息不會丟失,可能重復(fù)(上游只發(fā)送了一個 B ,下游收到兩個 B);Exactly Once 中,消息不丟失、不重復(fù),因此需要在 At Least Once 的基礎(chǔ)上保存相應(yīng)的狀態(tài),表示上游的哪些消息已經(jīng)成功發(fā)送到下游,防止同一條消息發(fā)送多次給下游的情況。
測試目的
Storm 官方提供 At Most Once、At Least Once 以及 Exactly Once 三種不同層次的消息保證機(jī)制,我們希望通過相關(guān)測試,達(dá)到如下目的:
- 三種消息保證機(jī)制的表現(xiàn),是否與官方的描述相符;
- At Most Once 語義下,消息的丟失率和什么有關(guān)系、關(guān)系如何;
- At Least Once 語義下,消息的重復(fù)率和什么有關(guān)系、關(guān)系如何。
測試環(huán)境
本文的測試環(huán)境如下: 每個 worker(worker 為一個 物理 JVM 進(jìn)程,用于運(yùn)行實(shí)際的 Storm 作業(yè))分配 1 CPU 以及 1.6G 內(nèi)存。Spout、Bolt、Acker 分別跑在單獨(dú)的 worker 上。并通過在程序中控制拋出異常以及人工 Kill Spout/Bolt/Acker 的方式來模擬實(shí)際情況中的異常情況。
三種消息保證機(jī)制的測試均由 Spout 從 Kafka 讀取測試數(shù)據(jù),經(jīng)由相應(yīng) Bolt 進(jìn)行處理,然后發(fā)送到 Kafka,并將 Kafka 上的數(shù)據(jù)同步到 MySQL 方便最終結(jié)果的統(tǒng)計,如下圖所示:
測試數(shù)據(jù)為 Kafka 上順序保存的一系列純數(shù)字,數(shù)據(jù)量分別有十萬、五十萬、一百萬等,每個數(shù)字在每個測試樣例中出現(xiàn)且僅出現(xiàn)一次。
測試場景
對于三種不同的消息保證機(jī)制,我們分別設(shè)置了不同的測試場景,來進(jìn)行充分的測試。其中為了保證 Spout/Bolt/Acker 發(fā)生異常的情況下不影響其他節(jié)點(diǎn),在下面的測試中,所有的節(jié)點(diǎn)單獨(dú)運(yùn)行在獨(dú)立的 Worker 上。
At Most Once
從背景中可以得知,如果希望實(shí)現(xiàn) At Most Once 語義,將 Acker 的數(shù)目設(shè)置為 0 即可,本文的測試過程中通過把設(shè)置 Acker 為 0 來進(jìn)行 At Most Once 的測試。
輸入數(shù)據(jù)
保存在 Kafka 上的一系列純數(shù)字,數(shù)據(jù)量從十萬到五百萬不等,每個測試樣例中,同一個數(shù)字在 Kafka 中出現(xiàn)且僅出現(xiàn)一次。
測試結(jié)果
| 0 | 500000 | 500000 | 0 | 0% | 0 |
| 0 | 1000000 | 1000000 | 0 | 0% | 0 |
| 0 | 2000000 | 2000000 | 0 | 0% | 0 |
| 0 | 3000000 | 3000000 | 0 | 0% | 0 |
| 1 | 3000000 | 2774940 | 225060 | 7.50% | 0 |
| 2 | 3000000 | 2307087 | 692913 | 23.09% | 0 |
| 3 | 3000000 | 2082823 | 917177 | 30.57% | 0 |
| 4 | 3000000 | 1420725 | 1579275 | 52.64% | 0 |
結(jié)論
不發(fā)生異常的情況下,消息能夠不丟不重;Bolt 發(fā)生異常的情況下,消息會丟失,不會重復(fù),其中消息的丟失數(shù)目與異常次數(shù)正相關(guān)。與官方文檔描述相符,符合預(yù)期。
At Least Once
為了實(shí)現(xiàn) At Least Once 語義,需要 Spout、Bolt、Acker 進(jìn)行配合。我們使用 Kafka-Spout 并通過自己管理 offset 的方式來實(shí)現(xiàn)可靠的 Spout;Bolt 通過繼承 BaseBasicBolt,自動幫我們建立 Tuple 樹以及消息處理之后通知 Acker;將 Acker 的數(shù)目設(shè)置為 1,即打開 ACK 機(jī)制,這樣整個 Topology 即可提供 At Least Once 的語義。
測試數(shù)據(jù)
Kafka 上保存的十萬到五十萬不等的純數(shù)字,其中每個測試樣例中,每個數(shù)字在 Kafka 中出現(xiàn)且僅出現(xiàn)一次。
測試結(jié)果
Acker 發(fā)生異常的情況
| 0 | 100000 | 100000 | - | - | 0 | 2000(默認(rèn)值) |
| 0 | 200000 | 200000 | - | - | 0 | 2000 |
| 0 | 300000 | 300000 | - | - | 0 | 2000 |
| 0 | 400000 | 400000 | - | - | 0 | 2000 |
| 1 | 100000 | 100000 | 2 | 2000 | 0 | 2000 |
| 2 | 100000 | 100000 | 2 | 4001 | 0 | 2000 |
| 3 | 100000 | 100000 | 2 | 6000 | 0 | 2000 |
| 4 | 100000 | 100000 | 2 | 8000 | 0 | 2000 |
Spout 發(fā)生異常的情況
| 0 | 100000 | 100000 | - | - | 0 |
| 0 | 200000 | 200000 | - | - | 0 |
| 0 | 300000 | 300000 | - | - | 0 |
| 0 | 400000 | 400000 | - | - | 0 |
| 1 | 100000 | 100000 | 2 | 2052 | 0 |
| 2 | 100000 | 100000 | 2 | 4414 | 0 |
| 4 | 100000 | 100000 | 2 | 9008 | 0 |
| 6 | 100000 | 100000 | 2 | 9690 | 0 |
| 3 | 1675 | 0 |
Bolt 發(fā)生異常的情況
調(diào)用 emit 函數(shù)之前發(fā)生異常
| 異常次數(shù) | 結(jié)果集中不重復(fù)的 Tuple 數(shù) | 數(shù)據(jù)重復(fù)的次數(shù) (>1) | 出現(xiàn)重復(fù)的 Tuple 數(shù) | 數(shù)據(jù)丟失量 |
| — | — | — | — | — | |0 |100000| - |- |0| |0 |200000| - |- |0| |0 |300000| - |- |0| |0 |400000| - |- |0|
| 異常次數(shù) | 結(jié)果集中不重復(fù)的 Tuple 數(shù) | 數(shù)據(jù)重復(fù)的次數(shù) (>1) | 出現(xiàn)重復(fù)的 Tuple 數(shù) | 數(shù)據(jù)丟失量 |
| — | — | — | — | — | |1 |100000| - |- |0| |2 |100000| - |- |0| |4 |100000| - |- |0| |8 |100000| - |- |0| |10|100000| - |- |0|
調(diào)用 emit 函數(shù)之后發(fā)生異常
| 0 | 100000 | - | - | 0 |
| 0 | 200000 | - | - | 0 |
| 0 | 300000 | - | - | 0 |
| 0 | 400000 | - | - | 0 |
| 1 | 100000 | 2 | 2 | 0 |
| 2 | 100000 | 2 | 3 | 0 |
| 4 | 100000 | 2 | 5 | 0 |
| 8 | 100000 | 2 | 9 | 0 |
| 10 | 100000 | 2 | 11 | 0 |
結(jié)論
從上面的表格中可以得到,消息不會丟失,可能發(fā)生重復(fù),重復(fù)的數(shù)目與異常的情況相關(guān)。
- 不發(fā)生任何異常的情況下,消息不會重復(fù)不會丟失。
- Spout 發(fā)生異常的情況下,消息的重復(fù)數(shù)目約等于 spout.max.pending(Spout 的配置項,每次可以發(fā)送的最多消息條數(shù)) * NumberOfException(異常次數(shù))。
- Acker 發(fā)生異常的情況下,消息重復(fù)的數(shù)目等于 spout.max.pending * NumberOfException。
- Bolt 發(fā)生異常的情況:
- emit 之前發(fā)生異常,消息不會重復(fù)。
- emit 之后發(fā)生異常,消息重復(fù)的次數(shù)等于異常的次數(shù)。
結(jié)論與官方文檔所述相符,每條消息至少發(fā)送一次,保證數(shù)據(jù)不會丟失,但可能重復(fù),符合預(yù)期。
Exactly Once
對于 Exactly Once 的語義,利用 Storm 中的 Trident 來實(shí)現(xiàn)。
測試數(shù)據(jù)
Kafka 上保存的一萬到一百萬不等的數(shù)字,每個數(shù)字在每次測試樣例中出現(xiàn)且僅出現(xiàn)一次。
測試結(jié)果
Spout 發(fā)生異常情況
|異常數(shù) |測試數(shù)據(jù)量 |結(jié)果集中不重復(fù)的 Tuple 數(shù) |結(jié)果集中所有 Tuple 的總和 | | — | — | — | — | |1 |10000 |10000 |50005000 | |2 |10000 |10000 |50005000 | |3 |10000 |10000 |50005000 |
Acker 發(fā)生異常的情況
|異常數(shù) |測試數(shù)據(jù)量 |結(jié)果集中不重復(fù)的 Tuple 數(shù) |結(jié)果集中所有 Tuple 的總和 | | — | — | — | — | |1 |10000 |10000 |50005000 | |2 |10000 |10000 |50005000 | |3 |10000 |10000 |50005000 |
Bolt 發(fā)生異常的情況
|異常數(shù) |測試數(shù)據(jù)量 |結(jié)果集中不重復(fù)的 Tuple 數(shù) |結(jié)果集中所有 Tuple 的總和 | | — | — | — | — | |1 |10000 |10000 |50005000 | |2 |10000 |10000 |50005000 | |3 |10000 |10000 |50005000 |
結(jié)論
在所有情況下,最終結(jié)果集中的消息不會丟失,不會重復(fù),與官方文檔中的描述相符,符合預(yù)期。
總結(jié)
對 Storm 提供的三種不同消息保證機(jī)制,用戶可以根據(jù)自己的需求選擇不同的消息保證機(jī)制。
不同消息可靠性保證的使用場景
對于 Storm 提供的三種消息可靠性保證,優(yōu)缺點(diǎn)以及使用場景如下所示:
| At most once | 處理速度快 | 數(shù)據(jù)可能丟失 | 都處理速度要求高,且對數(shù)據(jù)丟失容忍度高的場景 |
| At least once | 數(shù)據(jù)不會丟失 | 數(shù)據(jù)可能重復(fù) | 不能容忍數(shù)據(jù)丟失,可以容忍數(shù)據(jù)重復(fù)的場景 |
| Exactly once | 數(shù)據(jù)不會丟失,不會重復(fù) | 處理速度慢 | 對數(shù)據(jù)不丟不重性質(zhì)要求非常高,且處理速度要求沒那么高,比如支付金額 |
如何實(shí)現(xiàn)不同層次的消息可靠性保證
對于 At Least Once 的保證需要做如下幾步:
不滿足以上三條中任意一條的都只提供 At Most Once 的消息可靠性保證,如果希望得到 Exactly Once 的消息可靠性保證,可以使用 Trident 進(jìn)行實(shí)現(xiàn)。
不同層次的可靠性保證如何實(shí)現(xiàn)
如何實(shí)現(xiàn)可靠的 Spout
實(shí)現(xiàn)可靠的 Spout 需要在 nextTuple 函數(shù)中發(fā)送消息時,調(diào)用帶 msgID 的 emit 方法,然后實(shí)現(xiàn)失敗消息的重傳(fail 函數(shù)),參考如下示例:
/*** 想實(shí)現(xiàn)可靠的 Spout,需要實(shí)現(xiàn)如下兩點(diǎn)* 1. 在 nextTuple 函數(shù)中調(diào)用 emit 函數(shù)時需要帶一個 msgId,用來表示當(dāng)前的消息(如果消息發(fā)送失敗會用 msgId 作為參數(shù)回調(diào) fail 函數(shù))* 2. 自己實(shí)現(xiàn) fail 函數(shù),進(jìn)行重發(fā)(注意,在 storm 中沒有 msgId 和消息的對應(yīng)關(guān)系,需要自己進(jìn)行維護(hù))*/ public void nextTuple() {//設(shè)置 msgId 和 Value 一樣,方便 fail 之后重發(fā)collector.emit(new Values(curNum + "", round + ""), curNum + ":" + round); }@Override public void fail(Object msgId) {//消息發(fā)送失敗時的回調(diào)函數(shù) String tmp = (String)msgId; //上面我們設(shè)置了 msgId 和消息相同,這里通過 msgId 解析出具體的消息 String[] args = tmp.split(":");//消息進(jìn)行重發(fā) collector.emit(new Values(args[0], args[1]), msgId); }如何實(shí)現(xiàn)可靠的 Bolt
Storm 提供兩種不同類型的 Bolt,分別是 BaseRichBolt 和 BaseBasicBolt,都可以實(shí)現(xiàn)可靠性消息傳遞,不過 BaseRichBolt 需要自己做很多周邊的事情(建立 anchor 樹,以及手動 ACK/FAIL 通知 Acker),使用場景更廣泛,而 BaseBasicBolt 則由 Storm 幫忙實(shí)現(xiàn)了很多周邊的事情,實(shí)現(xiàn)起來方便簡單,但是使用場景單一。如何用這兩個 Bolt 實(shí)現(xiàn)(不)可靠的消息傳遞如下所示:
//BaseRichBolt 實(shí)現(xiàn)不可靠消息傳遞 public class SplitSentence extends BaseRichBolt {//不建立 anchor 樹的例子OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {_collector.emit(new Values(word)); // 不建立 anchor 樹}_collector.ack(tuple); //手動 ack,如果不建立 anchor 樹,是否 ack 是沒有區(qū)別的,這句可以進(jìn)行注釋}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));} }//BaseRichBolt 實(shí)現(xiàn)可靠的 Bolt public class SplitSentence extends BaseRichBolt {//建立 anchor 樹以及手動 ack 的例子OutputCollector _collector;public void prepare(Map conf, TopologyContext context, OutputCollector collector) {_collector = collector;}public void execute(Tuple tuple) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {_collector.emit(tuple, new Values(word)); // 建立 anchor 樹}_collector.ack(tuple); //手動 ack,如果想讓 Spout 重發(fā)該 Tuple,則調(diào)用 _collector.fail(tuple);}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));} }下面的示例會可以建立 Multi-anchoring List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add(tuple1); anchors.add(tuple2); _collector.emit(anchors, new Values(1, 2, 3));//BaseBasicBolt 是吸納可靠的消息傳遞 public class SplitSentence extends BaseBasicBolt {//自動建立 anchor,自動 ackpublic void execute(Tuple tuple, BasicOutputCollector collector) {String sentence = tuple.getString(0);for(String word: sentence.split(" ")) {collector.emit(new Values(word));}}public void declareOutputFields(OutputFieldsDeclarer declarer) {declarer.declare(new Fields("word"));} }Trident
在 Trident 中,Spout 和 State 分別有三種狀態(tài),如下圖所示:
其中表格中的 Yes 表示相應(yīng)的 Spout 和 State 組合可以實(shí)現(xiàn) Exactly Once 語義,No 表示相應(yīng)的 Spout 和 State 組合不保證 Exactly Once 語義。下面的代碼是一個 Trident 示例:
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf); //Opaque Spout//TransactionalTridentKafkaSpout spout = new TransactionalTridentKafkaSpout(spoutConf); //Transaction SpoutTridentTopology topology = new TridentTopology();String spoutTxid = Utils.kafkaSpoutGroupIdBuilder(topologyConfig.kafkaSrcTopic, topologyConfig.topologyName);Stream stream = topology.newStream(spoutTxid, spout).name("new stream").parallelismHint(1);// kafka configKafkaProducerConfig kafkaProducerConfig = new KafkaProducerConfig(); //KafkaProducerConfig 僅對 kafka 相關(guān)配置進(jìn)行了封裝,具體可以參考 TridentKafkaStateFactory2(Map<String, String> config)Map<String, String> kafkaConfigs = kafkaProducerConfig.loadFromConfig(topologyConfig);TridentToKafkaMapper tridentToKafkaMapper = new TridentToKafkaMapper(); //TridentToKafkaMapper 繼承自 TridentTupleToKafkaMapper<String, String>,實(shí)現(xiàn) getMessageFromTuple 接口,該接口中返回 tridentTuple.getString(0);String dstTopic = "test__topic_for_all";TridentKafkaStateFactory2 stateFactory = new TridentKafkaStateFactory2(kafkaConfigs);stateFactory.withTridentTupleToKafkaMapper(tridentToKafkaMapper);stateFactory.withKafkaTopicSelector(new DefaultTopicSelector(dstTopic));stream.each(new Fields("bytes"), new AddMarkFunction(), new Fields("word")) //從spout 出來數(shù)據(jù)是一個 bytes 類型的數(shù)據(jù),第二個是參數(shù)是自己的處理函數(shù),第三個參數(shù)是處理函數(shù)的輸出字段.name("write2kafka").partitionPersist(stateFactory //將數(shù)據(jù)寫入到 Kafka 中,可以保證寫入到 Kafka 的數(shù)據(jù)是 exactly once 的, new Fields("word"), new TridentKafkaUpdater()).parallelismHint(1);總結(jié)
以上是生活随笔為你收集整理的Storm 的可靠性保证测试的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。