视频直播:实时数据可视化分析
作者:spiderwu,騰訊 CSIG 高級工程師
1 解決方案描述
1.1 概述
本方案結合騰訊云 CKafka、流計算 Oceanus、私有網絡 VPC、商業智能分析 BI 等,對視頻直播行業數字化運營進行實時可視化分析。分析指標包含觀看直播人員的地區分布、各級別會員統計、各模塊打賞禮物情況、在線人數等。
視頻直播場景
1.2 方案架構及優勢
根據以上視頻直播場景,設計了如下架構圖:
架構圖涉及產品列表:
流計算 Oceanus
私有網絡 VPC
消息隊列 CKafka
云數據庫 MySQL
EMR 集群 HBase 組件
商業智能分析服務
2 前置準備
購買并創建相應的大數據組件。
2.1 創建 VPC 私有網絡
私有網絡是一塊您在騰訊云上自定義的邏輯隔離網絡空間,在構建 MySQL、EMR,ClickHouse 集群等服務時選擇的網絡必須保持一致,網絡才能互通。否則需要使用對等連接、VPN 等方式打通網絡。頁面地址:https://console.cloud.tencent.com/vpc/vpc?rid=8
2.2 創建 Oceanus 集群
流計算 Oceanus 服務兼容原生的 Flink 任務。在 Oceanus 控制臺的【集群管理】->【新建集群】頁面創建集群,選擇地域、可用區、VPC、日志、存儲,設置初始密碼等。VPC 及子網使用剛剛創建好的網絡。創建完后 Flink 的集群如下:
Oceanus集群2.3 創建消息隊列 Ckafka
消息隊列 CKafka(Cloud Kafka)是基于開源 Apache Kafka 消息隊列引擎,提供高吞吐性能、高可擴展性的消息隊列服務。消息隊列 CKafka 完美兼容 Apache kafka 0.9、0.10、1.1、2.4、2.8 版本接口,在性能、擴展性、業務安全保障、運維等方面具有超強優勢,讓您在享受低成本、超強功能的同時,免除繁瑣運維工作。頁面地址:https://cloud.tencent.com/product/ckafka
2.2.1 創建 Ckafka 集群
注意私有網絡和子網選擇之前創建的網絡和子網
Kafka集群2.2.2 創建 topic
創建topic
2.2.3 模擬發送數據到 topic
1)kafka 客戶端
進入同子網的 CVM 下,啟動 kafka 客戶端,模擬發送數據,具體操作文檔參考官網:
https://cloud.tencent.com/document/product/597/56840
2)使用腳本發送
腳本一:Java 參考地址:https://cloud.tencent.com/document/product/597/54834
腳本二:Python 腳本生成模擬數據:
#!/usr/bin/python3 #?首次使用該腳本,需?"pip3?install?kafka"?安裝kafka模塊 import?json import?random import?time from?kafka?import?KafkaProducerTIME_FORMAT?=?"%Y-%m-%d?%H:%M:%S" PROVINCES?=?["北京",?"廣東",?"山東",?"江蘇",?"河南",?"上海",?"河北",?"浙江",?"香港","陜西",?"湖南",?"重慶",?"福建",?"天津",?"云南",?"四川",?"廣西",?"安徽","海南",?"江西",?"湖北",?"山西",?"遼寧",?"臺灣",?"黑龍江",?"內蒙古","澳門",?"貴州",?"甘肅",?"青海",?"新疆",?"西藏",?"吉林",?"寧夏"]broker_lists?=?['172.28.28.13:9092'] topic_live_gift_total?=?'live_gift_total' topic_live_streaming_log?=?'live_streaming_log'producer?=?KafkaProducer(bootstrap_servers=broker_lists,value_serializer=lambda?m:?json.dumps(m).encode('ascii'))#?模擬幾天前,幾小時前的數據 pre_day_count?=?0 pre_hour_count?=?0 hour_unit?=?3600 day_unit?=?3600?*?24def?generate_data_live_gift_total():#?construct?timeupdate_time?=?time.time()?-?day_unit?*?pre_day_countupdate_time_str?=?time.strftime(TIME_FORMAT,?time.localtime(update_time))create_time?=?update_time?-?hour_unit?*?pre_hour_countcreate_time_str?=?time.strftime(TIME_FORMAT,?time.localtime(create_time))results?=?[]for?_?in?range(0,?10):user_id?=?random.randint(2000,?4000)random_gift_type?=?random.randint(1,?10)random_gift_total?=?random.randint(1,?100)msg_kv?=?{"user_id":?user_id,?"gift_type":?random_gift_type,"gift_total_amount":?random_gift_total,"create_time":?create_time_str,?"update_time":?update_time_str}results.append(msg_kv)return?resultsdef?generate_live_streaming_log():#?construct?timeupdate_time?=?time.time()?-?day_unit?*?pre_day_countleave_time_str?=?time.strftime(TIME_FORMAT,?time.localtime(update_time))create_time?=?update_time?-?hour_unit?*?pre_hour_countcreate_time_str?=?time.strftime(TIME_FORMAT,?time.localtime(create_time))results?=?[]for?_?in?range(0,?10):user_id?=?random.randint(2000,?4000)random_province?=?random.randint(0,?len(PROVINCES)?-?1)province_name?=?PROVINCES[random_province]grade?=?random.randint(1,?5)msg_kv?=?{"user_id":?user_id,?"ip":?"123.0.0."?+?str(user_id?%?255),"room_id":?20210813,?"arrive_time":?create_time_str,"create_time":?create_time_str,?"leave_time":?leave_time_str,"region":?1122,?"grade":?(user_id?%?5?+?1),?"province":?province_name}results.append(msg_kv)return?resultsdef?send_data(topic,?msgs):count?=?0#?produce?asynchronouslyfor?msg?in?msgs:import?timetime.sleep(1)count?+=?1producer.send(topic,?msg)print("?send?%d?data...\n?%s"?%?(count,?msg))producer.flush()if?__name__?==?'__main__':count?=?1while?True:time.sleep(60)#for?_?in?range(count):msg_live_stream_logs?=?generate_live_streaming_log()send_data(topic_live_streaming_log,?msg_live_stream_logs)msg_topic_live_gift_totals?=?generate_data_live_gift_total()send_data(topic_live_gift_total,?msg_topic_live_gift_totals)2.4 創建 EMR 集群
EMR 是云端托管的彈性開源泛 Hadoop 服務,支持 Spark、HBase、Presto、Flink、Druid 等大數據框架,本次示例主要需要使用 Flume、Hive、YARN、HUE、Oozie 組件。頁面地址https://console.cloud.tencent.com/emr
1)在 EMR 集群中安裝 HBase 組件。
HBase組件2)如果生產環境,服務器配置可根據實際情況選擇,示例中選擇了低配服務器,網絡需要選擇之前創建好的 VPC 網絡,始終保持服務組件在同一 VPC 下。
網絡選擇3)進入 HBase Master 節點
HBaseMaster節點4)點擊登錄進入服務器
5)創建 Hbase 表
#?進入HBase命令 [root@172~]#?hbase?shell #?建表語句 create?‘dim_hbase’,?‘cf’2.5 創建云數據庫 MySQL
云數據庫 MySQL(TencentDB for MySQL)是騰訊云基于開源數據庫 MySQL 專業打造的高性能分布式數據存儲服務,讓用戶能夠在云中更輕松地設置、操作和擴展關系數據庫。頁面地址:https://console.cloud.tencent.com/cdb
新建 MySQL 服務的頁面需要注意選擇的網絡是之前創建好的。
MySQL創建創建完 MySQL 服務后,需要修改 binlog 參數,如圖修改為 FULL(默認值為 MINIMAL)
mysql修改參數修改完參數后,登陸 MySQL 創建示例所需要的數據庫和數據庫表。
1) 登陸 MySQL 云數據庫
mysql登錄2) 新建數據庫
打開 SQL 窗口或可視化頁面創建數據庫和表
CREATE?DATABASE?livedb;?????--創建數據庫列表2.6 創建商業智能分析
商業智能分析(Business Intelligence,BI)支持自服務數據準備、探索式分析和企業級管控,是新一代的敏捷自助型 BI 服務平臺。只需幾分鐘,您就可以在云端輕松自如地完成數據分析、業務數據探查、報表制作等一系列數據可視化操作。便捷的拖拉拽式交互操作方式,讓您無需依賴 IT 人員,無需擔心試錯成本,快速洞察數據背后的關聯、趨勢和邏輯。
頁面地址:https://cloud.tencent.com/product/bi
2.6.1 購買商業智能分析
需要主賬號購買資源,購買時需根據創建的子賬號數來進行購買。
子用戶提出申請
主賬號審核通過。并給子用戶授予添加數據源,創建數據集,查看報告的權限。
2.6.2 添加 MySQL 數據源
(這里選用開啟外網方式連接,更多連接方式見官方文檔:https://cloud.tencent.com/document/product/590/19294)
1) 打開購買的 MySQL 實例,開啟外網
mysql開啟外網2)將 SaaS BI(119.29.66.144:3306)添加到 MySQL 數據庫安全組
添加安全組1
添加安全組2
注意添加的是 MySQL 3306 端口,不是外網映射的端口。
添加安全組3
3)創建 MySQL 賬戶并配置權限
創建賬戶,并設置賬號密碼,注意主機 IP 設置為%
創建賬戶1創建賬戶2
設置賬號權限:
設置權限1設置權限2
4) 進入智能商業分析,連接 MySQL 數據庫。添加數據源->MySQL,填寫完成后點擊測試連接。
3 方案實現
接下來通過案例為您介紹如何利用流計算服務 Oceanus 實現視頻直播數字化運營的實時可視化數據處理與分析。
3.1 解決方案
3.1.1 業務目標
這里只列取以下 3 種統計指標:
全站觀看直播用戶分布
禮物總和統計
各模塊禮物統計 3.1.2 源數據格式
事件 log:live_streaming_log(topic)
Ckafka 內部采用 json 格式存儲,展現出來的數據如下所示:
{ 'user_id':?3165 ,?'ip':?'123.0.0.105' ,?'room_id':?20210813 ,?'arrive_time':?'2021-08-16?09:48:01' ,?'create_time':?'2021-08-16?09:48:01' ,?'leave_time':?'2021-08-16?09:48:01' ,?'region':?1122 ,?'grade':?1 ,?'province':?'浙江' }禮物記錄:live_gift_log(topic 名)
{'user_id':?3994,?'gift_type':?3,?'gift_total_amount':?28,?'room_id':?20210813,?'ip':?'123.0.0.105',?'create_time':?'2021-08-16?09:46:51',?'update_time':?'2021-08-16?09:46:51' }模塊記錄表:live_module_roomid(Hbase 維表)
3.1.2Oceanus SQL 作業編寫
全網觀看直播用戶分布(需提前在 MySQL 建表)
1、定義 source
CREATE?TABLE?`live_streaming_log_source?`?(`user_id`???????BIGINT,`ip`????????????VARCHAR,`room_id`???????BIGINT,`arrive_time`???TIMESTAMP,`leave_time`????TIMESTAMP,`create_time`???TIMESTAMP,`region_code`???INT,`grade`?????????INT,`province`??????VARCHAR)?WITH?('connector'?=?'kafka','topic'?=?'live_streaming_log','scan.startup.mode'?=?'earliest-offset','properties.bootstrap.servers'?=?'172.28.28.13:9092','properties.group.id'?=?'joylyu-consumer-2','format'?=?'json','json.ignore-parse-errors'?=?'false','json.fail-on-missing-field'?=?'false');2、定義 sink
CREATE?TABLE?`live_streaming_log_sink`?(`user_id`?????????BIGINT,`ip`?????????????VARCHAR,`room_id`????????BIGINT,`arrive_time`??????TIMESTAMP,`leave_time`??????TIMESTAMP,`create_time`?????TIMESTAMP,`region_code`?????INT,`grade`???????????INT,`province`????????VARCHAR,primary?key(`user_id`,?`ip`,`room_id`,`arrive_time`)?not?enforced )?WITH?('connector'?=?'jdbc','url'?='jdbc:mysql://172.28.28.227:3306/livedb? rewriteBatchedStatements=true&serverTimezon=Asia/Shanghai','table-name'?=?'live_streaming_log','username'?=?'root','password'?=?'xxxxx','sink.buffer-flush.max-rows'?=?'5000','sink.buffer-flush.interval'?=?'2s','sink.max-retries'?=?'3' );3、業務邏輯
INSERT?INTO?`live_streaming_log_sink` SELECT?`*`?FROM?`live_streaming_log_source`;禮物總和統計(需提前在 MySQL 建表)
1、 定義 source
CREATE?TABLE?`?live_gift_total_source`?(`user_id`???????????VARCHAR,`gift_type`??????????VARCHAR,`gift_total_amount`???BIGINT,`ip`????????????????VARCHAR,`create_time`????????VARCHAR )?WITH?('connector'?=?'kafka','topic'?=?'live_gift_total','scan.startup.mode'?=?'earliest-offset','properties.bootstrap.servers'?=?'172.28.28.13:9092','properties.group.id'?=?'joylyu-consumer-1','format'?=?'json','json.ignore-parse-errors'?=?'false','json.fail-on-missing-field'?=?'false');2、 定義 sink
CREATE?TABLE?`live_gift_total_sink`?( `gift_type`?VARCHAR, `gift_total_amount`?BIGINT, primary?key(`user_id`,?`gift_type`)?not?enforced )?WITH?( 'connector'?=?'jdbc', 'url'?=?'jdbc:mysql://172.28.28.227:3306/livedb? rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', 'table-name'?=?'live_gift_total', 'username'?=?'root', 'password'?=?'xxxxx', 'sink.buffer-flush.max-rows'?=?'5000', 'sink.buffer-flush.interval'?=?'2s', 'sink.max-retries'?=?'3' );3、 業務邏輯
INSERT?INTO?`live_gift_total_sink` SELECT?`gift_type`,SUM(`gift_total_amount`)?as?`gift_total_amount_all` FROM?`live_gift_total_source` GROUP?BY?`gift_type`;各模塊禮物統計(需提前在 MySQL 建表)
1、 定義 source
CREATE?TABLE?`live_gift_total_source`?( `user_id`????????????VARCHAR, `gift_type`??????????VARCHAR, `gift_total_amount`???BIGINT, `ip`???????????????????VARCHAR, `create_time`?????VARCHAR, proc_time?AS?PROCTIME() )?WITH?('connector'?=?'kafka','topic'?=?'live_gift_total','scan.startup.mode'?=?'earliest-offset','properties.bootstrap.servers'?=?'172.28.28.13:9092','properties.group.id'?=?'joylyu-consumer-1','format'?=?'json','json.ignore-parse-errors'?=?'false','json.fail-on-missing-field'?=?'false');2、 定義 Hbase 維表
CREATE?TABLE?`dim_hbase`?( `rowkey`?STRING, `cf`?ROW?<`module_id`?STRING>, PRIMARY?KEY?(`rowkey`)?NOT?ENFORCED )?WITH?( 'connector'?=?'hbase-1.4', 'table-name'?=?'dim_hbase', 'zookeeper.quorum'?=?'用戶自己的hbase服務器zookeeper地址' );3、 定義 sink
CREATE?TABLE?`module_gift_total_sink`?( `module_id`?BIGINT, `module_gift_total_amount`?BIGINT, primary?key(`module_id`)?not?enforced )?WITH?( 'connector'?=?'jdbc', 'url'?=?'jdbc:mysql://172.28.28.227:3306/livedb? rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', 'table-name'?=?'live_gift_total', 'username'?=?'root', 'password'?=?'xxxxx', 'sink.buffer-flush.max-rows'?=?'5000', 'sink.buffer-flush.interval'?=?'2s', 'sink.max-retries'?=?'3' );4、業務邏輯
INSERT?INTO?`module_gift_total_sink` SELECT `b`.`cf`.`module_id`, SUM(`a`.`gift_total_amount`)?AS?`module_gift_total_amount` FROM?`live_gift_total_source`?AS?`a` LEFT?JOIN?`dim_hbase`?AS?`b`?for?SYSTEM_TIME?as?of?`a`.`proc_time`ON?`a`.`room_id`?=?`b`.`rowkey` GROUP?BY?`b`.`cf`.`module_id`;3.2 實時大屏可視化展示
3.2.1 添加數據源
進入商業智能分析界面,點擊添加數據源->MySQL,按上面方法連接到指定 MySQL 數據庫,點擊保存。
3.2.2 創建數據集
點擊創建數據集->SQL 數據集(可根據實際業務場景選擇其他數據集),從剛才的數據源中添加數據集,點擊保存。
3.2.3 制作報告
新建報告。點擊制作報告->新建報告(可選擇任意模版),拖拽組件到中間空白處完成報告的制作。
設置實時刷新。點擊左上角報告設置->高級,勾選獲取實時數據,刷新間隔設置為 3s(根據實際業務情況自行選擇),這樣可以根據 Mysq 數據源間隔 3s 一次自動刷新報告。完成之后點擊保存。
具體步驟見官網文檔:https://cloud.tencent.com/document/product/590/19753
3.2.4 查看報告
點擊查看報告,選擇剛才保存的報告,可以動態展示報告。(注:此報告只做演示使用,可以參考官方文檔優化報告:https://cloud.tencent.com/document/product/590/19784)
如下圖所示,大屏中總共 6 個圖表。
圖表 1:用戶地區分布。表示觀看直播客戶在全國范圍內的地區分布;
圖表 2:各級別會員人數。表示各個會員等級的總人數;
圖表 3:禮物類型總和。表示收到各禮物類型的總和;
圖表 4:最近 6h 禮物總數統計。表示最近 6 小時收到的禮物總計和;
圖表 5:刷禮物排行前 10。表示刷禮物最多的 10 個客戶;
圖表 6:在線人數。當天每個時間段進入直播間的人數。
實時大屏
4 總結
通過騰訊云CKafka組件采集數據,在兼容Flink開源版本的流計算Oceanus中實時進行維表關聯等加工處理,將加工后的數據存儲在MySQL等數據庫中,最終通過商業智能分析BI組件實時刷新MySQL的數據繪制出了實時大屏,得到了實時刷新的效果。這個方案在數據庫表設計時為了簡便易懂做了簡化處理,重點打通騰訊云產品展現整個方案。限于個人水平,如有理解有誤之處歡迎批評指正。
總結
以上是生活随笔為你收集整理的视频直播:实时数据可视化分析的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何做一款面向企业客户的商用级 SDK
- 下一篇: Mdebug:基于React开发的移动w