【RabbitMQ】8、RabbitMQ之mandatory和immediate
1. 概述
mandatory和immediate是AMQP協(xié)議中basic.publish方法中的兩個(gè)標(biāo)識(shí)位,它們都有當(dāng)消息傳遞過程中不可達(dá)目的地時(shí)將消息返回給生產(chǎn)者的功能。對(duì)于剛開始接觸RabbitMQ的朋友特別容易被這兩個(gè)參數(shù)搞混,這里博主整理了寫資料,簡(jiǎn)單講解下這兩個(gè)標(biāo)識(shí)位。
mandatory?
當(dāng)mandatory標(biāo)志位設(shè)置為true時(shí),如果exchange根據(jù)自身類型和消息routeKey無法找到一個(gè)符合條件的queue,那么會(huì)調(diào)用basic.return方法將消息返回給生產(chǎn)者(Basic.Return + Content-Header + Content-Body);當(dāng)mandatory設(shè)置為false時(shí),出現(xiàn)上述情形broker會(huì)直接將消息扔掉。
mandatory標(biāo)志的作用:在消息沒有被路由到合適隊(duì)列情況下會(huì)將消息返還給消息發(fā)布者,同時(shí)我們測(cè)試了哪些情況下消息不會(huì)到達(dá)合適的隊(duì)列,測(cè)試1演示的是創(chuàng)建了exchange但是沒有為他綁定隊(duì)列導(dǎo)致的消息未到達(dá)合適隊(duì)列,測(cè)試3演示的是創(chuàng)建了exchange同時(shí)創(chuàng)建了queue,但是在將兩者綁定的時(shí)候,使用的bindingKey和消息發(fā)布者使用的rountingKey不一致導(dǎo)致的消息未到達(dá)合適隊(duì)列;
immediate?
當(dāng)immediate標(biāo)志位設(shè)置為true時(shí),如果exchange在將消息路由到queue(s)時(shí)發(fā)現(xiàn)對(duì)于的queue上么有消費(fèi)者,那么這條消息不會(huì)放入隊(duì)列中。當(dāng)與消息routeKey關(guān)聯(lián)的所有queue(一個(gè)或者多個(gè))都沒有消費(fèi)者時(shí),該消息會(huì)通過basic.return方法返還給生產(chǎn)者。
概括來說,mandatory標(biāo)志告訴服務(wù)器至少將該消息route到一個(gè)隊(duì)列中,否則將消息返還給生產(chǎn)者;immediate標(biāo)志告訴服務(wù)器如果該消息關(guān)聯(lián)的queue上有消費(fèi)者,則馬上將消息投遞給它,如果所有queue都沒有消費(fèi)者,直接把消息返還給生產(chǎn)者,不用將消息入隊(duì)列等待消費(fèi)者了。
2. mandatory
在生產(chǎn)者通過channle的basicPublish方法發(fā)布消息時(shí),通常有幾個(gè)參數(shù)需要設(shè)置,為此我們有必要了解清楚這些參數(shù)代表的具體含義及其作用,查看channel接口,會(huì)發(fā)現(xiàn)存在3個(gè)重載的basicPublish方法:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;mandatory和immediate上面已經(jīng)解釋過了,其余的參數(shù)分別是:?
exchange:交換機(jī)名稱?
routingkey:路由鍵?
props:消息屬性字段,比如消息頭部信息等等?
body:消息主體部分
本節(jié)主要講述mandatory, 下面我們寫一個(gè)demo,在RabbitMQ broker中有:?
exchange : exchange.mandatory.test?
queue: queue.mandatory.test?
exchange路由到queue的routingkey是mandatory?
這里先不講當(dāng)前的exchange綁定到queue中,即:
詳細(xì)代碼如下:
package com.vms.test.zzh.rabbitmq.self;import com.rabbitmq.client.*;import java.io.IOException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; /** * Created by hidden on 2017/2/7. */ public class RBmandatoryTest { public static final String ip = "10.198.197.73"; public static final int port = 5672; public static final String username = "root"; public static final String password = "root"; public static final String queueName = "queue.mandatory.test"; public static final String exchangeName = "exchange.mandatory.test"; public static final String routingKey = "mandatory"; public static final Boolean mandatory = true; public static final Boolean immediate = false; public static void main(String[] args) { try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(ip); factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes()); // channel.close(); // connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }運(yùn)行,之后通過wireshark抓包工具可以看到如下圖所示:?
這里可以看到最后執(zhí)行了basic.return方法,將發(fā)布者發(fā)出的消息返回給了發(fā)布者,查看協(xié)議的arguments參數(shù)部分可以看到:reply-text字段值為NO_ROUTE,表示消息并沒有路由到合適的隊(duì)列中;
那么我們?cè)撛趺传@取到?jīng)]有被正確路由到合適隊(duì)列的消息呢?這時(shí)候可以通過為channel信道設(shè)置ReturnListener監(jiān)聽器來實(shí)現(xiàn),具體代碼(main函數(shù)部分):
try {ConnectionFactory factory = new ConnectionFactory();factory.setHost(ip);factory.setPort(port); factory.setUsername(username); factory.setPassword(password); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes()); channel.addReturnListener(new ReturnListener() { public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties basicProperties, byte[] body) throws IOException { String message = new String(body); System.out.println("Basic.return返回的結(jié)果是:"+message); } }); // channel.close(); // connection.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }運(yùn)行結(jié)果:
Basic.return返回的結(jié)果是:===mandatory===下面我們來看一下,設(shè)置mandatory標(biāo)志且exchange路由到queue中,代碼部分只需要將:
channel.basicPublish(exchangeName, "", mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());改為
channel.basicPublish(exchangeName, routingKey, mandatory, immediate, MessageProperties.PERSISTENT_TEXT_PLAIN, "===mandatory===".getBytes());即可。?
通過wireshark抓包如下:?
可以看到并不會(huì)有basic.return方法被調(diào)用。查看RabbitMQ管理界面發(fā)現(xiàn)消息已經(jīng)到達(dá)了隊(duì)列。
3. immediate
在RabbitMQ3.0以后的版本里,去掉了immediate參數(shù)的支持,發(fā)送帶immediate標(biāo)記的publish會(huì)返回如下錯(cuò)誤:?
“{amqp_error,not_implemented,”immediate=true”,’basic.publish’}”
為什么移除immediate標(biāo)記,參見如下版本變化描述:?
Removal of “immediate” flag?
What changed? We removed support for the rarely-used “immediate” flag on AMQP’s basic.publish.?
Why on earth did you do that? Support for “immediate” made many parts of the codebase more complex, particularly around mirrored queues. It also stood in the way of our being able to deliver substantial performance improvements in mirrored queues.?
What do I need to do? If you just want to be able to publish messages that will be dropped if they are not consumed immediately, you can publish to a queue with a TTL of 0.?
If you also need your publisher to be able to determine that this has happened, you can also use the DLX feature to route such messages to another queue, from which the publisher can consume them.?
這段解釋的大概意思是:immediate標(biāo)記會(huì)影響鏡像隊(duì)列性能,增加代碼復(fù)雜性,并建議采用“TTL”和“DLX”等方式替代。
出處:https://yq.aliyun.com/articles/238349?spm=5176.8091938.0.0.EfSZh4
http://www.mamicode.com/info-detail-1673003.html?spm=5176.100239.blogcont238349.8.FAbhIg
轉(zhuǎn)載于:https://www.cnblogs.com/wangzhongqiu/p/7832796.html
總結(jié)
以上是生活随笔為你收集整理的【RabbitMQ】8、RabbitMQ之mandatory和immediate的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 朋友梦到我怀孕了是什么意思周公解梦
- 下一篇: 梦到妈妈开车是什么预兆