Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?
優(yōu)先級隊列
方式一:可以通過RabbitMQ管理界面配置隊列的優(yōu)先級屬性,如下圖的x-max-priority
方式二:代碼設(shè)置
| Map<String,Object> args = new HashMap<String,Object>(); args.put("x-max-priority", 10); channel.queueDeclare("queue_priority", true, false, false, args); |
這里設(shè)置的是一個隊列queue的最大優(yōu)先級,之后要在發(fā)送的消息中設(shè)置消息本身的優(yōu)先級,設(shè)置代碼:
| AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.priority(5); AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange_priority","rk_priority",properties,("messages").getBytes()); |
完整代碼:生產(chǎn)者
| public class Producer { ??? public static final String ip = "10.0.40.127"; ??? public static final int port = 5672; ??? public static final String username = "admin"; ??? public static final String password = "123456"; ? ??? public static void main(String[] args) throws IOException{ ????? ??ConnectionFactory connectionFactory = new ConnectionFactory(); ??????? connectionFactory.setPassword(password); ??????? connectionFactory.setUsername(username); ??????? connectionFactory.setPort(port); ??????? connectionFactory.setHost(ip); ? ???? /*?? Connection connection = connectionFactory.newConnection(); ??????? Channel channel = connection.createChannel(); ? ??????? //create exchange ??????? channel.exchangeDeclare("exchange_priority", "direct", true); ? ??????? //create queue with priority ??????? Map<String, Object> params = new HashMap<>(); ??????? params.put("x-max-priority", 10); ??????? channel.queueDeclare("queue_priority", true, false, false, params); ??????? channel.queueBind("queue_priority", "exchange_priority", "rk_priority"); ? ??????? //send message with priority ??????? for (int i = 0; i < 10; i++) { ??????????? AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); ??????????? if (i % 2 == 0) { ??????????????? builder.priority(5); ??????????? } ??????????? AMQP.BasicProperties properties = builder.build(); ??????????? channel.basicPublish("exchange_priority", "rk_priority", properties, ("produce messages-" + i).getBytes()); ??????? } ? ??????? channel.close(); ??????? connection.close();*/ ??? } } |
消費者
| public class Consumer { ??? public static final String ip = "10.0.40.127"; ??? public static final int port = 5672; ??? public static final String username = "admin"; ??? public static final String password = "123456"; ? ??? public static void main(String[] args) throws IOException, InterruptedException { ??????? ConnectionFactory connectionFactory = new ConnectionFactory(); ??????? connectionFactory.setPassword(password); ??????? connectionFactory.setUsername(username); ??????? connectionFactory.setPort(port); ??????? connectionFactory.setHost(ip); ? /*??????? Connection connection = connectionFactory.newConnection(); ??????? Channel channel = connection.createChannel(); ? ??????? QueueingConsumer consumer = new QueueingConsumer(channel); ??????? channel.basicConsume("queue_priority",consumer); ??????? while (true) { ??????????? QueueingConsumer.Delivery delivery = consumer.nextDelivery(); ??????????? String msg = new String(delivery.getBody()); ??????????? System.out.println(msg); ??????????? channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); ??????? }*/ ??? } } |
打印輸出:先輸出偶數(shù),后輸出奇數(shù)
如何限流?
1、為什么要對消費端限流?
如果Rabbitmq 服務(wù)器積壓了有上萬條未處理的消息,如果這時候連上了一個消費端,那么巨量的消息瞬間全部推送過來,但是單個客戶端無法同時處理這么多。當數(shù)據(jù)量特別大的時候?qū)οM端限流,用于保持消費端的穩(wěn)定,當消息數(shù)量激增的時候很有可能造成資源耗盡,以及影響服務(wù)的性能,導(dǎo)致系統(tǒng)的卡頓甚至直接崩潰。
2、限流的實現(xiàn)方式—限流api
RabbitMQ 提供了一種 qos (服務(wù)質(zhì)量保證)功能,即在非自動確認消息的前提下,如果一定數(shù)目的消息(通過基于 consume 或者 channel 設(shè)置 Qos 的值)未被確認前,不進行消費新的消息。
| void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; |
- prefetchSize:0,單條消息大小限制,0代表不限制
- prefetchCount:一次性消費的消息數(shù)量。告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,即一旦有 N 個消息還沒有 ack,則該 consumer 將 block 掉,直到有消息 ack。
- global:true、false 是否將上面設(shè)置應(yīng)用于 channel,就是上面限制 channel 級別還是 consumer 級別。當我們設(shè)置為 false 的時候生效
- prefetchCount 在 no_ask=false 的情況下才生效,即在自動應(yīng)答的情況下這兩個值是不生效的。
3、如何進行限流?
- 首先第一步,使用消費端限流需要關(guān)閉自動 ack,將 autoAck 設(shè)置為 falsechannel.basicConsume(queueName, false, consumer);
- 第二步設(shè)置具體的限流大小以及數(shù)量。channel.basicQos(0, 15, false);
- 第三步在消費者的 handleDelivery 消費方法中手動 ack,并且設(shè)置批量處理 ack 回應(yīng)為 truechannel.basicAck(envelope.getDeliveryTag(), true);
?
?消息確認機制
1、如果沒有開啟ack消息確認,rabbitmq會認為這條消息沒有被消費,會將消息再次放入到隊列中,再次讓你消費,形成死循環(huán);
2、消費端配置了手動ack,但是在異常捕獲中設(shè)置了消息重新入隊,那么還是會出現(xiàn)死循環(huán)
| channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); |
因為最后一個參數(shù)requeue一般都會為true,此次沒調(diào)用到數(shù)據(jù),把這個消息返回到隊列中再消費,如果代碼中出現(xiàn)了int a=1/0,那么還是會造成死循環(huán)。
?消息重試機制
當你開啟了手動ack的時候再消費端如果在消費的時候出現(xiàn)異常也會導(dǎo)致循環(huán)消費,所以要啟動消息重試機制,默認是3次重試去消費一條消息,如果沒有消費完成,則丟棄(刪除)該消息或者放入死信隊列中或者進行人工補償。
| erver.port=8889 ? spring.rabbitmq.host=192.168.221.150 spring.rabbitmq.port=5672 spring.rabbitmq.username=zl spring.rabbitmq.password=123 #開啟消息確認機制 spring.rabbitmq.publisher-confirms=true #支持消息發(fā)送失敗返回隊列 spring.rabbitmq.publisher-returns=true ? #設(shè)置為 true 后 消費者在消息沒有被路由到合適隊列情況下會被return監(jiān)聽,而不會自動刪除 spring.rabbitmq.template.mandatory=true ? spring.rabbitmq.connection-timeout=15000 #用戶虛擬機權(quán)限名稱 spring.rabbitmq.virtual-host=/ ? #設(shè)置消費端手動 ack?? none不確認? auto自動確認? manual手動確認 spring.rabbitmq.listener.simple.acknowledge-mode=manual #消費者最小數(shù)量 spring.rabbitmq.listener.simple.concurrency=1 #消費之最大數(shù)量 spring.rabbitmq.listener.simple.max-concurrency=1 ? #開啟消費者重試機制(為false時關(guān)閉消費者重試,這時消費端代碼異常會一直重復(fù)收到消息) spring.rabbitmq.listener.simple.retry.enabled=true #重試次數(shù)5 spring.rabbitmq.listener.simple.retry.max-attempts=5 #重試時間間隔 spring.rabbitmq.listener.simple.retry.initial-interval=5000 ? #重試次數(shù)超過上面的設(shè)置之后是否丟棄(false不丟棄時需要寫相應(yīng)代碼將該消息加入死信隊列) spring.rabbitmq.listener.simple.default-requeue-rejected=true ? #在單個請求中處理的消息個數(shù),他應(yīng)該大于等于事務(wù)數(shù)量(unack的最大數(shù)量) spring.rabbitmq.listener.simple.prefetch=2 |
?
1、觸發(fā)重試機制需要消費者拋出異常,而不能try/catch捕捉異常,不然會死循環(huán)。
2、對于重試之后仍然異常的消息,mq默認的處理類是RejectAndDontRequeueRecoverer
見名知意。
SimpleRabbitListenerContainerFactoryConfigurer——>>RejectAndDontRequeueRecoverer(實現(xiàn)了MessageRecoverer接口)
?
| ? MessageRecoverer接口實現(xiàn)類 | RejectAndDontRequeueRecoverer |
| RepublishMessageRecoverer | |
| ImmediateRequeueMessageRecoverer |
?
優(yōu)化處理一:對于重試之后仍然異常的消息,可以采用RepublishMessageRecoverer,將消息發(fā)送到其他的隊列中,再專門針對新的隊列進行處理。
?
優(yōu)化處理二:采用死信隊列的方式處理重試失敗的消息。
| /** ?* 死信交換機 ?* @return ?*/ @Bean public DirectExchange dlxExchange(){ ??? return new DirectExchange(dlxExchangeName); } ? /** ?* 死信隊列 ?* @return ?*/ @Bean public Queue dlxQueue(){ ??? return new Queue(dlxQueueName); } ? /** ?* 死信隊列綁定死信交換機 ?* @param dlxQueue ?* @param dlxExchange ?* @return ?*/ @Bean public Binding dlcBinding(Queue dlxQueue, DirectExchange dlxExchange){ ??? return BindingBuilder.bind(dlxQueue).to(dlxExchange).with(dlxRoutingKey); } |
業(yè)務(wù)代碼添加死信交換機、死信路由配置
| /** ?* 業(yè)務(wù)隊列 ?* @return ?*/ @Bean public Queue queue(){ ??? Map<String,Object> params = new HashMap<>(); ??? params.put("x-dead-letter-exchange",dlxExchangeName);//聲明當前隊列綁定的死信交換機 ??? params.put("x-dead-letter-routing-key",dlxRoutingKey);//聲明當前隊列的死信路由鍵 ??? return QueueBuilder.durable(queueName).withArguments(params).build(); ??? //return new Queue(queueName,true); } | ? |
注意點:
1、消費者在重試5次后,由于MessageCover默認的實現(xiàn)類是RejectAndDontRequeueRecoverer,也就是requeue=false,因為業(yè)務(wù)隊列綁定了死信隊列,消息會從業(yè)務(wù)隊列中刪除,同時發(fā)送到死信隊列中。
2、如果ack模式是手動ack,那么需要調(diào)用channe.nack方法,同時設(shè)置requeue=false才會將異常消息發(fā)送到死信隊列中
重試使用場景:
對于消費端異常的消息,如果在有限次重試過程中消費成功是最好,如果有限次重試之后仍然失敗的消息,不管是采用RejectAndDontRequeueRecoverer還是使用死信隊列都是可以的,同時也可以采用折中的方法,先將消息從業(yè)務(wù)隊列中ack掉,再將消息發(fā)送到另外的一個隊列中,后續(xù)再單獨處理異常數(shù)據(jù)的隊列
考慮下面兩個場景:
1、http下載視頻或者圖片或者調(diào)用第三方接口
2、空指針異常或者類型轉(zhuǎn)換異常(其他的受檢查的運行時異常)
第一種重試有意義,第二種重試無意義,需要記錄日志以及人工處理或者輪詢?nèi)蝿?wù)方式處理。
重試的使用方式:
1、自動ack模式,不能catch異常
2、手動ack模式,不能try—catch異常
建議自動ack模式使用重試機制,如果一定要在手動ack模式下使用retry功能,最好還是確認在有限次重試過程中可以重試成功,否則超過重試次數(shù),又沒辦法執(zhí)行nack,會出現(xiàn)消息一直unack死循環(huán)
消息冪等性
| 問題 | 解決方案 |
| 消息重復(fù)消費問題:消費者消息處理了,沒來的及提交offset,再重啟可能導(dǎo)致重復(fù)消費 | 方式一:使用全局MessageID判斷消費方使用同一個,解決冪等性。 方式二:用一個消息消費表來記錄每一條消息,給每個一個消息設(shè)置一個id(uuid),消費了就保存到表中去。消息過來的時候先查詢是否已經(jīng)消費。 |
?
總結(jié)
以上是生活随笔為你收集整理的Rabbitmq如何设置优先级队列?如何限流?如何重试?如何处理幂等性?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 网页滚动截屏怎么截长图
- 下一篇: 20款超级好用的chrome拓展插件让你