redis延迟队列 如何确保成功消费_千万级延时任务队列如何实现,看美图开源的-LMSTFY...
導讀:Task是web開發中一個經典場景,我們時常需要延時任務,或者定時任務,通常都需要任務隊列。常見的任務隊列如celery,lmstfy是美圖開源的任務隊列。本文作者詳細剖析了lmstfy的架構實現,干貨滿滿,適合技術人員閱讀。
lmstfy(Let Me Schedule Task For You) 是美圖架構基礎服務團隊在 2018 年初基于 Redis 實現的簡單任務隊列(Task Queue)服務,目前在美圖多個線上產品使用接近兩年的時間。主要提供以下特性:
任務具備延時、自動重試、優先級以及過期等功能
通過 HTTP restful API 提供服務
具備橫向擴展能力
豐富的業務和性能指標
Github 項目地址: https://github.com/meitu/lmstfy
使用場景
任務隊列跟消息隊列在使用場景上最大的區別是: 任務之間是沒有順序約束而消息要求順序(FIFO),且可能會對任務的狀態更新而消息一般只會消費不會更新。 類似 Kafka 利用消息 FIFO 和不需要更新(不需要對消息做索引)的特性來設計消息存儲,將消息讀寫變成磁盤的順序讀寫來實現比較好的性能。而任務隊列需要能夠任務狀態進行更新則需要對每個消息進行索引,如果把兩者放到一起實現則很難實現在功能和性能上兼得。在美圖內部選型上,如果是異步消息模型一般會選擇消息隊列,比如類似日志上報,搶購等。而對于需要延時/定時下發或者修改狀態任務則是使用任務隊列。
比如在以下幾種場景會使用任務隊列:
定時任務,如每天早上 8 點開始推送消息,定期刪除過期數據等
任務流,如自動創建 Redis 流程由資源創建,資源配置,DNS 修改等部分組成,使用任務隊列可以簡化整體的設計和重試流程
重試任務,典型場景如離線圖片處理
目標與調研
在自研任務隊列之前,我們基于以下幾個要求作為約束調研了現有一些開源方案:
任務支持延時/優先級任務和自動重試
高可用,服務不能有單點以及保證數據不丟失
可擴展,主要是容量和性能需要可擴展
第一種方案是 Redis 作者開源的分布式內存隊列 disque(https://github.com/antirez/disque)。disque 采用和 Redis Cluster 類似無中心設計,所有節點都可以寫入并復制到其他節點。不管是從功能上、設計還是可靠性都是比較好的選擇。我們在 2017 年也引入 disque 在部分業務使用過一段時間,后面遇到 bug 在內部修復后想反饋到社區,發現 Redis 作者決定不再維護這個項目(要把 disque 功能作為 redis module 來維護,應該是會伴隨 Redis 6 發布)。最終我們也放棄了 disque 方案,將數據遷移到我們自研任務隊列服務。
第二種方案是 2007 年就開源的 beanstalkd(https://github.com/beanstalkd/beanstalkd),現在仍然還是在維護狀態。beanstalkd 是類 memcached 協議全內存任務隊列,斷電或者重啟時通過 WAL 文件來恢復數據。但 benstalkd 不支持復制功能,服務存在單點問題且數據可靠性也無法滿足。當時也有考慮基于 beanstalkd 去做二次開發,但看完代碼之后覺得需要改造的點不只是復制,還有類似內存控制等等,所以沒有選擇 beanstalkd 二次開發的方案。
也考慮過類似基于 kafka/rocketmq 等消息隊列作為存儲的方案,最后從存儲設計模型和團隊技術棧等原因決定選擇基于 redis 作為存儲來實現任務隊列的功能。舉個例子,假設以 Kafka 這種消息隊列存儲來實現延時功能,每個隊列的時間都需要創建一個單獨的 topic(如: Q1-1s, Q1-2s..)。這種設計在延時時間比較固定的場景下問題不太大,但如果是延時時間變化比較大會導致 topic 數目過多,會把磁盤從順序讀寫會變成隨機讀寫從導致性能衰減,同時也會帶來其他類似重啟或者恢復時間過長的問題。
設計和實現
整體設計
lmstfy 是 HTTP 協議的無狀態服務,可以通過 4/L7 的 LB 來接入。內部主要由四個模塊組成:
Pump Thread: 每秒輪詢 Redis 將到期的任務遷移到就緒隊列(ready queue)
Metric Collector, 定時收集隊列相關統計數據到實例再通過 prometheus exporter 暴露給監控系統
Token Manager,用來管理 namespace 和 token 的模塊,namespace 是用來做業務隔離的單位
Producer/Consumer,用來處理用戶的任務和消費請求
Default Pool 除了用來存儲業務數據,namespace/token 這類元數據也會默認存儲到 Default 這個 Redis 池子里面
基礎概念
namespace - 用來隔離業務,每個業務是獨立的 namespace
queue - 隊列名稱,用區分同一業務不同消息類型
job - 業務定義的業務,主要包含以下幾個屬性:
id: 任務 ID,全局唯一
delay: 任務延時下發時間, 單位是秒
tries: 任務最大重試次數,tries = N 表示任務會最多下發 N 次
ttl(time to live): 任務最長有效期,超過之后任務自動消失
ttr(time to run): 任務預期執行時間,超過 ttr 則認為任務消費失敗,觸發任務自動重試
數據存儲
lmstfy 的 redis 存儲由四部分組成:
timer(sorted set) - 用來實現延遲任務的排序,再由后臺線程定期將到期的任務寫入到 Ready Queue 里面
ready queue (list) - 無延時或者已到期任務的隊列
deadletter (list) - 消費失敗(重試次數到達上限)的任務,可以手動重新放回隊列
job pool(string) - 存儲消息內容的池子
支持延遲的任務隊列本質上是兩個數據結構的結合: FIFO 和 sorted set。sorted set 用來實現延時的部分,將任務按照到期時間戳升序存儲,然后定期將到期的任務遷移至 FIFO(ready queue)。任務的具體內容只會存儲一份在 job pool 里面,其他的像 ready queue,timer,deadletter 只是存儲 job id,這樣可以節省一些內存空間。
以下是整體設計:
任務寫入
任務在寫入時會先產生一個 job id,目前 job id (16bytes) 包含寫入時間戳、 隨機數和延遲秒數, 然后寫入 key 為 j:{namespace}/{queue}/{ID} 的任務到任務池 (pool) 里面。之后根據延時時間來決定這個 job id 應該到 ready queue 還是 timer 里面:
delay = 0,表示不需要延時則直接寫到 ready queue(list)
delay = n(n > 0),表示需要延時,將延時加上當前系統時間作為絕對時間戳寫到 timer(sorted set)
timer 的實現是利用 zset 根據絕對時間戳進行排序,再由旁路線程定期輪詢將到期的任務通過 redis lua script 來將數據原子地轉移到 ready queue 里面。
任務消費
之前提到任務在消費失敗之后預期能夠重試,所以必須知道什么時候可認為任務消費失敗?業務在消費時需要攜帶 ttr(time to run) 參數,用來表示業務預期任務最長執行時間,如果在 ttr 時間內沒有收到業務主動回復 ACK 消息則會認為任務失敗(類似 tcp 的重傳 timer)。
消費時從 ready queue 中 (B)RPOP 出任務的 job id,然后根據 job id 從 pool 中將任務內容發送給消費者。同時對 tries 減1,根據消費的 ttr(time to run) 參數, 將任務放入 timer 中。如果 tries 為零, 在 ttr 時間到期后該 job id 會被放入 dead letter 隊列中(表示任務執行失敗)。
同步任務模型
lmstfy 除了可以用來實現異步和延時任務模型之外,因為 namespace 下面的隊列是動態創建且 job id 全局唯一,還可以用來實現同步任務模型 (producer 等到任務執行成功之后返回)。大概如下:
producer 寫入任務之后拿到 job id, 然后監聽(consume)以 job id 為名的隊列
consumer 消費任務成功后,寫回復消息到同樣以 job id 為名的隊列中
producer 如果規定時間內能讀到回復消息則認為消費成功,等待超時則認為任務失敗
如何實現橫向擴展
lmstfy 本身是無狀態的服務可以很簡單的實現橫向擴展,這里的橫向擴展主要是存儲(目前只支持 Redis)的橫向擴展。設計也比較簡單,主要通過通過 namespace 對應的 token 路由來實現, 比如我們當前配置兩組 Redis 資源: default 和 meipai:
[Pool][Pool.default]Addr = "1.1.1.1:6379"[Pool.meipai]Addr = "2.2.2.2:6389"在創建 namespace 時可以指定資源池,token 里面會攜帶資源池名字作為前綴。比指定美拍資源池,那么 token 類似: meipai:01DT8EZ1N6XT ,后續在處理請求時就可以根據 token 里面攜帶的資源池名稱來進行路由數據。不過這種設計實現隊列級別的擴展,如果單隊列存儲消息量超過 Redis 內存上限則需要其他手段來解決(后面會支持磁盤類型存儲)。
如何使用
# 創建 namespace 和 token, 注意這里使用管理端口$ ./scripts/token-cli -c -n test_ns -p default -D "test ns apply by @hulk" 127.0.0.1:7778{ "token": "01DT9323JACNBQ9JESV80G0000"}# 寫入內容為 value 的任務$ curl -XPUT -d "value" -i "http://127.0.0.1:7777/api/test_ns/q1?tries=3&delay=1&token=01DT931XGSPKNB7E2XFKPY3ZPB"{"job_id":"01DT9323JACNBQ9JESV80G0000總結
以上是生活随笔為你收集整理的redis延迟队列 如何确保成功消费_千万级延时任务队列如何实现,看美图开源的-LMSTFY...的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: redis数据库价格_Caffeine和
- 下一篇: 8s pod 查看 的yaml_Kube