?
在實際的業務中我們會遇見生產者產生的消息,不立即消費,而是延時一段時間在消費。RabbitMQ本身沒有直接支持延遲隊列功能,但是我們可以根據其特性Per-Queue Message TTL和?Dead Letter Exchanges實現延時隊列。也可以通過改特性設置消息的優先級。
1.Per-Queue Message TTL
RabbitMQ可以針對消息和隊列設置TTL(過期時間)。隊列中的消息過期時間(Time To Live, TTL)有兩種方法可以設置。第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。第二種方法是對消息進行單獨設置,每條消息TTL可以不同。如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為準。消息在隊列的生存時間一旦超過設置的TTL值,就成為dead message,消費者將無法再收到該消息。
2.Dead Letter Exchanges
當消息在一個隊列中變成死信后,它能被重新publish到另一個Exchange。消息變成Dead Letter一向有以下幾種情況:
消息被拒絕(basic.reject or basic.nack)并且requeue=false
消息TTL過期
隊列達到最大長度
實際上就是設置某個隊列的屬性,當這個隊列中有Dead Letter時,RabbitMQ就會自動的將這個消息重新發布到設置的Exchange中去,進而被路由到另一個隊列,publish可以監聽這個隊列中消息做相應的處理,這個特性可以彌補RabbitMQ 3.0.0以前支持的immediate參數中的向publish確認的功能。
雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設置了 TTL 值時不會產生任何問題,因為過期的 message 總是會出現在 queue 的頭部。當對每一條 message 設置了 TTL 時,過期的 message 可能會排隊于未過期 message 的后面,直到這些消息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計信息中被計算進去(例如,queue 中存在的 message 的數量)。對于第一種設置隊列TTL屬性的方法,一旦消息過期,就會從隊列中抹去,而第二種方法里,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期時在即將投遞到消費者之前判定的,為什么兩者得處理方法不一致?因為第一種方法里,隊列中已過期的消息肯定在隊列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期消息即可,而第二種方法里,每條消息的過期時間不同,如果要刪除所有過期消息,勢必要掃描整個隊列,所以不如等到此消息即將被消費時再判定是否過期,如果過期,再進行刪除。
?
一、在隊列上設置TTL
1.建立delay.exchange
這里Internal設置為NO,否則將無法接受dead letter,YES表示這個exchange不可以被client用來推送消息,僅用來進行exchange和exchange之間的綁定。
2.建立延時隊列(delay queue)
如上配置延時5min隊列(x-message-ttl=300000)
x-max-length:最大積壓的消息個數,可以根據自己的實際情況設置,超過限制消息不會丟失,會立即轉向delay.exchange進行投遞
x-dead-letter-exchange:設置為剛剛配置好的delay.exchange,消息過期后會通過delay.exchange進行投遞
這里不需要配置"dead letter routing key"否則會覆蓋掉消息發送時攜帶的routingkey,導致后面無法路由為剛才配置的delay.exchange
3.配置延時路由規則
需要延時的消息到exchange后先路由到指定的延時隊列
1)創建delaysync.exchange通過Routing key將消息路由到延時隊列
?
2.配置delay.exchange 將消息投遞到正常的消費隊列
?
配置完成。
下面使用代碼測試一下:
生產者:
?
package cn.slimsmart.study.rabbitmq.delayqueue.queue; import java.io.IOException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static String queue_name = "test.queue"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queue_name, true, false, false, null); String message = "hello world!" + System.currentTimeMillis(); channel.basicPublish("delaysync.exchange", "deal.message", null, message.getBytes()); System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis()); channel.close(); connection.close(); } } ?消費者: ?
package cn.slimsmart.study.rabbitmq.delayqueue.queue; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer { private static String queue_name = "test.queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(queue_name, true, false, false, null); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue_name, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received message:" + message + ",date:" + System.currentTimeMillis()); } } } 二、在消息上設置TTL
?
實現代碼:
生產者:?
?
package cn.slimsmart.study.rabbitmq.delayqueue.message; import java.io.IOException; import java.util.HashMap; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Producer { private static String queue_name = "message_ttl_queue"; public static void main(String[] args) throws IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey"); channel.queueDeclare("delay_queue", true, false, false, arguments); channel.queueDeclare(queue_name, true, false, false, null); channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey"); String message = "hello world!" + System.currentTimeMillis(); AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); AMQP.BasicProperties properties = builder.expiration("300000").deliveryMode(2).build(); channel.basicPublish("", "delay_queue", properties, message.getBytes()); System.out.println("sent message: " + message + ",date:" + System.currentTimeMillis()); channel.close(); connection.close(); } } ?
消費者:
?
package cn.slimsmart.study.rabbitmq.delayqueue.message; import java.util.HashMap; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; public class Consumer { private static String queue_name = "message_ttl_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("10.1.199.169"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); HashMap<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-dead-letter-exchange", "amq.direct"); arguments.put("x-dead-letter-routing-key", "message_ttl_routingKey"); channel.queueDeclare("delay_queue", true, false, false, arguments); channel.queueDeclare(queue_name, true, false, false, null); channel.queueBind(queue_name, "amq.direct", "message_ttl_routingKey"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queue_name, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println("received message:" + message + ",date:" + System.currentTimeMillis()); } } } ?
spring-rabbit整合教程
maven依賴:
?
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>1.4.6.RELEASE</version> </dependency> spring配置文件(在文件頭部引入rabbit的命名空間和約束文件):
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:aop="http://www.springframework.org/schema/aop" xmlns:context="http://www.springframework.org/schema/context" xmlns:mvc="http://www.springframework.org/schema/mvc" xmlns:task="http://www.springframework.org/schema/task" xmlns:tx="http://www.springframework.org/schema/tx" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd"> <rabbit:connection-factory id="connectionFactory" host="你的rabbitMQ服務的ip" virtual-host="/vhost名稱" username="用戶名" password="密碼" port="5672" /> <rabbit:admin connection-factory="connectionFactory"/> <rabbit:template id="amqpTemplate" exchange="my_exchange" connection-factory="connectionFactory" /> <rabbit:queue name="my_queue" durable="true" auto-delete="false" exclusive="false"/> <rabbit:topic-exchange name="my_exchange" durable="true" auto-delete="false"> <rabbit:bindings> <rabbit:binding queue="my_queue" pattern="my_patt"/> </rabbit:bindings> </rabbit:topic-exchange> <bean id="rabbitmqService" class="com.group.service.RabbitmqService" /> <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" > <rabbit:listener queue-names="my_queue" ref="rabbitmqService" method="test"/> </rabbit:listener-container> </beans> 那么在項目中裝配amqpTemplate中就可以發送消息了
?
?
總結
以上是生活随笔為你收集整理的java实现rabbitMQ延时队列详解以及spring-rabbit整合教程的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。