PostgreSQL 无会话、有会话模式 - 客服平均响应速度(RT)实时计算实践(窗口查询\流计算)...
標簽
PostgreSQL , 無會話 , 客服響應速度 , 觸發器 , rule , 窗口查詢
背景
通常客服系統可能存在一對多,多對多的情況。
例如,
我們在使用淘寶時,與店家交流時,你根本不知道后面的小二是一個人還是多個人共用一個賬號,還有可能是多個人使用了多個賬號但是對消費者只看到一個。
例如:
小二(n)賬號 -> 統一對外賬號 -> 消費者
還有的情況是一個小二為多個消費者服務:
小二賬號 -> 統一對外賬號 -> 消費者(n)
小二重要的KPI之一是響應速度,因為這直接反應到消費者的感受上。如果消費者一個問題,很久沒人回復,可能就直接關閉頁面,更換其他商家了。
那么如何統計響應速度呢?
通常來說,需要從消費者維度看待響應速度,因為一個問題可能被多個小二回復,也可能被1個小二回復,這種情況下,應該統計第一反饋時間作為響應時間。
另一方面,如果系統沒有會話機制的話,統計起來會比較麻煩。(并且,一個真實的會話里面的若干次交互,可能統計時會被抽象成若干的“虛擬會話”)
我們來看個例子。
1 無會話模式的響應速度統計
假設數據以TS字段順序到達為前提(通常這種場景,按TS到達的可能性較大,或者你可以使用clock_timestamp()來作為這個時間,可能性就更大了。),后面會講如果不這樣有什么問題,以及解決方案。
無會話模式,適合于客戶發起消息后,后臺任意分配一個客服給他(或者分配一個客服池子給他),第一時間響應他的可以是任意客服。
1、客服、客戶交談表(只展示重要字段)
create table tbl ( a int not null, -- 客服ID b int not null, -- 客戶ID ts timestamp not null, -- 消息時間 direct boolean not null -- 消息方向 true: a->b, false: b->a );2、客服的平均響應時間
一個客戶的最早發言時間,下一時刻任意客服最早回復這位客戶的回復時間。(中間部分略過)
例如
1, 2, 0001, false -- 客戶2給客服1發信息時間,作為一次虛擬會話的開始時間 100, 2, 0003, false -- 客戶2給客服100發信息時間,如果比下一條先到達,這次虛擬會話 ,按這種方法將計算不到。 22, 2, 0002, true -- 客服22給客戶2發信息時間,作為一次虛擬會話的最早響應時間 1, 2, 0005, true -- 客服1給客戶2發信息時間3、實時計算解決這個問題
結果表結構
create table tbl_result ( b int not null, -- 客戶ID b_ts timestamp, -- 客戶發起一次虛擬會話的最早時間 a int default -1, -- 最先響應這次虛擬會話的客服ID, -1表示還沒人響應 a_ts timestamp -- 最先響應這次虛擬會話的時間 ); -- 添加約束,當客戶的虛擬會話沒有完結時,不計新虛擬會話。 -- 保證同一時刻,同一客戶,只有一個未完結的虛擬會話。 alter table tbl_result add constraint uk exclude (b with =) where (a=-1);4、實時處理邏輯
when insert into tbl
if b -> a 邏輯(客戶發給客服) select 1 from tbl_result where b=? and a = -1; if not found then insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing; -- update set b_ts=excluded.b_ts -- where tbl_result.b_ts > excluded.b_ts; -- 僅當新寫入時間小于原記錄時更新, 也可以不做,假設TS是順序的。 -- else -- 說明還沒有人回復它,跳過,等第一次客服響應來更新這條記錄 end if; if a -> b 邏輯(客服發給客戶) select 1 from tbl_result where b=? and a = -1; if found then update tbl_result set a=? , a_ts=? where b=? and a = -1 and NEW.ts >= b_ts; -- else -- 說明已有人回復,不需要更新 end if;5、tbl的insert trigger函數
create or replace function tb() returns trigger as $$ declare begin if not NEW.direct then -- b -> a 邏輯(客戶發給客服) perform 1 from tbl_result where b=NEW.b and a = -1; if not found then insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing; -- update set b_ts=excluded.b_ts -- where tbl_result.b_ts > excluded.b_ts; -- 僅當新寫入時間小于原記錄時更新, 也可以不做,假設TS是順序的。 -- else -- 說明還沒有人回復它,跳過,等第一次客服響應來更新這條記錄 end if; else -- a -> b 邏輯(客服發給客戶) perform 1 from tbl_result where b=NEW.b and a = -1; if found then update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts; -- else -- 說明已有人回復,不需要更新 end if; end if; return NULL; end; $$ language plpgsql strict;創建觸發器
create trigger tg0 after insert on tbl for each row execute procedure tb();6、寫入壓測
假設有100個客服 100萬個客戶 使用clock_timestamp生成TS,確保數據按一定時序順序寫入。 vi test.sql \set a random(1,100) \set b random(1,1000000) \set bo random(0,1) insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean); pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120 postgres=# select count(*) from tbl; count ---------- 19805266 (1 row) postgres=# select count(*) from tbl_result; count --------- 5202622 (1 row)7、算法校驗,正確
postgres=# select * from tbl where b=1 order by ts limit 10; a | b | ts | direct ----+---+----------------------------+-------- 25 | 1 | 2018-08-15 09:43:22.862526 | f 17 | 1 | 2018-08-15 09:43:25.180255 | f 63 | 1 | 2018-08-15 09:43:29.901536 | t 3 | 1 | 2018-08-15 09:43:31.906753 | t 38 | 1 | 2018-08-15 09:43:52.035444 | f 24 | 1 | 2018-08-15 09:43:52.679127 | f 69 | 1 | 2018-08-15 09:43:54.855426 | t 44 | 1 | 2018-08-15 09:44:05.735922 | t 75 | 1 | 2018-08-15 09:44:10.555001 | t 17 | 1 | 2018-08-15 09:44:10.565798 | f (10 rows) postgres=# select * from tbl_result where b=1 order by b_ts limit 10; b | b_ts | a | a_ts ---+----------------------------+----+---------------------------- 1 | 2018-08-15 09:43:22.862526 | 63 | 2018-08-15 09:43:29.901536 1 | 2018-08-15 09:43:52.035444 | 69 | 2018-08-15 09:43:54.855426 1 | 2018-08-15 09:44:10.565798 | 86 | 2018-08-15 09:44:33.090099 1 | 2018-08-15 09:44:33.815634 | 63 | 2018-08-15 09:44:45.737907 1 | 2018-08-15 09:44:52.277396 | 45 | 2018-08-15 09:44:59.006899 1 | 2018-08-15 09:45:19.288931 | -1 | (6 rows)性能,寫入吞吐達到16.5萬行/s。
transaction type: ./test.sql scaling factor: 1 query mode: prepared number of clients: 32 number of threads: 32 duration: 120 s number of transactions actually processed: 19805266 latency average = 0.194 ms latency stddev = 0.221 ms tps = 165043.068862 (including connections establishing) tps = 165056.827167 (excluding connections establishing) statement latencies in milliseconds: 0.001 \set a random(1,100) 0.000 \set b random(1,1000000) 0.000 \set bo random(0,1) 0.191 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);2 有會話模式的響應速度統計
假設數據以TS字段順序到達為前提(通常這種場景,按TS到達的可能性較大,或者你可以使用clock_timestamp()來作為這個時間,可能性就更大了。),后面會講如果不這樣有什么問題,以及解決方案。
相比前面的不同之處,a,b一一對應,即有會話模式。
客戶1發給客服2 那么就只看客服2第一次響應客戶1的時間。有會話模式,適合于客戶發起消息后,后臺分配一個客服給他,第一時間響應他的必須是這個分配的客服。
稍微修改前面的代碼即可。
1、客服、客戶交談表(只展示重要字段)
create table tbl ( a int not null, -- 客服ID b int not null, -- 客戶ID ts timestamp not null, -- 消息時間 direct boolean not null -- 消息方向 true: a->b, false: b->a );2、客服的平均響應時間
一個客戶的最早發言時間,下一時刻對應客服最早回復這位客戶的回復時間。(中間部分略過)
例如
1, 2, 0001, false -- 客戶2給客服1發信息時間,作為一次虛擬會話的開始時間 1, 2, 0003, false -- 客戶2給客服1發信息時間。 1, 2, 0002, true -- 客服1給客戶2發信息時間,作為一次虛擬會話的最早響應時間 1, 2, 0005, true -- 客服1給客戶2發信息時間3、實時計算解決這個問題
結果表結構
create table tbl_result ( b int not null, -- 客戶ID b_ts timestamp, -- 客戶發起一次虛擬會話的最早時間 a int, -- 客戶給誰發起了這次會話 rsp_a int default -1, -- 響應這次虛擬會話的客服ID, -1表示沒人響應 a_ts timestamp -- 最先響應這次虛擬會話的時間 ); -- 添加約束,當客戶的虛擬會話沒有完結時,不計新虛擬會話。 -- 保證同一時刻,同一客戶,與同一客服,只有一個未完結的虛擬會話。 alter table tbl_result add constraint uk exclude (b with =, a with =) where (rsp_a=-1);4、實時處理邏輯
when insert into tbl
if b -> a 邏輯(客戶發給客服) select 1 from tbl_result where b=? and a=? and rsp_a = -1; if not found then insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing; -- update set b_ts=excluded.b_ts -- where tbl_result.b_ts > excluded.b_ts; -- 僅當新寫入時間小于原記錄時更新, 也可以不做,假設TS是順序的。 -- else -- 說明還沒有人回復它,跳過,等第一次客服響應來更新這條記錄 end if; if a -> b 邏輯(客服發給客戶) select 1 from tbl_result where b=? and a=? and rsp_a = -1; if found then update tbl_result set rsp_a=? , a_ts=? where b=? and a=? and rsp_a = -1 and NEW.ts >= b_ts; -- else -- 說明已有人回復,不需要更新 end if;5、tbl的insert trigger函數
create or replace function tb() returns trigger as $$ declare begin if not NEW.direct then -- b -> a 邏輯(客戶發給客服) perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1; if not found then insert into tbl_result (b,b_ts,a) values (NEW.b,NEW.ts,NEW.a) on conflict ON CONSTRAINT uk do nothing; -- update set b_ts=excluded.b_ts -- where tbl_result.b_ts > excluded.b_ts; -- 僅當新寫入時間小于原記錄時更新, 也可以不做,假設TS是順序的。 -- else -- 說明還沒有人回復它,跳過,等第一次客服響應來更新這條記錄 end if; else -- a -> b 邏輯(客服發給客戶) perform 1 from tbl_result where b=NEW.b and a=NEW.a and rsp_a = -1; if found then update tbl_result set rsp_a=NEW.a , a_ts=NEW.ts where b=NEW.b and a=NEW.a and rsp_a = -1 and NEW.ts >= b_ts; -- else -- 說明已有人回復,不需要更新 end if; end if; return NULL; end; $$ language plpgsql strict;創建觸發器
create trigger tg0 after insert on tbl for each row execute procedure tb();6、寫入壓測
假設有10個客服 1萬個客戶 使用clock_timestamp生成TS,確保數據按一定時序順序寫入。 vi test.sql \set a random(1,10) \set b random(1,10000) \set bo random(0,1) insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean); pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120 postgres=# select count(*) from tbl; count ---------- 19771381 (1 row) postgres=# select count(*) from tbl_result; count --------- 4967253 (1 row)7、算法校驗,正確
postgres=# select * from tbl where b=1 and a=9 order by ts limit 30; a | b | ts | direct ---+---+----------------------------+-------- 9 | 1 | 2018-08-15 10:08:20.82439 | f 9 | 1 | 2018-08-15 10:08:21.341471 | f 9 | 1 | 2018-08-15 10:08:23.084166 | f 9 | 1 | 2018-08-15 10:08:23.160162 | f 9 | 1 | 2018-08-15 10:08:23.596106 | f 9 | 1 | 2018-08-15 10:08:23.735911 | f 9 | 1 | 2018-08-15 10:08:23.869232 | f 9 | 1 | 2018-08-15 10:08:25.379688 | t 9 | 1 | 2018-08-15 10:08:26.471402 | t 9 | 1 | 2018-08-15 10:08:26.622047 | t 9 | 1 | 2018-08-15 10:08:26.640313 | t 9 | 1 | 2018-08-15 10:08:27.28104 | f 9 | 1 | 2018-08-15 10:08:27.285187 | f 9 | 1 | 2018-08-15 10:08:27.992076 | t 9 | 1 | 2018-08-15 10:08:28.233072 | t 9 | 1 | 2018-08-15 10:08:28.590125 | t 9 | 1 | 2018-08-15 10:08:29.6004 | t 9 | 1 | 2018-08-15 10:08:30.058747 | f 9 | 1 | 2018-08-15 10:08:30.114936 | t 9 | 1 | 2018-08-15 10:08:30.237846 | f 9 | 1 | 2018-08-15 10:08:30.468956 | t 9 | 1 | 2018-08-15 10:08:31.904644 | t 9 | 1 | 2018-08-15 10:08:32.092077 | t 9 | 1 | 2018-08-15 10:08:32.407465 | t 9 | 1 | 2018-08-15 10:08:32.530952 | f 9 | 1 | 2018-08-15 10:08:32.991299 | f 9 | 1 | 2018-08-15 10:08:33.567598 | f 9 | 1 | 2018-08-15 10:08:33.726376 | f 9 | 1 | 2018-08-15 10:08:33.734359 | f 9 | 1 | 2018-08-15 10:08:34.288767 | f (30 rows) postgres=# select * from tbl_result where b=1 and a=9 order by b_ts limit 10; b | b_ts | a | rsp_a | a_ts ---+----------------------------+---+-------+---------------------------- 1 | 2018-08-15 10:08:20.82439 | 9 | 9 | 2018-08-15 10:08:25.379688 1 | 2018-08-15 10:08:27.28104 | 9 | 9 | 2018-08-15 10:08:27.992076 1 | 2018-08-15 10:08:30.058747 | 9 | 9 | 2018-08-15 10:08:30.114936 1 | 2018-08-15 10:08:30.237846 | 9 | 9 | 2018-08-15 10:08:30.468956 1 | 2018-08-15 10:08:32.530952 | 9 | 9 | 2018-08-15 10:08:34.749098 1 | 2018-08-15 10:08:35.615081 | 9 | 9 | 2018-08-15 10:08:35.681585 1 | 2018-08-15 10:08:35.689469 | 9 | 9 | 2018-08-15 10:08:37.099554 1 | 2018-08-15 10:08:40.70679 | 9 | 9 | 2018-08-15 10:08:40.80081 1 | 2018-08-15 10:08:40.892459 | 9 | 9 | 2018-08-15 10:08:44.732971 1 | 2018-08-15 10:08:45.685787 | 9 | 9 | 2018-08-15 10:08:46.301875 (10 rows)性能,寫入吞吐達到16.5萬行/s。
transaction type: ./test.sql scaling factor: 1 query mode: prepared number of clients: 32 number of threads: 32 duration: 120 s number of transactions actually processed: 19771381 latency average = 0.194 ms latency stddev = 0.222 ms tps = 164760.717898 (including connections establishing) tps = 164774.989399 (excluding connections establishing) statement latencies in milliseconds: 0.001 \set a random(1,10) 0.000 \set b random(1,10000) 0.000 \set bo random(0,1) 0.192 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean);看似問題解決了嗎?
3 統計算法問題與解決辦法
前面都是假設數據按TS到達的情況(使用clock_timestamp生成ts還是比較靠譜的),如果數據完全不按TS到達,會出現什么問題么?
1、如果不按順序到達,會話的發起時間、第一響應時間可能無法得到正確結果
因為一旦觸發生成tbl_result后,后面進來的數據無法修正前面的錯誤。
2、允許一定時間的延遲,同時容忍一定的錯誤率的情況下。比如每小時消費前一小時的數據,中間預留1小時的緩沖時間,降低錯誤率:
2.1、按時間區間,延遲消費適當解決以上問題。
單線程消費,統計。
with tmp as ( delete from tbl where ctid = any(array( select ctid from tbl where ts < now()-interval '1 hour' order by ts limit 10000 )) returning * ) select * from tmp order by ts;然后,按順序消費。
2.2、按時間區間,延遲并行消費,解決大數據量的問題。例如按客戶ID,HASH,并行消費。
多線程(每個HASH一個線程),消費,統計。
create index idx_tbl_mod_32 on tbl (abs(mod(hashint4(b), 32)), ts); with tmp as ( delete from tbl where ctid = any(array( select ctid from tbl where ts < now()-interval '1 hour' and abs(mod(hashint4(b), 32))=0 -- hash 并行 order by ts limit 10000 )) returning * ) select * from tmp order by ts;然后,按順序消費。
例子1
以第一種場景(無會話狀態)為例。延遲批量消費的方法生成最終數據。
1、會話表
create table tbl ( a int not null, -- 客服ID b int not null, -- 客戶ID ts timestamp not null, -- 消息時間 direct boolean not null -- 消息方向 true: a->b, false: b->a ); create index idx_tbl_ts on tbl(ts);2、統計結果表
create table tbl_result ( b int not null, -- 客戶ID b_ts timestamp, -- 客戶發起一次虛擬會話的最早時間 a int default -1, -- 最先響應這次虛擬會話的客服ID, -1表示還沒人響應 a_ts timestamp -- 最先響應這次虛擬會話的時間 ); -- 添加約束,當客戶的虛擬會話沒有完結時,不計新虛擬會話。 -- 保證同一時刻,同一客戶,只有一個未完結的虛擬會話。 alter table tbl_result add constraint uk exclude (b with =) where (a=-1);3、中間會話表(可以不落地,只順序計算)。
create table tbl_mid ( a int not null, -- 客服ID b int not null, -- 客戶ID ts timestamp not null, -- 消息時間 direct boolean not null -- 消息方向 true: a->b, false: b->a );4、中間會話表觸發器
(before 觸發器 return null(不落地,只順序計算))
(after 觸發器 return null(落地))
create or replace function tb() returns trigger as $$ declare begin if not NEW.direct then -- b -> a 邏輯(客戶發給客服) perform 1 from tbl_result where b=NEW.b and a = -1; if not found then insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing; -- update set b_ts=excluded.b_ts -- where tbl_result.b_ts > excluded.b_ts; -- 僅當新寫入時間小于原記錄時更新, 也可以不做,假設TS是順序的。 -- else -- 說明還沒有人回復它,跳過,等第一次客服響應來更新這條記錄 end if; else -- a -> b 邏輯(客服發給客戶) perform 1 from tbl_result where b=NEW.b and a = -1; if found then update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts; -- else -- 說明已有人回復,不需要更新 end if; end if; return NULL; end; $$ language plpgsql strict; create trigger tg0 after insert on tbl_mid for each row execute procedure tb();5、寫入大批量數據,由于觸發器轉移到了中間表,所以寫入吞吐達到了接近29萬行/s。
假設有100個客服 100萬個客戶 使用clock_timestamp生成TS,確保數據按一定時序順序寫入。 vi test.sql \set a random(1,100) \set b random(1,1000000) \set bo random(0,1) insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean); pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120 transaction type: ./test.sql scaling factor: 1 query mode: prepared number of clients: 32 number of threads: 32 duration: 120 s number of transactions actually processed: 34403943 latency average = 0.112 ms latency stddev = 0.229 ms tps = 286698.048259 (including connections establishing) tps = 286718.916176 (excluding connections establishing) statement latencies in milliseconds: 0.001 \set a random(1,100) 0.000 \set b random(1,1000000) 0.000 \set bo random(0,1) 0.109 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean); postgres=# select count(*) from tbl; count ---------- 19805266 (1 row) postgres=# select count(*) from tbl_result; count --------- 5202622 (1 row)6、單線程消費,一次消費100萬行,速度約每秒6萬。
with tmp as ( delete from tbl where ctid = any(array( select ctid from tbl where ts < now()-interval '1 min' -- 測試時改成了消費1分鐘前的數據 order by ts limit 1000000 )) returning * ) insert into tbl_mid select * from tmp order by ts; Time: 16532.939 ms (00:16.533)7、算法校驗,正確
postgres=# select * from tbl_mid where b=2 order by ts limit 10; a | b | ts | direct ----+---+----------------------------+-------- 10 | 2 | 2018-08-15 10:24:58.538558 | t 25 | 2 | 2018-08-15 10:25:00.585426 | f 62 | 2 | 2018-08-15 10:25:04.2633 | f 45 | 2 | 2018-08-15 10:25:04.406764 | t (4 rows) postgres=# select * from tbl_result where b=2 order by b_ts limit 10; b | b_ts | a | a_ts ---+----------------------------+----+---------------------------- 2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764 (1 row)消費性能,單線程吞吐達到6萬行/s。
with tmp as ( delete from tbl where ctid = any(array( select ctid from tbl where ts < now()-interval '1 min' -- 測試時改成了消費1分鐘前的數據 order by ts limit 1000000 )) returning * ) insert into tbl_mid select * from tmp order by ts; Time: 16532.939 ms (00:16.533)消費節奏:
1、消費 2、VACUUM tbl; 3、消費 loop;例子2
以第一種場景(無會話狀態)為例。延遲批量統計的方法生成最終數據。(不消費(delete)已有數據)
1、會話表
create table tbl ( a int not null, -- 客服ID b int not null, -- 客戶ID ts timestamp not null, -- 消息時間 direct boolean not null -- 消息方向 true: a->b, false: b->a ); create index idx_tbl_ts on tbl(ts); -- 也可以使用brin索引 -- create index idx_tbl_ts on tbl using brin(ts);2、統計結果表
create table tbl_result ( b int not null, -- 客戶ID b_ts timestamp, -- 客戶發起一次虛擬會話的最早時間 a int default -1, -- 最先響應這次虛擬會話的客服ID, -1表示還沒人響應 a_ts timestamp -- 最先響應這次虛擬會話的時間 ); -- 添加約束,當客戶的虛擬會話沒有完結時,不計新虛擬會話。 -- 保證同一時刻,同一客戶,只有一個未完結的虛擬會話。 alter table tbl_result add constraint uk exclude (b with =) where (a=-1);3、中間會話表(可以不落地,只順序計算)。
create table tbl_mid ( a int not null, -- 客服ID b int not null, -- 客戶ID ts timestamp not null, -- 消息時間 direct boolean not null -- 消息方向 true: a->b, false: b->a );4、中間會話表觸發器
(before 觸發器 return null(不落地,只順序計算))
create or replace function tb() returns trigger as $$ declare begin if not NEW.direct then -- b -> a 邏輯(客戶發給客服) perform 1 from tbl_result where b=NEW.b and a = -1; if not found then insert into tbl_result (b,b_ts) values (NEW.b,NEW.ts) on conflict ON CONSTRAINT uk do nothing; -- update set b_ts=excluded.b_ts -- where tbl_result.b_ts > excluded.b_ts; -- 僅當新寫入時間小于原記錄時更新, 也可以不做,假設TS是順序的。 -- else -- 說明還沒有人回復它,跳過,等第一次客服響應來更新這條記錄 end if; else -- a -> b 邏輯(客服發給客戶) perform 1 from tbl_result where b=NEW.b and a = -1; if found then update tbl_result set a=NEW.a , a_ts=NEW.ts where b=NEW.b and a = -1 and NEW.ts >= b_ts; -- else -- 說明已有人回復,不需要更新 end if; end if; return NULL; end; $$ language plpgsql strict; create trigger tg0 before insert on tbl_mid for each row execute procedure tb();5、寫入大批量數據,由于觸發器轉移到了中間表,所以寫入吞吐達到了接近29萬行/s。
假設有100個客服 100萬個客戶 使用clock_timestamp生成TS,確保數據按一定時序順序寫入。 vi test.sql \set a random(1,100) \set b random(1,1000000) \set bo random(0,1) insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean); pgbench -M prepared -n -r -P 1 -f ./test.sql -c 32 -j 32 -T 120 transaction type: ./test.sql scaling factor: 1 query mode: prepared number of clients: 32 number of threads: 32 duration: 120 s number of transactions actually processed: 34403943 latency average = 0.112 ms latency stddev = 0.229 ms tps = 286698.048259 (including connections establishing) tps = 286718.916176 (excluding connections establishing) statement latencies in milliseconds: 0.001 \set a random(1,100) 0.000 \set b random(1,1000000) 0.000 \set bo random(0,1) 0.109 insert into tbl values (:a, :b, clock_timestamp(), :bo::boolean); postgres=# select count(*) from tbl; count ---------- 19805266 (1 row) postgres=# select count(*) from tbl_result; count --------- 5202622 (1 row)6、單線程讀取,統計,例如每次讀取一個小時的數據(定義清楚邊界,連續消費,同時避免并發、或重復消費,或者在寫統計結果時做到冪等,不用擔心重復消費)。
創建一張消費記錄表,統計已消費的時間間隔。
create table tbl_record (ts1 timestamp, ts2 timestamp);下次消費時,參考上次已消費的時間。
with tmp as ( insert into tbl_record (ts1, ts2) values ('2018-01-01 12:00:00', '2018-01-01 13:00:00') -- 記錄當前消費窗口 ) insert into tbl_mid select * from tbl where ts >= '2018-01-01 12:00:00' and ts < '2018-01-01 13:00:00' -- 上一個小時為窗口 (當前時間 大于等于 '2018-01-01 14:00:00') order by ts; -- 無會話模式 Time: 16532.939 ms (00:16.533)7、算法校驗,正確
postgres=# select * from tbl_mid where b=2 order by ts limit 10; a | b | ts | direct ----+---+----------------------------+-------- 10 | 2 | 2018-08-15 10:24:58.538558 | t 25 | 2 | 2018-08-15 10:25:00.585426 | f 62 | 2 | 2018-08-15 10:25:04.2633 | f 45 | 2 | 2018-08-15 10:25:04.406764 | t (4 rows) postgres=# select * from tbl_result where b=2 order by b_ts limit 10; b | b_ts | a | a_ts ---+----------------------------+----+---------------------------- 2 | 2018-08-15 10:25:00.585426 | 45 | 2018-08-15 10:25:04.406764 (1 row)消費性能,單線程吞吐達到6萬行/s。
with tmp as ( delete from tbl where ctid = any(array( select ctid from tbl where ts < now()-interval '1 min' -- 測試時改成了消費1分鐘前的數據 order by ts limit 1000000 )) returning * ) insert into tbl_mid select * from tmp order by b, ts; -- 無會話模式 Time: 16532.939 ms (00:16.533)消費節奏:
1、消費 2、VACUUM tbl; 3、消費 loop;例子3,使用窗口查詢解決同一問題
1、新增索引,用于窗口查詢加速
create index idx_tbl_1 on tbl (b,ts);2、無會話模式,使用窗口查詢,得到每個虛擬會話的開始時間、第一響應時間
select a, -- 虛擬會話的第一條消息,客戶發給了哪位客服IDb, -- 客戶IDts, -- 虛擬會話開始時間 lead_a, -- 最先響應的是誰(哪位客服)lead_session_end_ts, -- 虛擬會話第一次響應時間 lead_session_end_ts - ts as dur, -- 響應間隔 direct,lag_direct,lag_ts from ( select *, lead(session_end_ts) over w2 as lead_session_end_ts, -- 當前窗口,當前行的下一條ts值 , 即會話第一次響應時間 lead(a) over w2 as lead_a -- 當前窗口,當前行的下一條的b(客服ID) , 即響應的是哪位客服 from ( select * from ( select a,b,ts,direct,lag_direct,lag_ts, case when ((direct = false and lag_direct is null) -- 判斷虛擬會話開始時間的邏輯 or (direct = false and lag_direct = true)) then ts end as session_begin_ts, -- 虛擬會話開始時間 case when (direct = true and lag_direct = false) -- 判斷虛擬會話第一次響應時間的邏輯 then ts end as session_end_ts -- 虛擬會話第一次響應時間 from ( select a, -- 客服ID b, -- 客戶ID ts, -- 消息時間 direct, -- 消息方向 true: a->b, false: b->a lag(direct) over w1 as lag_direct, -- 當前窗口,當前行的上一條direct值 lag(ts) over w1 as lag_ts -- 當前窗口,當前行的上一條ts值 from tbl window w1 as (partition by b order by ts) -- where ts between xx and xx , 一次只查部分數據時可用 ) t ) t where session_begin_ts is not null -- 虛擬會話開始時間字段不為空,表示這條記錄是會話開始的記錄 or session_end_ts is not null -- 虛擬會話結束時間字段不為空,表示這條記錄是會話第一次響應的記錄 ) t window w2 as (partition by b order by ts) ) t where direct = false -- 客戶在虛擬會話中發起第一條消息的記錄 and lead_session_end_ts - ts is not null limit 100;3、結果、算法正確性驗證
a | b | ts | lead_a | lead_session_end_ts | dur | direct | lag_direct | lag_ts -----+----+----------------------------+--------+----------------------------+-----------------+--------+------------+----------------------------26 | 1 | 2018-08-15 10:25:13.056316 | 75 | 2018-08-15 10:25:16.546126 | 00:00:03.48981 | f | | 43 | 1 | 2018-08-15 10:25:21.483542 | 99 | 2018-08-15 10:25:25.552488 | 00:00:04.068946 | f | t | 2018-08-15 10:25:16.54612628 | 1 | 2018-08-15 10:25:28.287823 | 70 | 2018-08-15 10:25:37.375585 | 00:00:09.087762 | f | t | 2018-08-15 10:25:26.51835912 | 1 | 2018-08-15 10:25:47.203597 | 20 | 2018-08-15 10:26:03.423969 | 00:00:16.220372 | f | t | 2018-08-15 10:25:47.03645991 | 1 | 2018-08-15 10:26:05.332921 | 57 | 2018-08-15 10:26:08.070122 | 00:00:02.737201 | f | t | 2018-08-15 10:26:03.42396924 | 1 | 2018-08-15 10:26:16.798485 | 85 | 2018-08-15 10:26:22.222025 | 00:00:05.42354 | f | t | 2018-08-15 10:26:15.31928790 | 1 | 2018-08-15 10:26:22.58553 | 28 | 2018-08-15 10:26:25.987987 | 00:00:03.402457 | f | t | 2018-08-15 10:26:22.22202530 | 1 | 2018-08-15 10:26:31.458875 | 42 | 2018-08-15 10:26:36.259917 | 00:00:04.801042 | f | t | 2018-08-15 10:26:25.98798711 | 1 | 2018-08-15 10:26:37.828413 | 70 | 2018-08-15 10:26:49.212275 | 00:00:11.383862 | f | t | 2018-08-15 10:26:36.25991721 | 2 | 2018-08-15 10:25:15.532378 | 66 | 2018-08-15 10:25:19.742437 | 00:00:04.210059 | f | | 50 | 2 | 2018-08-15 10:25:30.988507 | 20 | 2018-08-15 10:25:36.645969 | 00:00:05.657462 | f | t | 2018-08-15 10:25:30.75022498 | 2 | 2018-08-15 10:25:47.075616 | 72 | 2018-08-15 10:25:52.34913 | 00:00:05.273514 | f | t | 2018-08-15 10:25:40.85846572 | 2 | 2018-08-15 10:25:56.595608 | 99 | 2018-08-15 10:26:11.46232 | 00:00:14.866712 | f | t | 2018-08-15 10:25:55.32413198 | 2 | 2018-08-15 10:26:12.303834 | 97 | 2018-08-15 10:26:15.341379 | 00:00:03.037545 | f | t | 2018-08-15 10:26:11.4623263 | 2 | 2018-08-15 10:26:19.116171 | 22 | 2018-08-15 10:26:23.743978 | 00:00:04.627807 | f | t | 2018-08-15 10:26:15.34137966 | 2 | 2018-08-15 10:26:30.024534 | 49 | 2018-08-15 10:26:41.196351 | 00:00:11.171817 | f | t | 2018-08-15 10:26:23.74397883 | 2 | 2018-08-15 10:26:41.962942 | 51 | 2018-08-15 10:26:43.172856 | 00:00:01.209914 | f | t | 2018-08-15 10:26:41.19635164 | 2 | 2018-08-15 10:26:43.575144 | 88 | 2018-08-15 10:26:44.17728 | 00:00:00.602136 | f | t | 2018-08-15 10:26:43.1728564、對比使用中間表得到的結果
insert into tbl_mid select * from tbl order by ts ; select * from tbl_result where b=1 or b=2 order by b_ts; b | b_ts | a | a_ts ---+----------------------------+----+----------------------------1 | 2018-08-15 10:25:13.056316 | 75 | 2018-08-15 10:25:16.5461261 | 2018-08-15 10:25:21.483542 | 99 | 2018-08-15 10:25:25.5524881 | 2018-08-15 10:25:28.287823 | 70 | 2018-08-15 10:25:37.3755851 | 2018-08-15 10:25:47.203597 | 20 | 2018-08-15 10:26:03.4239691 | 2018-08-15 10:26:05.332921 | 57 | 2018-08-15 10:26:08.0701221 | 2018-08-15 10:26:16.798485 | 85 | 2018-08-15 10:26:22.2220251 | 2018-08-15 10:26:22.58553 | 28 | 2018-08-15 10:26:25.9879871 | 2018-08-15 10:26:31.458875 | 42 | 2018-08-15 10:26:36.2599171 | 2018-08-15 10:26:37.828413 | 70 | 2018-08-15 10:26:49.2122751 | 2018-08-15 10:26:50.622352 | -1 | 2 | 2018-08-15 10:25:15.532378 | 66 | 2018-08-15 10:25:19.7424372 | 2018-08-15 10:25:30.988507 | 20 | 2018-08-15 10:25:36.6459692 | 2018-08-15 10:25:47.075616 | 72 | 2018-08-15 10:25:52.349132 | 2018-08-15 10:25:56.595608 | 99 | 2018-08-15 10:26:11.462322 | 2018-08-15 10:26:12.303834 | 97 | 2018-08-15 10:26:15.3413792 | 2018-08-15 10:26:19.116171 | 22 | 2018-08-15 10:26:23.7439782 | 2018-08-15 10:26:30.024534 | 49 | 2018-08-15 10:26:41.1963512 | 2018-08-15 10:26:41.962942 | 51 | 2018-08-15 10:26:43.1728562 | 2018-08-15 10:26:43.575144 | 88 | 2018-08-15 10:26:44.177282 | 2018-08-15 10:26:45.595639 | -1 | (20 rows)5、會話模式,SQL改動兩處即可。
create index idx_tbl_2 on tbl (b,a,ts); -- 窗口加速 select a, -- 虛擬會話的第一條消息,客戶發給了哪位客服IDb, -- 客戶IDts, -- 虛擬會話開始時間 lead_a, -- 最先響應的是誰(哪位客服)lead_session_end_ts, -- 虛擬會話第一次響應時間 lead_session_end_ts - ts as dur, -- 響應間隔 direct,lag_direct,lag_ts from ( select *, lead(session_end_ts) over w2 as lead_session_end_ts, -- 當前窗口,當前行的下一條ts值 , 即會話第一次響應時間 lead(a) over w2 as lead_a -- 當前窗口,當前行的下一條的b(客服ID) , 即響應的是哪位客服 from ( select * from ( select a,b,ts,direct,lag_direct,lag_ts, case when ((direct = false and lag_direct is null) -- 判斷虛擬會話開始時間的邏輯 or (direct = false and lag_direct = true)) then ts end as session_begin_ts, -- 虛擬會話開始時間 case when (direct = true and lag_direct = false) -- 判斷虛擬會話第一次響應時間的邏輯 then ts end as session_end_ts -- 虛擬會話第一次響應時間 from ( select a, -- 客服ID b, -- 客戶ID ts, -- 消息時間 direct, -- 消息方向 true: a->b, false: b->a lag(direct) over w1 as lag_direct, -- 當前窗口,當前行的上一條direct值 lag(ts) over w1 as lag_ts -- 當前窗口,當前行的上一條ts值 from tbl window w1 as (partition by b,a order by ts) -- 有會話模式,改這個partition-- where ts between xx and xx , 一次只查部分數據時可用 ) t ) t where session_begin_ts is not null -- 虛擬會話開始時間字段不為空,表示這條記錄是會話開始的記錄 or session_end_ts is not null -- 虛擬會話結束時間字段不為空,表示這條記錄是會話第一次響應的記錄 ) t window w2 as (partition by b,a order by ts) -- 有會話模式,改這個partition ) t where direct = false -- 客戶在虛擬會話中發起第一條消息的記錄 and lead_session_end_ts - ts is not null limit 100;性能,3000萬記錄,1毫秒響應。
小結
本文涉及的場景為無會話、或者會話無明顯標識的情況下,使用PostgreSQL高效率的統計客服的響應速度的問題。
使用到的方法與性能指標
1、實時計算,觸發器(當到達時間有序, 或者說大部分有序時。使用clock_timestamp可以讓數據基本有序)
寫入吞吐16.5萬行每秒。
2、閱后即焚(延遲消費,解決數據寫入無需的問題)。
寫入吞吐29萬行每秒。
單線程消費6萬行每秒。
3、閱后即焚,使用HASH,并行消費,提升消費吞吐。
4、使用窗口查詢,同樣能夠很好的解決此場景的需求,而且性能杠杠的。
參考
《HTAP數據庫 PostgreSQL 場景與性能測試之 27 - (OLTP) 物聯網 - FEED日志, 流式處理 與 閱后即焚 (CTE)》
總結
以上是生活随笔為你收集整理的PostgreSQL 无会话、有会话模式 - 客服平均响应速度(RT)实时计算实践(窗口查询\流计算)...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 四说大数据时代“神话”:从大数据到深数据
- 下一篇: LeetCode每日一题: 单值二叉树(