Flink+Hologres亿级用户实时UV精确去重最佳实践
UV、PV計算,因為業務需求不同,通常會分為兩種場景:
- 離線計算場景:以T+1為主,計算歷史數據
- 實時計算場景:實時計算日常新增的數據,對用戶標簽去重
針對離線計算場景,Hologres基于RoaringBitmap,提供超高基數的UV計算,只需進行一次最細粒度的預聚合計算,也只生成一份最細粒度的預聚合結果表,就能達到亞秒級查詢。具體詳情可以參見往期文章>>Hologres如何支持超高基數UV計算(基于RoaringBitmap實現)
對于實時計算場景,可以使用Flink+Hologres方式,并基于RoaringBitmap,實時對用戶標簽去重。這樣的方式,可以較細粒度的實時得到用戶UV、PV數據,同時便于根據需求調整最小統計窗口(如最近5分鐘的UV),實現類似實時監控的效果,更好的在大屏等BI展示。相較于以天、周、月等為單位的去重,更適合在活動日期進行更細粒度的統計,并且通過簡單的聚合,也可以得到較大時間單位的統計結果。
主體思想
方案最佳實踐
1.創建相關基礎表
1)創建表uid_mapping為uid映射表,用于映射uid到32位int類型。
- RoaringBitmap類型要求用戶ID必須是32位int類型且越稠密越好(即用戶ID最好連續)。常見的業務系統或者埋點中的用戶ID很多是字符串類型或Long類型,因此需要使用uid_mapping類型構建一張映射表。映射表利用Hologres的SERIAL類型(自增的32位int)來實現用戶映射的自動管理和穩定映射。
- 由于是實時數據, 設置該表為行存表,以提高Flink維表實時JOIN的QPS。
2)創建表dws_app為基礎聚合表,用于存放在基礎維度上聚合后的結果。
- 使用RoaringBitmap前需要創建RoaringBitmap extention,同時也需要Hologres實例為0.10版本
- 為了更好性能,建議根據基礎聚合表數據量合理的設置Shard數,但建議基礎聚合表的Shard數設置不超過計算資源的Core數。推薦使用以下方式通過Table Group來設置Shard數
- 相比離線結果表,此結果表增加了時間戳字段,用于實現以Flink窗口周期為單位的統計。結果表DDL如下:
2.Flink實時讀取數據并更新dws_app基礎聚合表
完整示例源碼請見alibabacloud-hologres-connectors examples
1)Flink 流式讀取數據源(DataStream),并轉化為源表(Table)
//此處使用csv文件作為數據源,也可以是kafka等 DataStreamSource odsStream = env.createInput(csvInput, typeInfo); // 與維表join需要添加proctime字段,詳見https://help.aliyun.com/document_detail/62506.html Table odsTable = tableEnv.fromDataStream( odsStream, $("uid"), $("country"), $("prov"), $("city"), $("ymd"), $("proctime").proctime()); // 注冊到catalog環境 tableEnv.createTemporaryView("odsTable", odsTable);2)將源表與Hologres維表(uid_mapping)進行關聯
其中維表使用insertIfNotExists參數,即查詢不到數據時自行插入,uid_int32字段便可以利用Hologres的serial類型自增創建。
// 創建Hologres維表,其中nsertIfNotExists表示查詢不到則自行插入 String createUidMappingTable = String.format( "create table uid_mapping_dim(" + " uid string," + " uid_int32 INT" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," //Hologres DB名 + " 'tablename' = '%s',"//Hologres 表名 + " 'username' = '%s'," //當前賬號access id + " 'password' = '%s'," //當前賬號access key + " 'endpoint' = '%s'," //Hologres endpoint + " 'insertifnotexists'='true'" + ")", database, dimTableName, username, password, endpoint); tableEnv.executeSql(createUidMappingTable); // 源表與維表join String odsJoinDim = "SELECT ods.country, ods.prov, ods.city, ods.ymd, dim.uid_int32" + " FROM odsTable AS ods JOIN uid_mapping_dim FOR SYSTEM_TIME AS OF ods.proctime AS dim" + " ON ods.uid = dim.uid"; Table joinRes = tableEnv.sqlQuery(odsJoinDim);3)將關聯結果轉化為DataStream,通過Flink時間窗口處理,結合RoaringBitmap進行聚合
DataStream<Tuple6<String, String, String, String, Timestamp, byte[]>> processedSource = source // 篩選需要統計的維度(country, prov, city, ymd) .keyBy(0, 1, 2, 3) // 滾動時間窗口;此處由于使用讀取csv模擬輸入流,采用ProcessingTime,實際使用中可使用EventTime .window(TumblingProcessingTimeWindows.of(Time.minutes(5))) // 觸發器,可以在窗口未結束時獲取聚合結果 .trigger(ContinuousProcessingTimeTrigger.of(Time.minutes(1))) .aggregate( // 聚合函數,根據key By篩選的維度,進行聚合 new AggregateFunction< Tuple5<String, String, String, String, Integer>, RoaringBitmap, RoaringBitmap>() { public RoaringBitmap createAccumulator() { return new RoaringBitmap(); } public RoaringBitmap add( Tuple5<String, String, String, String, Integer> in, RoaringBitmap acc) { // 將32位的uid添加到RoaringBitmap進行去重 acc.add(in.f4); return acc; } public RoaringBitmap getResult(RoaringBitmap acc) { return acc; } public RoaringBitmap merge( RoaringBitmap acc1, RoaringBitmap acc2) { return RoaringBitmap.or(acc1, acc2); } }, //窗口函數,輸出聚合結果 new WindowFunction< RoaringBitmap, Tuple6<String, String, String, String, Timestamp, byte[]>, Tuple, TimeWindow>() { public void apply( Tuple keys, TimeWindow timeWindow, Iterable<RoaringBitmap> iterable, Collector< Tuple6<String, String, String, String, Timestamp, byte[]>> out) throws Exception { RoaringBitmap result = iterable.iterator().next(); // 優化RoaringBitmap result.runOptimize(); // 將RoaringBitmap轉化為字節數組以存入Holo中 byte[] byteArray = new byte[result.serializedSizeInBytes()]; result.serialize(ByteBuffer.wrap(byteArray)); // 其中 Tuple6.f4(Timestamp) 字段表示以窗口長度為周期進行統計,以秒為單位 out.collect( new Tuple6<>( keys.getField(0), keys.getField(1), keys.getField(2), keys.getField(3), new Timestamp( timeWindow.getEnd() / 1000 * 1000), byteArray)); } });4)寫入結果表
需要注意的是,Hologres中RoaringBitmap類型在Flink中對應Byte數組類型
// 計算結果轉換為表 Table resTable = tableEnv.fromDataStream( processedSource, $("country"), $("prov"), $("city"), $("ymd"), $("timest"), $("uid32_bitmap")); // 創建Hologres結果表, 其中Hologres的RoaringBitmap類型通過Byte數組存入 String createHologresTable = String.format( "create table sink(" + " country string," + " prov string," + " city string," + " ymd string," + " timetz timestamp," + " uid32_bitmap BYTES" + ") with (" + " 'connector'='hologres'," + " 'dbname' = '%s'," + " 'tablename' = '%s'," + " 'username' = '%s'," + " 'password' = '%s'," + " 'endpoint' = '%s'," + " 'connectionSize' = '%s'," + " 'mutatetype' = 'insertOrReplace'" + ")", database, dwsTableName, username, password, endpoint, connectionSize); tableEnv.executeSql(createHologresTable); // 寫入計算結果到dws表 tableEnv.executeSql("insert into sink select * from " + resTable);3.數據查詢
查詢時,從基礎聚合表(dws_app)中按照查詢維度做聚合計算,查詢bitmap基數,得出group by條件下的用戶數
- 查詢某天內各個城市的uv
- 查詢某段時間內各個省份的uv
原文鏈接:https://developer.aliyun.com/article/784354?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的Flink+Hologres亿级用户实时UV精确去重最佳实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 云上创新 | 阿里云边缘云场景化商业实践
- 下一篇: 阿里云资深技术专家李克畅谈边缘云计算趋势