Storm-源码分析-EventManager (backtype.storm.event)
生活随笔
收集整理的這篇文章主要介紹了
Storm-源码分析-EventManager (backtype.storm.event)
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
Protocol and DataType
大體結(jié)構(gòu),
定義protocol EventManager, 其實就是定義interface
函數(shù)event-manager, 主要做2件事
1. 啟動event queue的處理線程, 不斷從queue中取出event-fn并執(zhí)行
2. 返回實現(xiàn)EventManager的匿名record(reify部分, 實現(xiàn)protocol)
這里使用了reify的close over特性, reify會將用到的局部變量打包到閉包內(nèi), 包含queue, runner
(ns backtype.storm.event(:use [backtype.storm log util])(:import [backtype.storm.utils Time Utils])(:import [java.util.concurrent LinkedBlockingQueue TimeUnit]))(defprotocol EventManager(add [this event-fn])(waiting? [this])(shutdown [this]))(defn event-manager"Creates a thread to respond to events. Any error will cause process to halt"[daemon?](let [added (atom 0)processed (atom 0)^LinkedBlockingQueue queue (LinkedBlockingQueue.)running (atom true)runner (Thread.(fn [](try-cause(while @running(let [r (.take queue)](r)(swap! processed inc)))(catch InterruptedException t(log-message "Event manager interrupted"))(catch Throwable t(log-error t "Error when processing event")(halt-process! 20 "Error when processing an event")))))](.setDaemon runner daemon?)(.start runner)(reifyEventManager(add [this event-fn];; should keep track of total added and processed to know if this is finished yet(when-not @running(throw (RuntimeException. "Cannot add events to a shutdown event manager")))(swap! added inc)(.put queue event-fn))(waiting? [this](or (Time/isThreadWaiting runner)(= @processed @added)))(shutdown [this](reset! running false)(.interrupt runner)(.join runner)))))?
使用的時候很簡單, 如下
let [event-manager processes-event-manager :as managers] [(event/event-manager false) (event/event-manager false)] (.add processes-event-manager sync-processes)可以直接調(diào)用add或其他的function
相當(dāng)于給event-manager增加EventManager protocol, 反過來說, 給add或其他接口functions增加對event-manager record的support, 因為protocol函數(shù)的第一個參數(shù)總是類型
比較神奇的是, 閉包產(chǎn)生的效果, 可以在完全沒有queue, runner定義或聲明的情況下, 方便的操作他們
總結(jié)
以上是生活随笔為你收集整理的Storm-源码分析-EventManager (backtype.storm.event)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: E431 笔记本电池问题 0190 C
- 下一篇: 迷宫问题算法分析