java redis延迟队列_基于redis实现的延迟消息队列
delay-queue
redis實(shí)現(xiàn)延遲消息隊(duì)列
需求背景
最近在做一個(gè)排隊(duì)取號(hào)的系統(tǒng)
在用戶預(yù)約時(shí)間到達(dá)前XX分鐘發(fā)短信通知
在用戶預(yù)約時(shí)間結(jié)束時(shí)要判斷用戶是否去取號(hào)了,不然就記錄為爽約
在用戶取號(hào)后開(kāi)始,等待XX分鐘后要發(fā)短信提醒是否需要使用其他渠道辦理
類似的場(chǎng)景太多,最簡(jiǎn)單的解決辦法就是定時(shí)任務(wù)去掃表。這樣每個(gè)業(yè)務(wù)都要維護(hù)自己的掃表邏輯,
而且隨著時(shí)間的推移數(shù)據(jù)量會(huì)越來(lái)越多的,有的數(shù)據(jù)可能會(huì)延遲比較大
經(jīng)過(guò)一番搜索,網(wǎng)上說(shuō)rabbitmq可以滿足延遲執(zhí)行需求,但是目前系統(tǒng)用了其他消息中間件,所以不打算用。
基于Redis實(shí)現(xiàn)的延遲消息隊(duì)列java版:項(xiàng)目github地址:delay-queue
整體結(jié)構(gòu)
整個(gè)延遲隊(duì)列由4個(gè)部分組成:
JobPool用來(lái)存放所有Job的元信息。利用redis 的hash結(jié)構(gòu)
DelayBucket是一組以時(shí)間為維度的有序隊(duì)列,用來(lái)存放所有需要延遲的Job(這里只存放Job Id)。利用redis 的 有序集合zset
Timer負(fù)責(zé)實(shí)時(shí)掃描各個(gè)Bucket,并將delay時(shí)間大于等于當(dāng)前時(shí)間的Job放入到對(duì)應(yīng)的Ready Queue。利用redis 的list 結(jié)構(gòu)
ReadyQueue存放處于Ready狀態(tài)的Job(這里只存放JobId),以供消費(fèi)程序消費(fèi)。
結(jié)構(gòu)圖
消息結(jié)構(gòu)
每個(gè)Job必須包含一下幾個(gè)屬性:
topic:Job類型。可以理解成具體的業(yè)務(wù)名稱。
id:Job的唯一標(biāo)識(shí)。用來(lái)檢索和刪除指定的Job信息。
delayTime:jod延遲執(zhí)行的時(shí)間,13位時(shí)間戳
ttr(time-to-run):Job執(zhí)行超時(shí)時(shí)間。單位:秒。主要是為了消息可靠性
message:Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲(chǔ)。
舉例說(shuō)明一個(gè)Job的生命周期
用戶預(yù)約后,同時(shí)往JobPool里put一個(gè)job。job結(jié)構(gòu)為:{‘topic':'book’, ‘id':'123456’, ‘delayTime’:1517069375398 ,’ttrTime':60 , ‘message':’XXXXXXX’}
同時(shí)以jobId作為value,delayTime作為score 存到bucket 中,用jobId取模,放到10個(gè)bucket中,以提高效率
timer每時(shí)每刻都在輪詢各個(gè)bucket,按照score排序去最小的一個(gè),當(dāng)delayTime < 當(dāng)前時(shí)間后,,取得job id從job pool中獲取元信息。
如果這時(shí)該job處于deleted狀態(tài),則pass,繼續(xù)做輪詢;如果job處于非deleted狀態(tài),首先再次確認(rèn)元信息中delayTime是否大于等于當(dāng)前時(shí)間,
如果滿足則根據(jù)topic將jobId放入對(duì)應(yīng)的ready queue,然后從bucket中移除,并且;如果不滿足則重新計(jì)算delay時(shí)間,再次放入bucket,并將之前的job id從bucket中移除。
消費(fèi)端輪詢對(duì)應(yīng)的topic的ready queue,獲取job后做自己的業(yè)務(wù)邏輯。與此同時(shí),服務(wù)端將已經(jīng)被消費(fèi)端獲取的job按照其設(shè)定的TTR,重新計(jì)算執(zhí)行時(shí)間,并將其放入bucket。
消費(fèi)端處理完業(yè)務(wù)后向服務(wù)端響應(yīng)finish,服務(wù)端根據(jù)job id刪除對(duì)應(yīng)的元信息。如果消費(fèi)端在ttr時(shí)間內(nèi)沒(méi)有響應(yīng),則ttr時(shí)間后會(huì)再收到該消息
后續(xù)擴(kuò)展
加上超時(shí)重發(fā)次數(shù)
實(shí)現(xiàn)思路
任務(wù)job內(nèi)容包含Array{0,0,2m,10m,10m,1h,2h,6h,15h}和通知到第幾次N(這里N=1, 即第1次).
消費(fèi)者從隊(duì)列中取出任務(wù), 根據(jù)N取得對(duì)應(yīng)的時(shí)間間隔為0, 立即發(fā)送通知.
第1次通知失敗, N += 1 => 2
從Array中取得間隔時(shí)間為2m, 添加一個(gè)延遲時(shí)間為2m的任務(wù)到延遲隊(duì)列, 任務(wù)內(nèi)容仍包含Array和N
第2次通知失敗, N += 1 => 3, 取出對(duì)應(yīng)的間隔時(shí)間10m, 添加一個(gè)任務(wù)到延遲隊(duì)列, 同上
......
第7次通知失敗, N += 1 => 8, 取出對(duì)應(yīng)的間隔時(shí)間15h, 添加一個(gè)任務(wù)到延遲隊(duì)列, 同上
第8次通知失敗, N += 1 => 9, 取不到間隔時(shí)間, 結(jié)束通知
引用說(shuō)明
參考有贊延遲隊(duì)列思路設(shè)計(jì)實(shí)現(xiàn)
總結(jié)
以上是生活随笔為你收集整理的java redis延迟队列_基于redis实现的延迟消息队列的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: 2021高考成绩23号几点可以查询,20
- 下一篇: APMServ中Apache启动失败解决