使用Blink CEP实现差值聚合计算
使用Blink SQL+UDAF實現差值聚合計算介紹了如何使用Blink SQL+UDAF實現實時流上的差值聚合計算,后來在與@付典就業務需求和具體實現方式進行探討時,付典提出通過CEP實現的思路和方法。
本文介紹通過CEP實現實時流上的差值聚合計算。
感謝@付典在實現過程中的指導。筆者水平有限,若有紕漏,請批評指出。
一、客戶需求
電網公司每天采集各個用戶的電表數據(格式如下表),其中data_date為電表數據上報時間,cons_id為電表id,r1為電表度數,其他字段與計算邏輯無關,可忽略。為了后續演示方便,僅輸入cons_id=100000002的數據。
| 101 | 20190716 | 100000002 | 35401 | 13.76 |
| 101 | 20190717 | 100000002 | 35401 | 14.12 |
| 101 | 20190718 | 100000002 | 35401 | 16.59 |
| 101 | 20190719 | 100000002 | 35401 | 18.89 |
表1:輸入數據
電網公司希望通過實時計算(Blink)對電表數據處理后,每天得到每個電表最近兩天(當天和前一天)的差值數據,結果類似如下表:
| 100000002 | 20190717 | 0.36 |
| 100000002 | 20190718 | 2.47 |
| 100000002 | 20190719 | 2.3 |
表2:期望的輸出數據
二、需求分析
根據業務需求以及CEP跨事件模式匹配的特性,定義兩個CEP事件e1和e2,輸出e2.r1-e1.r1即可得到差值。
三、CEP開發及測試結果
參考復雜事件處理(CEP)語句,CEP代碼如下:
CREATE TABLE input_dh_e_mp_read_curve (`no` VARCHAR,data_date VARCHAR,cons_id VARCHAR,org_no VARCHAR,r1 DOUBLE,ts as TO_TIMESTAMP(concat(data_date,'000000'),'yyyyMMddHHmmss'),WATERMARK wk FOR ts as withOffset(ts, 2000) ) WITH (type = 'datahub',endPoint = 'http://dh-cn-shanghai.aliyun-inc.com',roleArn='acs:ram::XXX:role/aliyunstreamdefaultrole',project = 'jszc_datahub',topic = 'input_dh_e_mp_read_curve' );CREATE TABLE data_out(cons_id varchar,data_date varchar,subDegreeR1 DOUBLE )with(type = 'print' );insert into data_out selectcons_id,data_date,subDegreeR1 from input_dh_e_mp_read_curve MATCH_RECOGNIZE(PARTITION BY cons_idORDER BY tsMEASURESe2.data_date as data_date,e2.r1 - e1.r1 as subDegreeR1ONE ROW PER MATCHAFTER MATCH SKIP TO NEXT ROWPATTERN(e1 e2)DEFINEe1 as TRUE,e2 as TRUE );由于使用了print connector,從對應的sink的taskmanager.out日志中可以查看到輸出如下:
task-1> (+)100000002,20190717,0.35999999999999943 task-1> (+)100000002,20190718,2.4700000000000006對比期望輸出(表2),20190717和20190718兩個窗口的數據均正確,表明業務邏輯正確,但此輸出與期望輸出有少許差異:
(1)20190719的數據沒有輸出,這是因為我們設置了watermark,測試環境下20190719之后沒有數據進來觸發20190719對應的窗口的結束。
四、其他說明
1、對比使用Blink SQL+UDAF實現差值聚合計算(1),我們可以看出使用CEP開發代碼非常簡潔,所以在跨事件處理的情況下CEP還是非常的合適。從另外一個方面講,同樣的需求有不同的實現方式,所以融會貫通Blink SQL中的各種語法,利用更合適的語法來實現業務需求,將可能大大提升工作效率和業務性能。
2、在實現本案例時,筆者發現使用CEP時有如下需要注意的地方:
(1)partiton by里的字段(如本案的cons_id),默認會帶到輸出里,若同時在MEASURES中定義,則可能會報類似如下錯誤:
(2)define及其內容必須定義,否則前端頁面提示類似如下錯誤:
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的使用Blink CEP实现差值聚合计算的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: MaxCompute Mars 完全指南
- 下一篇: 阿里云峰会|数据库也能自动驾驶?DAS全