使用 Redis 如何实现延迟队列?
延遲消息隊列在我們的日常工作中經(jīng)常會被用到,比如支付系統(tǒng)中超過 30 分鐘未支付的訂單,將會被取消,這樣就可以保證此商品庫存可以釋放給其他人購買,還有外賣系統(tǒng)如果商家超過 5 分鐘未接單的訂單,將會被自動取消,以此來保證用戶可以更及時的吃到自己點的外賣,等等諸如此類的業(yè)務(wù)場景都需要使用到延遲消息隊列,又因為它在業(yè)務(wù)中比較常見,因此這個知識點在面試中也會經(jīng)常被問到。
我們本文的面試題是,使用 Redis 如何實現(xiàn)延遲消息隊列?
典型回答
延遲消息隊列的常見實現(xiàn)方式是通過 ZSet 的存儲于查詢來實現(xiàn),它的核心思想是在程序中開啟一個一直循環(huán)的延遲任務(wù)的檢測器,用于檢測和調(diào)用延遲任務(wù)的執(zhí)行,如下圖所示: ZSet 實現(xiàn)延遲任務(wù)的方式有兩種,第一種是利用 zrangebyscore 查詢符合條件的所有待處理任務(wù),循環(huán)執(zhí)行隊列任務(wù);第二種實現(xiàn)方式是每次查詢最早的一條消息,判斷這條信息的執(zhí)行時間是否小于等于此刻的時間,如果是則執(zhí)行此任務(wù),否則繼續(xù)循環(huán)檢測。
方式一:zrangebyscore 查詢所有任務(wù) 此實現(xiàn)方式是一次性查詢出所有的延遲任務(wù),然后再進行執(zhí)行,實現(xiàn)代碼如下:
import redis.clients.jedis.Jedis; import utils.JedisUtils;import java.time.Instant; import java.util.Set;/*** 延遲隊列*/ public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延遲 30s 執(zhí)行(30s 后的時間)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 繼續(xù)添加測試數(shù)據(jù)jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 開啟延遲隊列doDelayQueue(jedis);}/*** 延遲隊列消費* @param jedis Redis 客戶端*/public static void doDelayQueue(Jedis jedis) throws InterruptedException {while (true) {// 當前時間Instant nowInstant = Instant.now();long lastSecond = nowInstant.plusSeconds(-1).getEpochSecond(); // 上一秒時間long nowSecond = nowInstant.getEpochSecond();// 查詢當前時間的所有任務(wù)Set<String> data = jedis.zrangeByScore(_KEY, lastSecond, nowSecond);for (String item : data) {// 消費任務(wù)System.out.println("消費:" + item);}// 刪除已經(jīng)執(zhí)行的任務(wù)jedis.zremrangeByScore(_KEY, lastSecond, nowSecond);Thread.sleep(1000); // 每秒輪詢一次}} }以上程序執(zhí)行結(jié)果如下:
消費:order2 消費:order3 消費:order4 消費:order5 消費:order_1
方式二:判斷最早的任務(wù) 此實現(xiàn)方式是每次查詢最早的一條任務(wù),再與當前時間進行判斷,如果任務(wù)執(zhí)行時間大于當前時間則表示應(yīng)該立即執(zhí)行延遲任務(wù),實現(xiàn)代碼如下:
import redis.clients.jedis.Jedis; import utils.JedisUtils;import java.time.Instant; import java.util.Set;/*** 延遲隊列*/ public class DelayQueueExample {// zset keyprivate static final String _KEY = "myDelayQueue";public static void main(String[] args) throws InterruptedException {Jedis jedis = JedisUtils.getJedis();// 延遲 30s 執(zhí)行(30s 后的時間)long delayTime = Instant.now().plusSeconds(30).getEpochSecond();jedis.zadd(_KEY, delayTime, "order_1");// 繼續(xù)添加測試數(shù)據(jù)jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_2");jedis.zadd(_KEY, Instant.now().plusSeconds(2).getEpochSecond(), "order_3");jedis.zadd(_KEY, Instant.now().plusSeconds(7).getEpochSecond(), "order_4");jedis.zadd(_KEY, Instant.now().plusSeconds(10).getEpochSecond(), "order_5");// 開啟延遲隊列doDelayQueue2(jedis);}/*** 延遲隊列消費(方式 2)* @param jedis Redis 客戶端*/public static void doDelayQueue2(Jedis jedis) throws InterruptedException {while (true) {// 當前時間long nowSecond = Instant.now().getEpochSecond();// 每次查詢一條消息,判斷此消息的執(zhí)行時間Set<String> data = jedis.zrange(_KEY, 0, 0);if (data.size() == 1) {String firstValue = data.iterator().next();// 消息執(zhí)行時間Double score = jedis.zscore(_KEY, firstValue);if (nowSecond >= score) {// 消費消息(業(yè)務(wù)功能處理)System.out.println("消費消息:" + firstValue);// 刪除已經(jīng)執(zhí)行的任務(wù)jedis.zrem(_KEY, firstValue);}}Thread.sleep(100); // 執(zhí)行間隔}} }以上程序執(zhí)行結(jié)果和實現(xiàn)方式一相同,結(jié)果如下:
消費:order2 消費:order3 消費:order4 消費:order5 消費:order_1
其中,執(zhí)行間隔代碼 Thread.sleep(100) 可根據(jù)實際的業(yè)務(wù)情況刪減或配置。
考點分析
延遲消息隊列的實現(xiàn)方法有很多種,不同的公司可能使用的技術(shù)也是不同的,我上面是從 Redis 的角度出發(fā)來實現(xiàn)了延遲消息隊列,但一般面試官不會就此罷休,會借著這個問題來問關(guān)于更多的延遲消息隊列的實現(xiàn)方法,因此除了 Redis 實現(xiàn)延遲消息隊列的方式,我們還需要具備一些其他的常見的延遲隊列的實現(xiàn)方法。
和此知識點相關(guān)的面試題還有以下這些:
- 使用 Java 語言如何實現(xiàn)一個延遲消息隊列?
- 你還知道哪些實現(xiàn)延遲消息隊列的方法?
知識擴展
Java 中的延遲消息隊列
我們可以使用 Java 語言中自帶的 DelayQueue 數(shù)據(jù)類型來實現(xiàn)一個延遲消息隊列,實現(xiàn)代碼如下:
public class DelayTest {public static void main(String[] args) throws InterruptedException {DelayQueue delayQueue = new DelayQueue();delayQueue.put(new DelayElement(1000));delayQueue.put(new DelayElement(3000));delayQueue.put(new DelayElement(5000));System.out.println("開始時間:" + DateFormat.getDateTimeInstance().format(new Date()));while (!delayQueue.isEmpty()){System.out.println(delayQueue.take());}System.out.println("結(jié)束時間:" + DateFormat.getDateTimeInstance().format(new Date()));}static class DelayElement implements Delayed {// 延遲截止時間(單面:毫秒)long delayTime = System.currentTimeMillis();public DelayElement(long delayTime) {this.delayTime = (this.delayTime + delayTime);}@Override// 獲取剩余時間public long getDelay(TimeUnit unit) {return unit.convert(delayTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);}@Override// 隊列里元素的排序依據(jù)public int compareTo(Delayed o) {if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {return 1;} else if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {return -1;} else {return 0;}}@Overridepublic String toString() {return DateFormat.getDateTimeInstance().format(new Date(delayTime));}} }以上程序執(zhí)行的結(jié)果如下:
開始時間:2019-6-13 20:40:38 2019-6-13 20:40:39 2019-6-13 20:40:41 2019-6-13 20:40:43 結(jié)束時間:2019-6-13 20:40:43
此實現(xiàn)方式的優(yōu)點是開發(fā)比較方便,可以直接在代碼中使用,實現(xiàn)代碼也比較簡單,但它缺點是數(shù)據(jù)保存在內(nèi)存中,因此可能存在數(shù)據(jù)丟失的風險,最大的問題是它無法支持分布式系統(tǒng)。
使用 MQ 實現(xiàn)延遲消息隊列
我們使用主流的 MQ 中間件也可以方便的實現(xiàn)延遲消息隊列的功能,比如 RabbitMQ,我們可以通過它的 rabbitmq-delayed-message-exchange 插件來實現(xiàn)延遲隊列。
首先我們需要配置并開啟 rabbitmq-delayed-message-exchange 插件,然后再通過以下代碼來實現(xiàn)延遲消息隊列。
配置消息隊列:
import com.example.rabbitmq.mq.DirectConfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map;@Configuration public class DelayedConfig {final static String QUEUE_NAME = "delayed.goods.order";final static String EXCHANGE_NAME = "delayedec";@Beanpublic Queue queue() {return new Queue(DelayedConfig.QUEUE_NAME);}// 配置默認的交換機@BeanCustomExchange customExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct");//參數(shù)二為類型:必須是 x-delayed-messagereturn new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args);}// 綁定隊列到交換器@BeanBinding binding(Queue queue, CustomExchange exchange) {return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs();} }發(fā)送者實現(xiàn)代碼如下:
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component public class DelayedSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("發(fā)送時間:" + sf.format(new Date()));rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setHeader("x-delay", 3000);return message;}});} }從上述代碼我們可以看出,我們配置 3s 之后再進行任務(wù)執(zhí)行。
消費者實現(xiàn)代碼如下:
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date;@Component @RabbitListener(queues = "delayed.goods.order") public class DelayedReceiver {@RabbitHandlerpublic void process(String msg) {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.println("接收時間:" + sdf.format(new Date()));System.out.println("消息內(nèi)容:" + msg);} }測試代碼如下:
import com.example.rabbitmq.RabbitmqApplication; import com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;import java.text.SimpleDateFormat; import java.util.Date;@RunWith(SpringRunner.class) @SpringBootTest public class DelayedTest {@Autowiredprivate DelayedSender sender;@Testpublic void Test() throws InterruptedException {SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd");sender.send("Hi Admin.");Thread.sleep(5 * 1000); //等待接收程序執(zhí)行之后,再退出測試} }以上程序的執(zhí)行結(jié)果為:
發(fā)送時間:2020-06-11 20:47:51 接收時間:2018-06-11 20:47:54 消息內(nèi)容:Hi Admin.
從上述結(jié)果中可以看出,當消息進入延遲隊列 3s 之后才被正常消費,執(zhí)行結(jié)果符合我的預(yù)期,RabbitMQ 成功的實現(xiàn)了延遲消息隊列。
總結(jié)
本文我們講了延遲消息隊列的兩種使用場景:支付系統(tǒng)中的超過 30 分鐘未支付的訂單,將會被自動取消,以此來保證此商品的庫存可以正常釋放給其他人購買,還有外賣系統(tǒng)如果商家超過 5 分鐘未接單的訂單,將會被自動取消,以此來保證用戶可以更及時的吃到自己點的外賣。并且我們講了延遲隊列的 4 種實現(xiàn)方式,使用 ZSet 的 2 種實現(xiàn)方式,以及 Java 語言中的 DelayQueue 的實現(xiàn)方式,還有 RabbitMQ 的插件 rabbitmq-delayed-message-exchange 的實現(xiàn)方式。
總結(jié)
以上是生活随笔為你收集整理的使用 Redis 如何实现延迟队列?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 算法图解:如何用两个栈实现一个队列?
- 下一篇: IDEA 终于支持中文版和 JDK 直接