Storm-源码分析-Stats (backtype.storm.stats)
會發現, 現在storm里面有兩套metrics系統, metrics framework和stats framework
并且在所有地方都是同時注冊兩套, 貌似準備用metrics來替代stats, 但當前版本UI仍然使用stats
?
這個模塊統計的數據怎么被使用,
1. 在worker中, 會定期調用do-executor-heartbeats去往zk同步hb
可以看到, stats也會作為hb的一部分被同步到zk上
2. 現在任何人都可以通過nimbus的thrift接口來得到相關信息
3. 最直接的用戶就是storm UI, 在準備topology page的時候, 就會調用getTopologyInfo來獲取數據
(defn topology-page [id window include-sys?](with-nimbus nimbus(let [summ (.getTopologyInfo ^Nimbus$Client nimbus id)] )?
Stats
這個模塊用于spout和bolt來抽樣統計數據, 需要統計的具體metics如下
(def COMMON-FIELDS [:emitted :transferred]) (defrecord CommonStats [emitted transferred rate])(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies]) ;;acked and failed count individual tuples (defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])(def SPOUT-FIELDS [:acked :failed :complete-latencies]) ;;acked and failed count tuple completion (defrecord SpoutExecutorStats [common acked failed complete-latencies])?
抽樣的比例在storm-conf, TOPOLOGY_STATS_SAMPLE_RATE, 配置
為什么統計時每次加rate, 而不是加1?
因為這里的統計是抽樣的, 所以如果抽樣比例是10%, 那么發現一個, 應該加1/(10%), 10個
(defn sampling-rate [conf](->> (conf TOPOLOGY-STATS-SAMPLE-RATE)(/ 1)int))?
然后統計是基于時間窗口的, 底下是對應默認的bucket和時間窗口的定義
(def NUM-STAT-BUCKETS 20) ;;bucket數 ;; 10 minutes, 3 hours, 1 day ;;定義3種時間窗口 (def STAT-BUCKETS [30 540 4320]) ;;bucket大小分別是30,540,4320秒?
核心數據結構是RollingWindowSet, 包含:
統計數據需要的函數, updater extractor, 之所以治理也需要是因為需要統計all-time?
一組rolling windows, 默認是3個時間窗, 10 minutes, 3 hours, 1 day
all-time, 在完整的時間區間上的統計結果
?
繼續看看rolling window的定義,
核心數據, buckets, hashmap, {streamid, data}, 初始化為{}
統計data需要的函數, updater merger extractor
時間窗口, buckets大小和buckets個數
?
1. mk-stats
在mk-executedata的時候需要創建stats
mk-executor-stats <> (sampling-rate storm-conf)?
;; TODO: refactor this to be part of an executor-specific map (defmethod mk-executor-stats :spout [_ rate](stats/mk-spout-stats rate)) (defmethod mk-executor-stats :bolt [_ rate](stats/mk-bolt-stats rate))第一個參數忽略, 其實就是分別調用stats/mk-spout-stats或stats/mk-bolt-stats, 可見就是對于每個需要統計的數據, 創建一個rolling-windows-set
(defn- mk-common-stats [rate](CommonStats. (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))rate))(defn mk-bolt-stats [rate](BoltExecutorStats. (mk-common-stats rate)(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))(defn mk-spout-stats [rate](SpoutExecutorStats. (mk-common-stats rate)(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))))?
2. 數據更新
(defn spout-acked-tuple! [^SpoutExecutorStats stats stream latency-ms](update-executor-stat! stats :acked stream (stats-rate stats))(update-executor-stat! stats :complete-latencies stream latency-ms)) (defmacro update-executor-stat! [stats path & args](let [path (collectify path)]`(swap! (-> ~stats ~@path) update-rolling-window-set ~@args)))就以update-executor-stat! stats :acked stream (stats-rate stats)為例子看看怎么做的?
SpoutExecutorStats取出用于記錄spout acked情況的rolling-windows-set
然后使用update-rolling-window-set來swap這個atom
來看看記錄acked的rolling-windows-set是如何定義的?
keyed-counter-rolling-window-set, 預定義了updater merger extractor
updater, incr-val [amap key amt], 把給定的值amt加到amap的對應的key的value上
merger, (partial merge-with +), 用+作為map merge的邏輯, 即出現相同key則相加
extractor, counter-extract, (if v v {}), 有則返回, 無則返回{}
windows, rolling-window的list
all-time, 初始化為nil
?
好, 下面就看看, 當spout-acked-tuple!時更新:acked時, 如何update的?
首先更新每個rolling-window, 并把更新過的rolling-window-set更新到:windows
并且更新:all-time, (apply (:updater rws) (:all-time rws) args)
updated, incr-val [amap key amt]
args, steamid, rate
all-time, 是用來記錄整個時間區間上的, 某個stream的統計情況
看下如何更新某個rolling-windw
根據now算出當前屬于哪個bucket, time-bucket
取出buckets, 并使用:updater更新相應的bucket, 這里的操作仍然是把rate疊加到streamid的value上
轉載于:https://www.cnblogs.com/fxjwind/p/3223110.html
《新程序員》:云原生和全面數字化實踐50位技術專家共同創作,文字、視頻、音頻交互閱讀總結
以上是生活随笔為你收集整理的Storm-源码分析-Stats (backtype.storm.stats)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 注解技术
- 下一篇: 《OpenGL超级宝典第5版》学习笔记(