as点击发送广播_Apache Flink 中广播状态的实用指南
自版本 Flink 1.5.0 以來,Apache Flink 提供了一種新的狀態類型,稱為廣播狀態(Broadcast State)。在本文中,將解釋什么是廣播狀態,并通過示例演示如何將廣播狀態應用在評估基于事件流的動態模式的應用程序,并指導大家學習廣播狀態的處理步驟和相關源碼,以便在今后的實踐中能實現此類的應用。
什么是廣播狀態
廣播狀態可以用于通過一個特定的方式來組合并共同處理兩個事件流。第一個流的事件被廣播到另一個 operator 的所有并發實例,這些事件將被保存為狀態。另一個流的事件不會被廣播,而是發送給同一個 operator 的各個實例,并與廣播流的事件一起處理。廣播狀態非常適合兩個流中一個吞吐大,一個吞吐小,或者需要動態修改處理邏輯的情況。我們將使用后者的一個具體實例來解釋廣播狀態,并在本文的其余部分里對詳細的 API 加以說明。
使用廣播狀態的動態模型評估
假設電子商務類型的網站獲取了所有用戶的操作行為數據作為用戶的操作流,網站的運營團隊致力于分析用戶的操作,來提高銷售額,改善用戶體驗,并監測和預防惡意行為。網站期望實現一個流應用程序,用于檢測用戶事件流中的模式,但需要避免在每次模式有變化的時候還要修改和重新部署應用程序,因此我們使用另外一個特征流來讀取、更新當前特征,接下來我們通過一個實例逐步闡述如何通過 Apache Flink 中的廣播狀態來完成相應工作。
實例的程序獲取兩個數據流,第一個流提供了網站上的用戶操作行為數據,如上圖左上方所示,一個用戶的交互事件由操作的類型(用戶登錄、用戶注銷、添加到購物車或者完成付款等)和用戶的 ID(按顏色編碼的)組成。圖中的用戶操作事件流包含用戶 1001 的“登出”操作,然后是用戶 1003 的“支付完成”事件,以及用戶 1002 的“添加到購物車”操作。
第二個流提供了應用程序要評估的用戶操作模式,模式是由兩個連續的操作組成的。在上圖中,模式流包含了以下兩種:
模式1:用戶登錄并立即登出,并沒有點擊網站上其它的頁面;
模式2:用戶將商品添加到購物車,然后登出,而并沒有完成購買操作;
這樣的模式有助于企業更好地分析用戶行為、檢測惡意行為和提高網站體驗。例如,如果只是將商品添加到購物車里而沒有完成后續的支付,那么網站可以采取合適的方法,更好地了解用戶沒有購買的原因,并采取一定的措施以提高網站的購買轉化率(例如提供優惠券、免運費等)。
在上圖右側,顯示了一個 operator 的三個并發實例,這些實例獲取模式和用戶操作行為的數據流,評估數據流上的模式,并向下游發出模式匹配事件。為了簡便起見,我們的例子中的 operator 只對一個進行兩次后續操作行為的模式進行評估。當從模式流中獲取到新模式的時候,將替換當前活動的模式。原則上,該 operator 也可以實現評估更復雜的模式或多個模式,這些模式可以單獨添加或是刪除。
我們將描述負責模式匹配的程序如何處理用戶的操作和模式流。
首先,向 operator 發送一個模式,該模式被廣播給這個 operator 的三個并發實例,接著,每個并發實例將模式存儲在廣播狀態中,由于廣播狀態只能使用廣播數據來進行更新,因此所有并發實例的狀態都應該是相同的。
接下來,第一個用戶信息流會基于用戶 ID 進行劃分,并發送給 operator 的實例,分區會確保同一用戶的所有操作都由同一并發實例處理。上圖顯示了在 operator 實例處理了第一個模式和前三個操作行為事件之后應用程序的狀態。
當任務接收到新的用戶操作數據時,它通過查看用戶最新的和歷史的操作記錄來評估當前的活動模式。對于每個用戶,operator 都在 keyed state 中存儲用戶的上一個操作。到目前為止,由于上圖中的任務只為每個用戶接收一個操作(我們剛剛啟動了應用程序),因此不需要評估模式。最后,keyed state 中用戶的上一個操作將更新為最新的操作,以便在同一用戶的下一個操作行為到達時能夠進行查找。
在前三個操作行為被處理了之后,下一個事件,即用戶 1001 的注銷操作,將被發送到處理用戶 1001 的并發實例中。當并發實例接收到用戶操作的數據時,它從廣播狀態和用戶 1001 的上一個操作中查找當前的模式。由于這兩個操作符合模式匹配,因此會往下游發送匹配事件。最后,該任務會通過使用最新的操作來覆蓋前一個事件以更新其 keyed state。
當一個新模式進入了模式流,它會被廣播給所有任務,并且每個并發實例通過使用新模式替換當前模式來更新其廣播狀態。
一旦廣播狀態更新為新模式,那么匹配邏輯將像以前一樣繼續執行,即用戶操作行為事件按鍵(key)進行分區,并由負責的并發實例進行評估。
如何實現廣播狀態的應用程序?
到目前為止,我們在概念上討論了應用程序,并解釋了如何使用廣播狀態來評估事件流上的動態模式。接下來,我們將展示如何使用 Flink 的 DataStream API 和廣播狀態功能實現該實例的程序代碼。
讓我們從程序的輸入數據開始。有兩個數據流:操作行為流和模式流,在這一點上,我們并不關心數據流從何而來,這些流可以從 Apache Kafka、Kinesis 或任何其它系統中獲取。
DataStream<Action> actions = ???DataStream<Pattern> patterns = ???Action 和 Pattern 都是 POJO,每個都含有兩個字段:
Action的字段:Long userId, String action
Pattern的字段:String firstAction, String secondAction
作為第一步,我們將 userId 作為操作行為流上的鍵:
KeyedStream<Action, Long> actionsByUser = actions .keyBy((KeySelector<Action, Long>) action -> action.userId);接下來,我們準備廣播狀態,廣播狀態通常表示為 MapState,這是 Flink 提供的最通用的狀態接口類。
MapStateDescriptor<Void, Pattern> bcStateDescriptor = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));由于這個應用程序一次只評估和存儲一個 Pattern,所以我們將廣播狀態配置成具有鍵類型 Void 和值類型 Pattern 的 MapState。MapState 的鍵永遠為 null。
BroadcastStream<Pattern> bcedPatterns = patterns.broadcast(bcStateDescriptor);以 MapStateDescriptor 為參數,調用模式流上的 Broadcast 轉換操作,得到一個? BroadcastStream 對象 bcedPatterns。
DataStream>> matches = actionsByUser .connect(bcedPatterns) .process(new PatternEvaluator());在獲得了 keyedstreamactionsByUser 和廣播流 bcedPatterns 之后,我們對兩個流使用了 connect() 方法,并在連接的流上調用了 PatternEvaluator 類(見下面 PatternEvaluator 的代碼)。PatternEvaluator 是實現 KeyedBroadcastProcessFunction 接口的自定義類。它調用了我們之前討論過的模式匹配邏輯,并發出 Tuple2 的記錄,其中包含用戶 ID 和匹配的模式。
public static class PatternEvaluator extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> { // handle for keyed state (per user) ValueState prevActionState; // broadcast state descriptor MapStateDescriptor patternDesc; @Override public void open(Configuration conf) { // initialize keyed state prevActionState = getRuntimeContext().getState( new ValueStateDescriptor<>("lastAction", Types.STRING)); patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class)); } /** * Called for each user action. * Evaluates the current pattern against the previous and * current action of the user. */ @Override public void processElement( Action action, ReadOnlyContext ctx, Collector> out) throws Exception { // get current pattern from broadcast state Pattern pattern = ctx .getBroadcastState(this.patternDesc) // access MapState with null as VOID default value .get(null); // get previous action of current user from keyed state String prevAction = prevActionState.value(); if (pattern != null && prevAction != null) { // user had an action before, check if pattern matches if (pattern.firstAction.equals(prevAction) && pattern.secondAction.equals(action.action)) { // MATCH out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern)); } } // update keyed state and remember action for next pattern evaluation prevActionState.update(action.action); } /** * Called for each new pattern. * Overwrites the current pattern with the new pattern. */ @Override public void processBroadcastElement( Pattern pattern, Context ctx, Collector> out) throws Exception { // store the new pattern by updating the broadcast state BroadcastState bcState = ctx.getBroadcastState(patternDesc); // storing in MapState with null as VOID default value bcState.put(null, pattern); }}KeyedBroadcastProcessFunction 接口提供了三種方法來處理數據記錄和發出的結果:
processBroadcastElement() 方法:每次收到廣播流的記錄時會調用。在 PatternEvaluator 類中,我們只需使用 null 鍵將接收到的 Pattern 記錄放入廣播狀態中(記住,我們只在 MapState 中存儲一個模式);
processElement() 方法:接受到用戶行為流的每條消息時會調用,并能夠對廣播狀態進行只讀操作,以防止導致跨越類中多個并發實例的不同廣播狀態的修改。PatternEvaluator 類的 processElement() 方法從廣播狀態中獲取當前模式,并從 keyed state 中獲取用戶的前一個操作。如果兩者都存在,它會檢查前一個和當前的操作行為是否與模式匹配,如果是這樣,則會發出模式匹配記錄。最后,它將 keyed state 更新為當前用戶操作;
onTimer() 方法:當之前注冊過的計時器觸發時被調用。計時器可以在processElement 方法中定義,用于執行計算或是清除狀態。為了保持代碼的簡潔性,我們沒有在例子中實現這個方法,但當用戶在某段時間內沒有操作時,它可以用來刪除最后一個操作,以避免由于非活動用戶而導致狀態增長;
你可能注意到了 KeyedBroadcastProcessFunction 類方法的上下文對象,提供了對其它功能的訪問方法,例如:
廣播狀態(讀寫或只讀,取決于方法)
TimerService,允許訪問記錄的時間戳、當前的水印,并可以注冊計時器
當前鍵(僅在 processElement() 方法中可用)
一種將函數應用于每個已注冊鍵的 keyed state 的方法(僅在 processBroadcastElement() 方法中可用) KeyedBroadcastProcessFunction 類與其它任何 ProcessFunction 類一樣,完全可以調用 Flink 的狀態和時間功能,因此可以用于實現復雜的程序邏輯。廣播狀態被設計成了多功能,能夠適應不同的場景和用例,雖然我們只討論了一個比較簡單的應用程序,但是你可以通過多個方式使用廣播狀態來實現應用的需求。
結論
在本文中,我們通過學習一個應用程序的實例,來解釋 Apache Flink 的廣播狀態是什么,以及如何應用它來評估事件流上的動態模式,除此之外本文還討論了廣播狀態的 API,并展示了相關源代碼。
Via:https://flink.apache.org/2019/06/26/broadcast-state.html
作者?:Fabian Hueske
活動推薦
持續學習、和同行交流的機會來啦,由賈揚清助陣,阿里云計算平臺事業部、天池平臺、intel 聯合舉辦的首屆 Apache Flink 極客挑戰賽重磅來襲!
聚焦機器學習與計算性能兩大時下熱門領域,參與比賽,讓自己成為技術多面手,還有機會贏得 10W 獎金。大賽詳情請點擊下方圖片?
(點擊圖片即可查看大賽詳細信息)你也「在看」嗎?總結
以上是生活随笔為你收集整理的as点击发送广播_Apache Flink 中广播状态的实用指南的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mac+修改+ssh文件夹权限_用SSH
- 下一篇: excel概率密度函数公式_干货|利用e