rabbitmq的死信队列(四)
死信隊列
死信隊列,英文縮寫:DLX 。Dead Letter Exchange(死信交換機),當消息成為Dead message后,可以被重新發送到另一個交換機,這個交換機就是DLX。
消息成為死信的三種情況:
1. 隊列消息長度到達限制;
2. 消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;
3. 原隊列存在消息過期設置,消息到達超時時間未被消費;
隊列綁定死信交換機:
給隊列設置參數: x-dead-letter-exchange 和 x-dead-letter-routing-key和x-message-ttl和x-max-length
x-dead-letter-exchange:綁定的死信交換機名稱
x-dead-letter-routing-key:綁定正常隊列和死信交換機的路由
x-dead-letter-routing-key:ttl過期時間
x-max-length:設置正常隊列長度限制
rabbitmq-high-producer項目
application.properties文件
server.port=8081
# ip
spring.rabbitmq.host=127.0.0.1
#默認5672
spring.rabbitmq.port=5672
#用戶名
spring.rabbitmq.username=guest
#密碼
spring.rabbitmq.password=guest
#連接到代理時用的虛擬主機
spring.rabbitmq.virtual-host=/
#是否啟用【發布確認】,默認false
#spring.rabbitmq.publisher-confirm-type=correlated替換spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=correlated
#是否啟用【發布返回】,默認false
spring.rabbitmq.publisher-returns=true
#表示消息確認方式,其有三種配置方式,分別是none、manual和auto;默認auto
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#rabbitmq限流,必須在ack確認才能使用
#消費者最小數量
spring.rabbitmq.listener.simple.concurrency=1
#最大的消費者數量
spring.rabbitmq.listener.simple.max-concurrency=10
#在單個請求中處理的消息個數,他應該大于等于事務數量(unack的最大數量)
spring.rabbitmq.listener.simple.prefetch=2
DlxQueueRabbitConfig類
package com.qingfeng.rabbitmqhighproducer.dlx.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* 死信隊列
*/
@Configuration
public class DlxQueueRabbitConfig {
//正常隊列名稱
public static final String NORMAL_DLX_QUEUE = "normal_dlx_queue";
//正常交換機名稱
public static final String NORMAL_DLX_Exchange = "normal_dlx_exchange";
//ttl過期時間毫秒
private static final int NORMAL_DLX_EXPIRATION = 10000;
//設置正常隊列長度限制
private static final int NORMAL_DLX_LENGTH = 10;
//死信隊列名稱
public static final String DLX_QUEUE = "dlx_queue";
//死信交換機名稱
public static final String DLX_Exchange = "dlx_exchange";
//聲明正常交換機
@Bean("normalDlxExchange")
public TopicExchange normalDlxExchange(){
return new TopicExchange(NORMAL_DLX_Exchange);
}
//聲明正常隊列綁定死信隊列的交換機
@Bean("normalDlxQueue")
public Queue normalDlxQueue(){
return QueueBuilder.durable(NORMAL_DLX_QUEUE)
.withArgument("x-dead-letter-exchange", DLX_Exchange)
.withArgument("x-dead-letter-routing-key", "dlx.wq")
.withArgument("x-message-ttl", NORMAL_DLX_EXPIRATION)
.withArgument("x-max-length",NORMAL_DLX_LENGTH)
.build();
}
//聲明正常隊列和正常交換機的綁定
@Bean
public Binding normalDlxBinding(){
return BindingBuilder.bind(normalDlxQueue()).to(normalDlxExchange()).with("test.dlx.#");
}
//=========================================================================
//聲明死信隊列
@Bean
public Queue dlxQueue(){
return new Queue(DLX_QUEUE);
}
//聲明死信交換機
@Bean
public TopicExchange dlxExchange(){
return new TopicExchange(DLX_Exchange);
}
//聲明死信隊列和死信交換機的綁定
@Bean
public Binding dlxBinding(){
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dlx.#");
}
}
DlxController類
package com.qingfeng.rabbitmqhighproducer.dlx;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
@RequestMapping("dlx")
public class DlxController {
@Autowired
private RabbitTemplate rabbitTemplate;
//http://127.0.0.1:8081/dlx/testTimeDLX
//測試時間過期
@GetMapping("/testTimeDLX")
public String testTimeDLX() {
String messageId = String.valueOf(UUID.randomUUID());
//normal_dlx_exchange正常交換機 test.dlx.wq:正常交換機與正常綁定的隊列的路由
rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
return "ok";
}
}
啟動rabbitmq-high-producer項目
1.測試原隊列存在消息過期設置,消息到達超時時間未被消費
http://127.0.0.1:8081/dlx/testTimeDLX
我們在設置的ttl過期時間10000毫秒過后,也就是10秒后,正常隊列的消息會轉到死信隊列里面去
2.測試隊列消息長度到達限制
DlxController類
package com.qingfeng.rabbitmqhighproducer.dlx;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
@RequestMapping("dlx")
public class DlxController {
@Autowired
private RabbitTemplate rabbitTemplate;
//http://127.0.0.1:8081/dlx/veroLengthDLX
//2.測試隊列超出長度
@GetMapping("/veroLengthDLX")
public String veroLengthDLX() {
for (int i=0;i<20;i++){
String messageId = String.valueOf(UUID.randomUUID());
rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
}
return "ok";
}
}
啟動rabbitmq-high-producer項目
訪問:http://127.0.0.1:8081/dlx/veroLengthDLX
設置正常隊列長度限制為10,我們生產者發送了20個消息,正常隊列只能保存10個
我們在設置的ttl過期時間10000毫秒過后,也就是10秒后,正常隊列的消息會全部轉到死信隊列里面去
3消費者拒接消費消息,basicNack/basicReject,并且不把消息重新放入原目標隊列,requeue=false;
在rabbitmq-high-producer項目的DlxController類添加
package com.qingfeng.rabbitmqhighproducer.dlx;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.UUID;
@RestController
@RequestMapping("dlx")
public class DlxController {
@Autowired
private RabbitTemplate rabbitTemplate;
//3.測試消息被消費者拒收
//http://127.0.0.1:8081/dlx/rejectionDLX
@GetMapping("/rejectionDLX")
public String rejectionDLX() {
String messageId = String.valueOf(UUID.randomUUID());
rabbitTemplate.convertAndSend("normal_dlx_exchange", "test.dlx.wq", messageId+"變成死信隊列消息");
return "ok";
}
}
在rabbitmq-high-consumer項目
DxlListener類 開啟int i = 1/0;//出現錯誤
package com.qingfeng.rabbitmqhighconsumer.dxl;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* Consumer ACK機制:
* 1. 設置手動簽收。 spring.rabbitmq.listener.simple.acknowledge-mode=manual
* 2. 讓監聽器類實現ChannelAwareMessageListener接口
* 3. 如果消息成功處理,則調用channel的 basicAck()簽收
* 4. 如果消息處理失敗,則調用channel的basicNack()拒絕簽收,broker重新發送給consumer
*/
@Component
public class DxlListener {
//手動簽收
@RabbitHandler
@RabbitListener(queues = "normal_dlx_queue")
public void onMessage(Message message, Channel channel) throws Exception {
//Thread.sleep(1000);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
//1.接收轉換消息
System.out.println("接受到的消息為"+new String(message.getBody()));
//2. 處理業務邏輯
System.out.println("處理業務邏輯...");
int i = 1/0;//出現錯誤
//3. 手動簽收
channel.basicAck(deliveryTag,true);
} catch (Exception e) {
/**
* 4.有異常就拒絕簽收
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* 第三個參數:requeue:重回隊列。如果設置為true,則消息重新回到queue,broker會重新發送該消息給消費
* requeue:true為將消息重返當前消息隊列,還可以重新發送給消費者;
* alse:將消息丟棄
*/
System.out.println("有異常就拒絕簽收");
//拒絕簽收,不重回隊列,requeue為false,這樣才能到死信隊列里面去
channel.basicNack(deliveryTag,true,false);
}
}
}
啟動rabbitmq-high-producer和rabbitmq-high-consumer項目
測試:http://127.0.0.1:8081/dlx/rejectionDLX
在rabbitmq-high-consumer項目consumer拒絕接收消息,直接轉到死信隊列去了
小結:
1. 死信交換機和死信隊列和普通的沒有區別
2. 當消息成為死信后,如果該隊列綁定了死信交換機,則消息會被死信交換機重新路由到死信隊列
3. 消息成為死信的三種情況:
1. 隊列消息長度到達限制;
2. 消費者拒接消費消息,并且不重回隊列;
3. 原隊列存在消息過期設置,消息到達超時時間未被消費;
總結
以上是生活随笔為你收集整理的rabbitmq的死信队列(四)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Thrift入门及Java实例演示
- 下一篇: 当maven引用的jar在maven库中