【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例
文章目錄
- 設置隊列ttl
- 配置文件
- 生產(chǎn)者
- 消費者
- 設置消息ttl
- 延遲插件的使用
- 修改配置文件
- 修改生產(chǎn)者
- 修改消費者
設置隊列ttl
代碼架構:
創(chuàng)建兩個隊列QA和QB,兩者隊列TTL分別設置為10S和40S,然后在創(chuàng)建一個交換機X和死信交換機Y,它們的類型都是direct,創(chuàng)建一個死信隊列QD
配置文件
spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ @Configuration public class Rabbitmqconfig {public static final String X_EXCHANGE = "X";public static final String QUEUE_A = "QA";public static final String QUEUE_B = "QB"; public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; public static final String DEAD_LETTER_QUEUE = "QD";// 聲明xExchange@Bean("xExchange")public DirectExchange xExchange(){return new DirectExchange(X_EXCHANGE); } // 聲明xExchange@Bean("yExchange")public DirectExchange yExchange(){return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); }//聲明隊列A ttl為10s并綁定到對應的死信交換機@Bean("queueA")public Queue queueA(){Map<String,Object> args = new HashMap<String,Object>();args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-routing-key","YD");args.put("x-message-ttl",10000);return QueueBuilder.durable(QUEUE_A).withArguments(args).build();}@Beanpublic Binding queueaBingX(@Qualifier("queueA")Queue queueA,@Qualifier("xExchange")DirectExchange exchange){return BindingBuilder.bind(queueA).to(exchange).with("XA");//通過XA路由鍵讓交換機與隊列A綁定}//聲明隊列A ttl為40s并綁定到對應的死信交換機@Bean("queueB")public Queue queueB(){Map<String,Object> args = new HashMap<String,Object>();args.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);//聲明當前隊列綁定的死信交換機args.put("x-dead-letter-routing-key","YD");args.put("x-message-ttl",40000);return QueueBuilder.durable(QUEUE_B).withArguments(args).build();}@Beanpublic Binding queuebBingX(@Qualifier("queueB")Queue queueA,@Qualifier("xExchange")DirectExchange exchange){return BindingBuilder.bind(queueA).to(exchange).with("XB");//通過XB路由鍵讓交換機與隊列B綁定//這里隊列A和隊列B綁定的是同一個交換機}@Bean("queueD")public Queue queueD(){return new Queue(DEAD_LETTER_QUEUE);//聲明死信隊列}@Bean//死信隊列與交換機通過yd路由鍵綁定 這里隊列綁定的是y交換機而不是x交換機,上面兩個隊列綁定的是x交換機public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){return BindingBuilder.bind(queueD).to(yExchange).with("YD");} }生產(chǎn)者
@RestController public class Producer { @Autowiredprivate RabbitTemplate rabbitTemplate;@GetMapping("sendMsg/{message}")public void sendMsg(@PathVariable String message){System.out.println("當前時間"+new Date().toString()+" 發(fā)送的消息:"+message);rabbitTemplate.convertAndSend("X","XA","消息來自ttl為10的隊列"+message);rabbitTemplate.convertAndSend("X","XB","消息來自ttl為40的隊列"+message);} }消費者
@Component public class DeadLetterQueueConsumer {@RabbitListener(queues = "QD")//配置文件已經(jīng)聲明了死信隊列 Queue("QD");//聲明死信隊列public void receiveD(Message message, Channel channel) throws IOException {String msg = new String(message.getBody());System.out.println("當前時間:" +new Date().toString()+",收到死信隊列信息"+msg); } }啟動項目:
控制臺:
設置消息ttl
上面是對隊列屬性設置了過期時間,但如果有很多數(shù)據(jù)需要設置不同的過期時間則需要很多隊列,這樣明顯浪費不必要的內(nèi)存,這里也可以對消息設置不同過期時間:
再定義一個新隊列,這里隊列不再設置ttl屬性:
修改生產(chǎn)者:
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){ rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{correlationData.getMessageProperties().setExpiration(ttlTime); return correlationData; });System.out.println("當前時間:{}"+ new Date().toString()+"發(fā)送一條時長"+ttlTime+"毫秒TTL信息給隊列C:"+ message); }看起來似乎沒什么問題,但是在最開始的時候,就介紹過如果使用在消息屬性上設置TTL的方式,消息可能并不會按時“死亡“,因為RabbitMQ只會檢查第一個消息是否過期,如果過期則丟到死信隊列,如果第一個消息的延時時長很長,而第二個消息的延時時長很短,第二個消息并不會優(yōu)先得到執(zhí)行。
延遲插件的使用
官網(wǎng)https://www.rabbitmq.com/community-plugins.html
下載:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
允許使用插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
修改綁定關系:
修改配置文件
在我們自定義的交換機中,這是一種新的交換類型,該類型消息支持延遲投遞機制 消息傳遞后并不會立即投遞到目標隊列中,而是存儲在mnesia(一個分布式數(shù)據(jù)系統(tǒng))表中,當達到投遞時間時,才投遞到目標隊列中。
@Configuration public class DelayedQueueConfig {//自定義交換機 我們在這里定義的是一個延遲交換機@Bean public CustomExchange delayedExchange() { Map<String, Object> args = new HashMap<>(); //自定義交換機的類型 args.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args); } @Bean public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange) {return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();} }修改生產(chǎn)者
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";@GetMapping("sendDelayMsg/{message}/{delayTime}")public void sendMsg(@PathVariable String message, @PathVariable Integer delayTime) {rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message, correlationData -> {correlationData.getMessageProperties().setDelay(delayTime);return correlationData;});}修改消費者
public static final String DELAYED_QUEUE_NAME = "delayed.queue";@RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayedQueue(Message message){ String msg = new String(message.getBody()); log.info("當前時間:{},收到延時隊列的消息:{}", new Date().toString(), msg); }小結:
延時隊列在需要延時處理的場景下非常有用,使用RabbitMQ來實現(xiàn)延時隊列可以很好的利用RabbitMQ的特性,如:消息可靠發(fā)送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,通過RabbitMQ集群的特性,可以很好的解決單點故障問題,不會因為單個節(jié)點掛掉導致延時隊列不可用或者消息丟失。
當然,延時隊列還有很多其它選擇,比如利用Java的DelayQueue,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特點,看需要適用的場景
總結
以上是生活随笔為你收集整理的【学习笔记】rabbitmq设置队列ttl和使用延迟插件的代码示例的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: java解压文件、复制文件、删除文件代码
- 下一篇: 对二维数组自定义排序、Treemap自定