利用blink+MQ实现流计算中的超时统计问题
案例與解決方案匯總頁:
阿里云實時計算產品案例&解決方案匯總
一. 背景介紹
菜鳥的物流數(shù)據(jù)本身就有鏈路復雜、實操節(jié)點多、匯總維度多、考核邏輯復雜的特點,對于實時數(shù)據(jù)的計算存在很大挑戰(zhàn)。經過倉配ETL團隊的努力,目前倉配實時數(shù)據(jù)已覆蓋了絕大多數(shù)場景,但是有這樣一類特殊指標:“晚點超時指標”(例如:出庫超6小時未攬收的訂單量),仍存在實時匯總計算困難。原因在于:流計算是基于消息觸發(fā)計算的,若沒有消息到達到則無法計算,這類指標恰好是要求在指定的超時時間計算出有多少未達到的消息。然而,這類指標對于指導實操有著重要意義,可以告知運營小二當前多少訂單積壓在哪些作業(yè)節(jié)點,應該督促哪些實操人員加快作業(yè),這對于物流的時效KPI達成至關重要。
之前的方案是:由產品前端根據(jù)用戶的請求查詢OLAP數(shù)據(jù)庫,由OLAP從明細表出結果。大促期間,用戶請求量大,加之數(shù)據(jù)量大,故對OLAP的明細查詢造成了比較大的壓力。
二. 解決方案
2.1 ? 問題定義
“超時晚點指標” 是指,一筆訂單的兩個相鄰的實操節(jié)點node_n-1 、node_n 的完成時間 time_n-1、time_n,
當滿足 : time_n is null ?&& current_time - time_n-1 > kpi_length 時,time_flag_n 為 true , 該筆訂單計入 超時晚點指標的計數(shù)。
如下圖,有一筆訂單其 node_1 為出庫節(jié)點,時間為time_1 = '2018-06-18 00:00:00' ,運營對出庫與攬收之間考核的時長 kpi_length = 6h, 那么當前自然時間 current_time > '2018-06-18 06:00:00' 時,且node_2攬收節(jié)點的time_2 為null,則該筆訂單的 timeout_flag_2 = true , “出庫超6小時未攬收訂單量” 加1。由于要求time_2 為null,即要求沒有攬收消息下發(fā)的情況下讓流計算做匯總值更新,這違背了流計算基于消息觸發(fā)的基本原理,故流計算無法直接算出這種“超時晚點指標”。
決問題的基本思路是:在考核時刻(即 kpi_time = time_n-1+kpi_length )“制造”出一條消息下發(fā)給流計算,觸發(fā)匯總計算。繼續(xù)上面的例子:在考核時刻“2018-06-18 06:00:00”利用MetaQ定時消息功能“制造”出一條消息下發(fā)給流計算匯總任務,觸發(fā)對該筆訂單的 time_out_flag_2 的判斷,增加匯總計數(shù)。同時,還利用 Blink 的Retraction 機制,當time_2 由null變成有值的時候,Blink 可以對 time_out_flag_2 更新,重新計數(shù)。
2.2 方案架構
如上圖所示:
Step1: ?Blink job1 接收來自上游系統(tǒng)的訂單數(shù)據(jù),做清洗加工,生成訂單明細表:dwd_ord_ri,利用TT下發(fā)給Blink job2 和 Blink job3。
Step2:Blink job2 收到 dwd_ord_ri后,對每筆訂單算出考核時刻 kpi_time = time_n-1+kpi_length,作為MetaQ消息的“TIMER_DELIVER_MS” 屬性,寫入MetaQ。MetaQ的定時消息功能,可以根據(jù)用戶寫入的TIMER_DELIVER_MS 在指定時刻下發(fā)給消費者,即上圖中的Blink job3。
Step3:Blink job3 接收 TT、MetaQ 兩個消息源,先做Join,再對time_flag判斷,最后做Aggregate計算。同一筆訂單,dwd_ord_ri、timing_msg任意一個消息到來,都會觸發(fā)join,time_flag判斷,aggregate重新計算一遍,Blink的Retraction可對結果進行實時更新。
2.3 實現(xiàn)細節(jié)
本方案根據(jù)物流場景中多種實操節(jié)點、多種考核時長的特點,從Blink SQL代碼 和 自定義Sink兩方面做了特殊設計,從而實現(xiàn)了靈活配置、高效開發(fā)。
(1) Blink job2 --- 生成定時消息
關鍵Blink SQL 代碼如下。約定每條record的第一個字段為投遞時間列表,即MetaQ向消費者下發(fā)消息的時刻List,也就是上面所說的多個考核時刻。第二個字段為保序字段,比如在物流場景中經常以訂單code、運單號作為保序主鍵。該代碼實現(xiàn)了對每個出庫的物流訂單,根據(jù)其出庫時間,向后延遲6小時(21600000毫秒)、12小時(43200000毫秒)、24小時(86400000毫秒)由MetaQ向消費者下發(fā)三個定時消息。
(2) Blink 自定義Sink --- MetaQTimingMsg Sink
Blink的當前版本還不支持 MetaQ的定時消息功能的Sink,故利用 Blink的自定義Sink功能,并結合菜鳥物流數(shù)據(jù)的特點開發(fā)了MetaQTimingMsg Sink。關鍵代碼如下(實現(xiàn) writeAddRecord 方法)。
(3)Blink job3 --- 匯總計算
關鍵Blink SQL 代碼如下,統(tǒng)計了每個倉庫的“出庫超6小時未攬收物理訂單”、“出庫超12小時未攬收物理訂單”、“出庫超24小時未攬收物理訂單”的匯總值。代碼中使用了“stringLast()”函數(shù)處理來自dwd_ord_ri的每條消息,以取得每個物流訂單的最新出庫攬收情況,利用Blink Retraction機制,更新匯總值。
三. ?方案優(yōu)勢
3.1 配置靈活
我們從“Blink SQL 代碼” 和“自定義MetaQ” 兩個方面設計,用戶可以根據(jù)具體的業(yè)務場景,在Blink SQL的一個view里就能實現(xiàn)多種節(jié)點多種考核時間的定時消息生成,而不是針對每一個實操節(jié)點的每一種定時指標都要寫一個view,這樣大大節(jié)省了代碼量,提升了開發(fā)效率。例如對于倉庫節(jié)點的出庫超6小時未攬收、超12小時未攬收、超24小時未攬收,這三個指標利用上述方案,僅需在Blink job2的中metaq_timing_msg的第一個字段deliver_time_list中拼接三個kpi_length,即6小時、12小時、24小時為一個字符串即可,由MetaQTimingMsg Sink自動拆分成三條消息下發(fā)給MetaQ。對于不同的節(jié)點的考核,僅需在node_name,node_time填寫不同的節(jié)點名稱和節(jié)點實操時間即可。
3.2 主鍵保序
如2.3節(jié)所述,自定義的Sink中 實現(xiàn)了MetaQ的 MessageQueueSelector 接口的 select() 方法,同時在Blink SQL 生成的MetaQ消息默認第二個字段為保序主鍵字段。從而,可以根據(jù)用戶自定義的主鍵,保證同一主鍵的所有消息放在同一個通道內處理,從而保證按主鍵保序,這對于流計算非常關鍵,能夠實現(xiàn)數(shù)據(jù)的實時準確性。
3.3 性能優(yōu)良
讓專業(yè)的團隊做專業(yè)的事。個人認為,這種大規(guī)模的消息存儲、消息下發(fā)的任務本就應該交給“消息中間件”來處理,這樣既可以做到計算與消息存儲分離,也可以方便消息的管理,比如針對不同的實操節(jié)點,我們還可以定義不同的MetaQ的tag。
另外,正如2.2節(jié)所述,我們對定時消息量做了優(yōu)化??紤]到一筆訂單的屬性字段或其他節(jié)點更新會下發(fā)多條消息,我們利用了Blink的FIRST_VALUE函數(shù),在Blink job2中同一筆訂單的的一種考核指標只下發(fā)一條定時消息,大大減少了消息量,減輕了Blink的寫壓力,和MetaQ的存儲。
四. 自我介紹
馬汶園 ? ?阿里巴巴 -菜鳥網(wǎng)絡—數(shù)據(jù)部 ? ? ?數(shù)據(jù)工程師
菜鳥倉配實時研發(fā)核心成員,主導多次倉配大促實時數(shù)據(jù)研發(fā),對利用Blink的原理與特性解決物流場景問題有深入思考與理解。
?
原文鏈接
本文為云棲社區(qū)原創(chuàng)內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的利用blink+MQ实现流计算中的超时统计问题的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 结合实际场景谈一谈微服务配置
- 下一篇: 阿里云不做SaaS、要练好内功被集成,发