Tablestore + Blink实战:交易数据的实时统计
背景
交易數據的實時統計是電商網站一個核心功能,可以幫助用戶實時統計網站的整體銷售情況,快速驗證“新銷售策略”的效果。我們今天介紹一個基于表格存儲(Tablestore)實現交易數據的實時計算,給大家提供一個新使用方式。
Tablestore作為在線的結構化數據庫,提供了毫秒級的訪問延時和豐富的查詢方式,能高效的支撐交易數據的存儲和查詢,同時Tablestore已經原生支持阿里云的流計算框架Flink/Blink,可以實現數據的實時計算。
架構
?
示例設計
基本場景
注意:示例是模擬一個電商網站的交易數據的存儲和實時計算,目的是為了展示Tablestore + Blink的使用流程。
用戶通過SDK將網站交易數據實時的存儲(PutRow/BatchWrite/TableStoreWriter)到Tablestore的source_order表中,Tablestore通過Tunnel服務,將實時增量的數據流入到Flink/Blink中,每5秒進行一次聚合計算,并將計算的結果重新寫回Tablestore的sink_order表中。最后提供給“大屏”實時讀取(GetRange)展示。
Source表(源表)- source_order
source表是原始數據表,存儲了所有交易記錄。
| metering(PrimaryKey) | string | 計量類型,樣例中默認是web |
| orderid(PrimaryKey) | string | 訂單號ID |
| ts | integer | 交易時間(Unix時間戳,毫秒精度) |
| price | double | 交易金額 |
| buyerid | integer | 買家ID |
| sellerid | integer | 賣家ID |
| productid | integer | 商品ID |
Sink表(結果表)- sink_order
| metering(PrimaryKey) | string | 計量類型,樣例中默認是web |
| ts(PrimaryKey) | integer | 交易時間(Unix時間戳,毫秒精度) |
| price | double | 交易金額 |
| ordercount | integer | 交易次數 |
Flink SQL
DDL參考
注意:當前Blink在支持Tablestore source上還有些限制,只能運行ProcessingTime的方式,未來會支持EventTime模式,按照用戶數據參數的時間進行計算。
-- Source 源表創建 CREATE TABLE ots_input (metering VARCHAR,orderid VARCHAR,price DOUBLE,byerid BIGINT,sellerid BIGINT,productid BIGINT,primary key(metering,orderid),ts AS PROCTIME() ) WITH (type = 'ots',instanceName = 'ordertest',tableName = 'source_order',accessId = '******************',accessKey = '******************',endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',tunnelName = 'blink_agg' ); -- Sink 結果表創建 CREATE TABLE ots_output (metering VARCHAR,ts BIGINT,price DOUBLE,ordercount BIGINT,primary key(metering,ts) ) WITH (type = 'ots',instanceName = 'ordertest',tableName = 'sink_order',accessId = '******************',accessKey = '******************',endpoint = 'http://ordertest.cn-zhangjiakou.vpc.tablestore.aliyuncs.com',valueColumns = 'price,ordercount' );-- 計算 INSERT INTO ots_output SELECT DISTINCT metering as metering,CAST(TUMBLE_START(ots_input.ts, INTERVAL '5' SECOND) AS BIGINT) AS ts,SUM(price) as price,COUNT(orderid) as ordercount FROM ots_input GROUP BY TUMBLE(ts, INTERVAL '5' SECOND),metering;實戰
第一步:準備賬戶與開服
準備表格存儲實例,可以參考《表格存儲實例創建》
準備Flink/Blink項目,可以參考《Blink如何購買》
第二步:創建資源
表格存儲資源
表格存儲控制臺入口,創建表格存儲實例ordertest (用戶自定義即可,下面對于參數位置更換為自定義的實例名),并記錄實例的VPC地址
同時在控制臺創建Source表和Sink表, 并為Source表(source_order)開啟一個Tunnel服務blink_agg
圖三 Source表(source_order)
圖四 Sink表(sink_order)
圖五 源表和目標表
圖六 創建通道
Blink資源
Blink控制臺入口,創建一個Blink項目(獨享模式要創建集群之后才能創建項目),分別創建一個作業,agg_order,并將上面的Flink SQL粘貼到窗口中,上線服務
在運維窗口中啟動該任務
第三步:壓入數據 并 實時獲取結算結果
1 準備配置文件
程序默認會從'~/tablestoreConf.json'獲取配置
vim ~/tablestoreConf.json# 內容 {"endpoint":"http://ordertest.cn-zhangjiakou.ots.aliyuncs.com","accessId":"************","accessKey":"************","instanceName":"ordertest" }2 構建源碼
mvn install cd target tar xzvf stream-compute-1.0-SNAPSHOT-release.tar.gz3 啟動壓力器和模擬大屏
可以直接下載工具包:stream-compute-1.0-SNAPSHOT-release.tar.gz
# 窗口1 ./bin/mock_order_generator # 窗口2 ./bin/data_show_screen4 效果
原文鏈接
本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的Tablestore + Blink实战:交易数据的实时统计的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 浅析基于 Serverless 的前后端
- 下一篇: 语雀携手Teambition,玩转项目协