Storm ack和fail机制再论
之前對這個的理解有些問題,今天用到有仔細(xì)梳理了一遍,記錄一下
?
首先開啟storm tracker機制的前提是,
1. 在spout emit tuple的時候,要加上第3個參數(shù)messageid?
2. 在配置中acker數(shù)目至少為1?
3. 在bolt emit的時候,要加上第二個參數(shù)anchor tuple,以保持tracker鏈路
?
流程,
1. 當(dāng)tuple具有messageid時,spout會把該tuple加到pending list里面?
?? 并發(fā)消息給acker,通知acker開始tracker這條tuple
2. 然后再后續(xù)的bolt的處理邏輯中,你必須顯式的ack或fail所有處理的tuple?
?? 如果這條tuple在整個DAG圖上都成功執(zhí)行了,那么acker會發(fā)現(xiàn)該tuple的track異或值為0?
?? 于是acker會發(fā)ack_message給spout?
?? 當(dāng)然如果在DAG圖上任意一個節(jié)點bolt上fail,那么acker會認(rèn)為該tuple fail?
?? 于是acker會發(fā)fail_message給spout
3. 當(dāng)spout收到ack或fail message如何處理,?
??? 首先是從pending list里面刪掉這條tuple,因為無論ack或fail,只要得到結(jié)果,這條tuple就沒有繼續(xù)被cache的必要了?
??? 然后做的事是調(diào)用spout.ack或spout.fail?
??? 所以系統(tǒng)默認(rèn)是不會做任何事的,甚至是fail后的重發(fā),你也需要在fail里面自己實現(xiàn)?
??? 如何實現(xiàn)后面看
4. 如果一條tuple沒有被ack或fail,最終是會超時的?
??? Spout會根據(jù)system tick去rotate pending list,對于每個過時的tuple,都調(diào)用spout.fail
?
下面的問題就是如何做fail重發(fā),
這個必須用戶通過自己處理fail來做,系統(tǒng)是不會自己做的,
public void fail(Object msgId)看看系統(tǒng)提供的接口,只有msgId這個參數(shù),這里的設(shè)計不合理,其實在系統(tǒng)里是有cache整個msg的,只給用戶一個messageid,用戶如何取得原來的msg
貌似需要自己cache,然后用這個msgId去查詢,太坑爹了
阿里自己的Jstorm會提供
public interface IFailValueSpout { void fail(Object msgId, List<object>values); }這樣更合理一些, 可以直接取得系統(tǒng)cache的msg values
本文章摘自博客園,原文發(fā)布日期:?2014-06-24?總結(jié)
以上是生活随笔為你收集整理的Storm ack和fail机制再论的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Eclipse 控制console
- 下一篇: python3.7安装包百度云_Pyth