ClickHouse:人群圈选业务的大杀器
什么是人群圈選
隨著數據時代的發展,各行各業數據平臺的體量越來越大,用戶個性化運營的訴求也越來越突出,用戶標簽系統,做為個性化千人千面運營的基礎服務,應運而生。如今,幾乎所有行業(如互聯網、游戲、教育等)都有實時精準營銷的需求。通過系統生成用戶畫像,在營銷時通過條件組合篩選用戶,快速提取目標群體,例如:
? 電商行業中,商家在運營活動前,需要根據活動的目標群體的特征,圈選出一批目標用戶進行廣告推送或進行活動條件的判斷。
? 游戲行業中,商家需要根據玩家的某些特征進行圈選,針對性地發放大禮包,提高玩家活躍度。
? 教育行業中,需要根據學生不同的特征,推送有針對性的習題,幫助學生查缺補漏。
? 搜索、門戶、視頻網站等業務中,根據用戶的關注熱點,推送不同的內容。
以電商平臺中一個典型的目標群體圈選場景為例,如服裝行業對其潛在客戶信息采集,打標,清洗后如下表:
(以上表結構中,第一列為用戶身份的唯一標識,往往作為主鍵,其他列均為標簽列。)
如公司想推出一款高端男性運動產品,則可能的圈選條件為:
1.男性,推出產品的受眾群體為男性。
2.運動愛好者,運動愛好者更有可能消費運動類產品。
3.一線城市,一線城市用戶相比于二三線城市用戶,可能更傾向于消費高端產品。
4....
從上述表結構(人群圈選典型表結構,且大都如此,第一列為用戶id,其余皆為標簽列)和查詢條件可以看出,人群圈選業務都面臨一些共同的痛點:
? 用戶標簽多、標簽豐富,標簽列可達成百甚至上千列。
? 數據量龐大,用戶數多,從而所需運算量也極大。
? 圈選條件組合多樣化,沒有固定索引可以優化,存儲空間占用極大。
? 性能要求高,圈選結果要求及時響應,過長的延時會造成營銷人群的不準確。
? 數據更新時效要求高,用戶畫像要求近實時的更新,過期的人群信息也將直接影響圈選的精準性。
針對以上痛點,本文將從原理層面深度分析,多角度對比講解如何使用ClickHouse搭建人群圈選系統,為何選擇ClickHouse,以及選用ClickHouse搭建人群圈選系統的優勢。
為什么選擇ClickHouse
本文以開ElasticSearch(ES)為例,僅針對人群圈選場景與ClickHouse做對比。開源版ES是一款高效的搜索分析引擎,利用其優秀的索引技術,可以完成各種復雜的條件組合和數據聚合運算。ClickHouse是最近比較火的一款開源列式存儲分析型數據庫,它最核心的特點就是極致存儲壓縮率和查詢性能,尤其擅長單個大寬表的查詢場景。因此細比兩者,相較與ClickHouse,ES雖具備人群圈選業務所需的必要能力,但仍有以下3方面不足:
成本方面:
開源ES的底層存儲使用lucene,主要包含行存儲(storefiled),列存儲(docvalues)和倒排索引(invertindex)。行存中_source字段用于控制doc原始數據的存儲。在寫入數據時,ES把doc原始數據的整個json結構體當做一個string,存儲為_source字段,因此_source字段對存儲占用量大且關閉_source將不支持update操作。同時,索引也是ES不可缺少的一部分,ES默認全列索引,雖可手動設置對特定的列取消索引,但取消索引的列將不可查詢。在人群圈選場景下,選取標簽過濾條件是任意的,多樣的,不斷變化的。對任意一條標簽列不做索引都是不現實的,因此針對成百上千列的大寬表,全列索引必然使得存儲成本翻倍。
ClickHouse是一款徹底的列式存儲數據庫,且ClickHouse的查詢不依賴索引,使用過程中也不強制構建索引,因此不需要保留額外的索引文件。同時ClickHouse存儲數據的副本數量靈活可配,可將使用成本降至最低。
數據更新與治理方面:
索引為ES帶來了高效的查詢性能,但是索引的構造過程是復雜的,耗時的。每一次索引的構建都需對全列數據進行掃描,排序來生成索引文件。而在人群圈選業務中,人群信息必然是不斷增長的。標簽的不斷更新將會使得ES不得不頻繁的重構索引,這將對ES的性能造成巨大的開銷 。
ClickHouse的查詢不依賴索引,使用過程中也不強制構建索引。因此對于新增數據,ClickHouse不涉及索引的更新與維護。
易用性方面:
開源ES缺少完備的sql支持,查詢請求的json格式復雜。同時ES對多條件過濾聚合的執行策略缺少優化,還以文章開頭的典型場景為例,圈出一款高端男性運動產品的受眾人群。可得如下sql:“SELECT user_id FROM whatever_table WHERE city_level = '一線城市' AND gender = '男性' AND is_like_sports = '是';”
針對以上sql,ES的執行會對3個標簽分別做3次索引掃描,之后再將3次掃描的結果做merge,流程如下圖所示
而ClickHouse的執行則更優雅一些。ClickHouse采用標準sql,語法簡單且功能強大。在執行where語句時,會自動優化形成prewhere分層執行,因此二次掃描將基于一次掃描的結果進行,執行流程如下圖所示:
顯而易見,針對復雜條件過濾的場景,ClickHouse對多條件篩選流程做出優化,掃描的數據量更小,性能也較ES而言更高效。
如何基于ClickHouse搭建人群圈選系統:
對比選型完成后,接下來講解如何基于ClickHouse搭建人群圈選系統,回顧文章開頭的業務描述和上一部分的典型sql(“SELECT user_id FROM whatever_table WHERE city_level = '一線城市' AND gender = '男性' AND is_like_sports = '是';”),再次總結人群圈選業務對數據庫能力的要求如下:
1.具備高效的批量數據導入性能。
2.具備處理頻繁,實時update的能力。
3.具備加列/減列的DDL能力。
4.可以指定任意列為過濾條件的高效查詢能力。
面對以上需求,ClickHouse如何使用才能在人群圈選場景下物盡其用,揚長避短?
insert代替update
首先要解決的是ClickHouse的異步update機制。ClickHouse對update的執行是低效的,ClickHouse內核中的MergeTree存儲一旦生成一個Data Part,這個Data Part就不可再更改了。所以從MergeTree存儲內核層面,ClickHouse就不擅長做數據更新刪除操作。ClickHouse的語法把Update操作也加入到了Alter Table的范疇中,它并不支持裸的Update操作。
ALTER TABLE [db.]table UPDATE column1 = expr1 [, ...] WHERE filter_expr;
當用戶執行一個如上的Update操作獲得返回時,ClickHouse內核其實只做了兩件事情:
1.檢查Update操作是否合法;
2.保存Update命令到存儲文件中,喚醒一個異步處理merge和mutation的工作線程;異步線程的工作流程極其復雜,總結其精髓描述如下:先查找到需要update的數據所在datapart,之后對整個datapart做掃描,更新需要變更的數據,然后再將數據重新落盤生成新的datapart,最后用新的datapart做替代并remove掉過期的datapart。
這就是ClickHouse對update指令的執行過程,可以看出,頻繁的update指令對于ClickHouse來說將是災難性的。
因此,我們使用insert語句代替update語句。當需要對某一指定user更新標簽時,就重新插入一條該user的數據,
如對表中07號用戶進行數據更新:
最終,每個user可能都存在多條記錄。針對人群圈選場景,同一user錯亂冗余的信息顯然對查詢結果產生誤導,無法滿足精準圈選的需求。接下來講解如何使用ClickHouse進行主鍵去重,即同一user,讓后insert進來的數據覆蓋掉已有的數據,實現update的效果。
選用AggregatingMergeTree表引擎
MergeTree是ClickHouse中最重要,最核心的存儲內核,MergeTree思想上與LSM-Tree相似,其實現原理復雜,不在此展開,因為一篇文章也難以講解清楚。本篇圍繞人群圈選場景,著重從功能層面描述如何在人群圈選場景下使用MergeTree的變種AggregatingMergeTree以及使用AggregatingMergeTree可實現的數據聚合效果。AggregatingMergeTree繼承自 MergeTree,存儲上和基礎的MergeTree其實沒有任何差異,而是在數據Merge的過程中加入了“額外的合并邏輯”, AggregatingMergeTree 會將相同主鍵的所有行(在一個數據片段內)替換為單個存儲一系列聚合函數狀態的行。以文章開頭部分的表結構為例,使用AggregatingMergeTree表引擎的建表語句如下:
CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default (user_id UInt64,city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一線城市' = 0, '二線城市' = 1, '三線城市' = 2, '四線城市' = 3))),gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1))),interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('否' = 0, '是' = 1))),reg_date SimpleAggregateFunction(anyLast, Datetime),comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),province SimpleAggregateFunction(anyLast, Nullable(String)),last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),others SimpleAggregateFunction(anyLast,Array(String)) )ENGINE = AggregatingMergeTree() partition by toYYYYMMDD(reg_date) ORDER BY user_id;就以上建標語句展開分析,AggregatingMergeTree會將除主鍵(user)外的其余列,配合anyLast函數,替換每行數據為一種預聚合狀態。其中anyLast聚合函數聲明聚合策略為保留最后一次的更新數據。
數據一致性保證
上一部分講述了如何針對人群圈選場景選擇表引擎和聚合函數,但是AggregatingMergeTree并不能保證任何時候的查詢都是聚合過后的結果,并且也沒有提供標志位用于查詢數據的聚合狀態與進度。因此,為了確保數據在查詢前處于已聚合的狀態,還需手動下發optimize指令強制聚合過程的執行。同時方便起見,可自行配置周期性optimize指令的下發。例如每10分鐘執行一次optimize指令。optimize的執行周期可在業務的實時性需求與計算資源之間做權衡。如數據量過大,optimize生效慢,可按partition級別并行下發做優化。optimize生效后即可實現去重邏輯。
Demo:
import java.sql.*; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.concurrent.TimeoutException;public class Main {private static final String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss";private static final SimpleDateFormat SIMPLE_DATE_FORMAT = new SimpleDateFormat(DATE_FORMAT);public static void main(String[] args) throws ClassNotFoundException, SQLException, InterruptedException, ParseException {String url = "your url";String username = "your username";String password = "your password";Class.forName("ru.yandex.clickhouse.ClickHouseDriver");String connectionStr = "jdbc:clickhouse://" + url + ":8123";try {Connection connection = DriverManager.getConnection(connectionStr, username, password);Statement stmt = connection.createStatement();// 創建local表String createLocalTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table ON CLUSTER default " +"(user_id UInt64, " +"city_level SimpleAggregateFunction(anyLast, Nullable(Enum('一線城市' = 0, '二線城市' = 1, '三線城市' = 2, '四線城市' = 3))), " +"gender SimpleAggregateFunction(anyLast, Nullable(Enum('女' = 0, '男' = 1)))," +"interest_sports SimpleAggregateFunction(anyLast, Nullable(Enum('否' = 0, '是' = 1)))," +"reg_date SimpleAggregateFunction(anyLast, Datetime)) " +"comment_like_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),\n" +"last30d_share_cnt SimpleAggregateFunction(anyLast, Nullable(UInt32)),\n" +"user_like_consume_trend_type SimpleAggregateFunction(anyLast, Nullable(String)),\n" +"province SimpleAggregateFunction(anyLast, Nullable(String)),\n" +"last_access_version SimpleAggregateFunction(anyLast, Nullable(String)),\n" +"others SimpleAggregateFunction(anyLast, Array(String)),\n" +"ENGINE = AggregatingMergeTree() PARTITION by toYYYYMM(reg_date) ORDER BY user_id;";stmt.execute(createLocalTableDDL);System.out.println("create local table done.");// 創建distributed表String createDistributedTableDDL = "CREATE TABLE IF NOT EXISTS whatever_table_dist ON cluster default " +"AS default.whatever_table " +"ENGINE = Distributed(default, default, whatever_table, intHash64(user_id));";stmt.execute(createDistributedTableDDL);System.out.println("create distributed table done");// 插入mock數據String insertSQL = "INSERT INTO whatever_table(\n" +"\tuser_id,\n" +"\tcity_level,\n" +"\tgender,\n" +"\tinterest_sports,\n" +"\treg_date,\n" +"\tcomment_like_cnt,\n" +"\tlast30d_share_cnt,\n" +"\tuser_like_consume_trend_type,\n" +"\tprovince,\n" +"\tlast_access_version,\n" +"\tothers\n" +"\t)SELECT\n" +" number as user_id,\n" +" toUInt32(rand(11)%4) as city_level,\n" +" toUInt32(rand(30)%2) as gender,\n" +" toUInt32(rand(28)%2) as interest_sports,\n" +" (toDateTime('2020-01-01 00:00:00') + rand(1)%(3600*24*30*4)) as reg_date,\n" +" toUInt32(rand(15)%10) as comment_like_cnt,\n" +" toUInt32(rand(16)%10) as last30d_share_cnt,\n" +"randomPrintableASCII(64) as user_like_consume_trend_type,\n" +"randomPrintableASCII(64) as province,\n" +"randomPrintableASCII(64) as last_access_version,\n" +"[randomPrintableASCII(64)] as others\n" +" FROM numbers(100000);\n";stmt.execute(insertSQL);System.out.println("Mock data and insert done.");System.out.println("Select count(user_id)...");ResultSet rs = stmt.executeQuery("select count(user_id) from whatever_table_dist");while (rs.next()) {int count = rs.getInt(1);System.out.println("user_id count: " + count);}// 數據合并String optimizeSQL = "OPTIMIZE table whatever_table final;";// 如數據合并時間過長,可在partition級別并行執行String optimizeByPartitionSQL = "OPTIMIZE table whatever_table PARTITION 202001 final;";try {stmt.execute(optimizeByPartitionSQL);}catch (SQLTimeoutException e){// 查看merge進展// String checkMergeSQL = "select * from system.merges where database = 'default' and table = 'whatever_table';";Thread.sleep(60*1000);}// 人群圈選(city_level='一線城市',gender='男性',interest_sports='是', reg_date<='2020-01-31 23:59:59')String selectSQL = "SELECT user_id from whatever_table_dist where city_level=0 and gender=1 and interest_sports=1 and reg_date <= NOW();";rs = stmt.executeQuery(selectSQL);while (rs.next()) {int user_id = rs.getInt(1);System.out.println("Got suitable user: " + user_id);}} catch (Exception e) {e.printStackTrace();}} }寫在最后
阿里云已經推出了ClickHouse的云托管產品,產品首頁地址:云數據庫ClickHouse,歡迎大家試用,對Clickhouse感興趣的也可加入Clickhouse技術交流群。
原文鏈接:https://developer.aliyun.com/article/781084?
版權聲明:本文內容由阿里云實名注冊用戶自發貢獻,版權歸原作者所有,阿里云開發者社區不擁有其著作權,亦不承擔相應法律責任。具體規則請查看《阿里云開發者社區用戶服務協議》和《阿里云開發者社區知識產權保護指引》。如果您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將立刻刪除涉嫌侵權內容。總結
以上是生活随笔為你收集整理的ClickHouse:人群圈选业务的大杀器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何跑通第一个 DataStream 作
- 下一篇: 如何将实时计算 Flink 与自身环境打