RabbitMQ消息发送和接收
1.RabbitMQ的消息發(fā)送和接受機制
所有 MQ 產(chǎn)品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產(chǎn)者(producer)創(chuàng)建消息,然后發(fā)布到隊列(queue)中,最后將消息發(fā)送到監(jiān)聽的消費者。
上面是MQ的基本抽象模型,但是不同的MQ產(chǎn)品有有者不同的機制,RabbitMQ實際基于AMQP協(xié)議的一個開源實現(xiàn),因此RabbitMQ內(nèi)部也是AMQP的基本概念。
RabbitMQ的內(nèi)部接收如下:
1、Message
消息,消息是不具體的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲)等。
2、Publisher
消息的生產(chǎn)者,也是一個向交換器發(fā)布消息的客戶端應(yīng)用程序。
3、Exchange
交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊列。
4、Binding
綁定,用于消息隊列和交換器之間的關(guān)聯(lián)。一個綁定就是基于路由鍵將交換器和消息隊列連接起來的路由規(guī)則,所以可以將交換器理解成一個由綁定構(gòu)成的路由表。
5、Queue
消息隊列,用來保存消息直到發(fā)送給消費者。它是消息的容器,也是消息的終點。一個消息可投入一個或多個隊列。消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
6、Connection
網(wǎng)絡(luò)連接,比如一個TCP連接。
7、Channel
信道,多路復(fù)用連接中的一條獨立的雙向數(shù)據(jù)流通道。信道是建立在真實的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,這些動作都是通過信道完成。因為對于操作系統(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。
8、Consumer
消息的消費者,表示一個從消息隊列中取得消息的客戶端應(yīng)用程序。
9、Virtual Host
虛擬主機,表示一批交換器、消息隊列和相關(guān)對象。虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務(wù)器域。每個 vhost 本質(zhì)上就是一個 mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊列、交換器、綁定和權(quán)限機制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時指定,RabbitMQ 默認的 vhost 是 / 。
10、Broker
表示消息隊列服務(wù)器實體。
2.AMQP中的消息路由
AMQP 中消息的路由過程和 Java 開發(fā)者熟悉的 JMS 存在一些差別,AMQP 中增加了 Exchange 和 Binding 的角色。生產(chǎn)者把消息發(fā)布到 Exchange 上,消息最終到達隊列并被消費者接收,而 Binding 決定交換器的消息應(yīng)該發(fā)送到那個隊列
3.Exchange類型
Exchange分發(fā)消息時根據(jù)類型的不同分發(fā)策略有區(qū)別,目前共四種類型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由鍵,此外 headers 交換器和 direct 交換器完全一致,但性能差很多,目前幾乎用不到了,所以直接看另外三種類型
1、direct
消息中的路由鍵(routing key)如果和 Binding 中的 binding key 一致, 交換器就將消息發(fā)到對應(yīng)的隊列中。路由鍵與隊列名完全匹配,如果一個隊列綁定到交換機要求路由鍵為“dog”,則只轉(zhuǎn)發(fā) routing key 標記為“dog”的消息,不會轉(zhuǎn)發(fā)“dog.puppy”,也不會轉(zhuǎn)發(fā)“dog.guard”等等。它是完全匹配、單播的模式。
2、fanout
每個發(fā)到 fanout 類型交換器的消息都會分到所有綁定的隊列上去。fanout 交換器不處理路由鍵,只是簡單的將隊列綁定到交換器上,每個發(fā)送到交換器的消息都會被轉(zhuǎn)發(fā)到與該交換器綁定的所有隊列上。很像子網(wǎng)廣播,每臺子網(wǎng)內(nèi)的主機都獲得了一份復(fù)制的消息。fanout 類型轉(zhuǎn)發(fā)消息是最快的。
3、topic
topic 交換器通過模式匹配分配消息的路由鍵屬性,將路由鍵和某個模式進行匹配,此時隊列需要綁定到一個模式上。它將路由鍵和綁定鍵的字符串切分成單詞,這些單詞之間用點隔開。它同樣也會識別兩個通配符:符號“#”和符號“”。#匹配0個或多個單詞,“”匹配不多不少一個單詞。
4.Java發(fā)送和接收Queue的消息
4.1創(chuàng)建Maven工程01-rabbitmq-send-java添加Maven依賴
<dependencies> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version> </dependency> </dependencies>4.2 編寫消息發(fā)送類
在01-rabbitmq-send-java項目中創(chuàng)建,com.xxxx.rabbitmq.queue.Send類
以運行Send類觀看管控臺的變化
4.3 創(chuàng)建Maven工程01-rabbitmq-receive-java添加Maven依賴
<dependencies> <dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.1.1</version> </dependency> </dependencies>4.4 編寫消息接收類
在01-rabbitmq-receive-java項目中創(chuàng)建,com.xxxx.rabbitmq.queue.Receive類
注意:
1、Queue的消息只能被同一個消費者消費,如果沒有消費監(jiān)聽隊列那么消息會存放到隊列中持久化保存,直到有消費者來消費這個消息,如果以有消費者監(jiān)聽隊列則立即消費發(fā)送到隊列中的消息
2、Queue的消息可以保證每個消息都一定能被消費
5.Java綁定Exchange發(fā)送和接收消息
AMQP 協(xié)議中的核心思想就是生產(chǎn)者和消費者的解耦,生產(chǎn)者從不直接將消息發(fā)送給隊列。生產(chǎn)者通常不知道是否一個消息會被發(fā)送到隊列中,只是將消息發(fā)送到一個交換機。先由 Exchange 來接收,然后 Exchange 按照特定的策略轉(zhuǎn)發(fā)到 Queue 進行存儲。Exchange 就類似于一個交換機,將各個消息分發(fā)到相應(yīng)的隊列中。
在實際應(yīng)用中我們只需要定義好 Exchange 的路由策略,而生產(chǎn)者則不需要關(guān)心消息會發(fā)送到哪個 Queue 或被哪些 Consumer 消費。在這種模式下生產(chǎn)者只面向 Exchange 發(fā)布消息,消費者只面向 Queue 消費消息,Exchange 定義了消息路由到 Queue 的規(guī)則,將各個層面的消息傳遞隔離開,使每一層只需要關(guān)心自己面向的下一層,降低了整體的耦合度。
5.1 Exchange的direct消息綁定
5.1.1 編寫direct消息發(fā)送類
在01-rabbitmq-send-java項目中創(chuàng)建,com.xxxx.rabbitmq.direct.Send類
注意:使用direct消息模式時必須要指定RoutingKey(路由鍵),將指定的消息綁定到指定的路由鍵上
5.1.2 編寫direct消息接收類
在01-rabbitmq-Receive-java項目中創(chuàng)建,com.xxxx.rabbitmq.direct.Receive類
注意:
1、使用Exchange的direct模式時接收者的RoutingKey必須要與發(fā)送時的RoutingKey完全一致否則無法獲取消息
2、接收消息時隊列名也必須要發(fā)送消息時的完全一致
5.2 Exchange的fanout消息綁定
5.2.1 編寫fanout消息發(fā)送類
在01-rabbitmq-send-java項目中創(chuàng)建,com.xxxx.rabbitmq.fanout.Send類
注意:
fanout模式的消息需要將一個消息同時綁定到多個隊列中因此這里不能創(chuàng)建并指定某個隊列
5.2.2 編寫fanout消息接收類
在01-rabbitmq-receive-java項目中創(chuàng)建,com.xxxx.rabbitmq.fanout.Receive類
public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服務(wù)器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchangeFanout";channel.exchangeDeclare(exchangeName, "fanout", true);//聲明隊列String queueName = channel.queueDeclare().getQueue();String routingKey = "";//綁定隊列,通過鍵 hola 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息//參數(shù)1 隊列名稱//參數(shù)2 是否自動確認消息 true表示自動確認 false表示手動確認//參數(shù)3 為消息標簽 用來區(qū)分不同的消費者這列暫時為""// 參數(shù)4 消費者回調(diào)方法用于編寫處理消息的具體代碼(例如打印或?qū)⑾懭霐?shù)據(jù)庫)System.out.println(queueName);channel.basicConsume(queueName,autoAck,consumerTag,new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數(shù)String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}});}注意:
1、使用fanout模式獲取消息時不需要綁定特定的隊列名稱,只需使用channel.queueDeclare().getQueue();獲取一個隨機的隊列名稱,然后綁定到指定的Exchange即可獲取消息。
2、這種模式中可以同時啟動多個接收者只要都綁定到同一個Exchang即可讓所有接收者同時接收同一個消息是一種廣播的消息機制
5.3 Exchange的topic消息綁定
5.3.1編寫topic消息發(fā)送類
在01-rabbitmq-send-java項目中創(chuàng)建,com.xxxx.rabbitmq.topic.Send類
public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設(shè)置RabbitMQ的主機IPfactory.setPort(5672);//設(shè)置RabbitMQ的端口號factory.setUsername("root");//設(shè)置訪問用戶名factory.setPassword("root");//設(shè)置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!";String exchangeName="myExchangeTopic";//指定Exchange的類型//參數(shù)1為 交換機名稱//參數(shù)2為交換機類型取值為 direct、queue、topic、headers//參數(shù)3 為是否為持久化消息 true表示持久化消息 false表示非持久化channel.exchangeDeclare(exchangeName, "topic", true);//發(fā)送消息到RabbitMQ//參數(shù)1 我們自定義的交換機名稱//參數(shù)2 自定義的RoutingKey值//參數(shù)3 設(shè)置消息的屬性,可以通過消息屬性設(shè)置消息是否是持久化的//參數(shù)4 具體要發(fā)送的消息信息channel.basicPublish(exchangeName,"test.myRoutingKey",null,message.getBytes("UTF-8"));System.out.println("消息發(fā)送成功: "+message);channel.close();connection.close(); }注意:
1、在topic模式中必須要指定Routingkey,并且可以同時指定多層的RoutingKey,每個層次之間使用 點分隔即可 例如test.myRoutingKey
5.3.2編寫topic的消息接收類
在01-rabbitmq-receive-java項目中創(chuàng)建,com.xxxx.rabbitmq.topic.Receive類
public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.222.128");//建立到代理服務(wù)器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchangeTopic";channel.exchangeDeclare(exchangeName, "topic", true);//聲明隊列String queueName = channel.queueDeclare().getQueue();String routingKey = "test.#";//綁定隊列,通過鍵 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息//參數(shù)1 隊列名稱//參數(shù)2 是否自動確認消息 true表示自動確認 false表示手動確認//參數(shù)3 為消息標簽 用來區(qū)分不同的消費者這列暫時為""// 參數(shù)4 消費者回調(diào)方法用于編寫處理消息的具體代碼(例如打印或?qū)⑾懭霐?shù)據(jù)庫)channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數(shù)據(jù)String bodyStr = new String(body, "UTF-8");System.out.println("test.#----"+bodyStr);}}); }注意:
1、Topic模式的消息接收時必須要指定RoutingKey并且可以使用# 和 *來做統(tǒng)配符號,#表示通配任意一個單詞 *表示通配任意多個單詞,例如消費者的RoutingKey為test.#或#.myRoutingKey都可以獲取RoutingKey為test.myRoutingKey發(fā)送者發(fā)送的消息
5.4 事務(wù)消息
事務(wù)消息與數(shù)據(jù)庫的事務(wù)類似,只是MQ中的消息是要保證消息是否會全部發(fā)送成功,防止丟失消息的一種策略。
RabbitMQ有兩種方式來解決這個問題:
5.4.1 事務(wù)使用
事務(wù)的實現(xiàn)主要是對信道(Channel)的設(shè)置,主要的方法有三個:
5.4.2 編寫消息發(fā)送類
在01-rabbitmq-send-java項目中創(chuàng)建,com.xxxx.rabbitmq.transaction.Send類
public class Send{public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.171.143");//設(shè)置RabbitMQ的主機IPfactory.setPort(5672);//設(shè)置RabbitMQ的端口號factory.setUsername("root");//設(shè)置訪問用戶名factory.setPassword("root");//設(shè)置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!"; String exchangeName="myExchangeTransaction"; //指定Exchange的類型 //參數(shù)1為 交換機名稱 //參數(shù)2為交換機類型取值為 direct、fanout、topic、headers //參數(shù)3 為是否為持久化消息 true表示持久化消息 false表示非持久化 channel.exchangeDeclare(exchangeName, "direct", true);// 聲明事務(wù) channel.txSelect();//發(fā)送消息到RabbitMQ//參數(shù)1 我們自定義的交換機名稱//參數(shù)2 自定義的RoutingKey值//參數(shù)3 設(shè)置消息的屬性,可以通過消息屬性設(shè)置消息是否是持久化的//參數(shù)4 具體要發(fā)送的消息信息channel.basicPublish(exchangeName,"myRoutingKeyTransaction",null,message.getBytes("UTF-8")); // 提交事務(wù)channel.txCommit();System.out.println("消息發(fā)送成功: "+message);channel.close();connection.close();} }5.1.3編寫消息接收類
在01-rabbitmq-receive-java項目中創(chuàng)建,com.xxxx.rabbitmq.transaction.Receive類
public class Receive{public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setUsername("root");factory.setPassword("root");factory.setHost("192.168.171.143");//建立到代理服務(wù)器到連接Connection conn = factory.newConnection();//獲得信道final Channel channel = conn.createChannel();//聲明交換器String exchangeName = "myExchangeTransaction";channel.exchangeDeclare(exchangeName, "direct", true);//聲明隊列String queueName = channel.queueDeclare().getQueue();String routingKey = "myRoutingKeyTransaction";//綁定隊列,通過鍵 hola 將隊列和交換器綁定起來channel.queueBind(queueName, exchangeName, routingKey);//消費消息boolean autoAck = true;String consumerTag = "";//接收消息 //參數(shù)1 隊列名稱//參數(shù)2 是否自動確認消息 true表示自動確認 false表示手動確認//參數(shù)3 為消息標簽 用來區(qū)分不同的消費者這列暫時為""// 參數(shù)4 消費者回調(diào)方法用于編寫處理消息的具體代碼(例如打印或?qū)⑾懭霐?shù)據(jù)庫) channel.basicConsume(queueName, autoAck, consumerTag, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {//獲取消息數(shù)據(jù)String bodyStr = new String(body, "UTF-8");System.out.println(bodyStr);}}); channel.close();conn.close();} }5.5 消息的發(fā)送者確認模式
Confirm發(fā)送方確認模式使用和事務(wù)類似,也是通過設(shè)置Channel進行發(fā)送方確認的,最終達到確保所有的消息全部發(fā)送成功
Confirm的三種實現(xiàn)方式:
方式一:channel.waitForConfirms()普通發(fā)送方確認模式;
public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創(chuàng)建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設(shè)置RabbitMQ的主機IPfactory.setPort(5672);//設(shè)置RabbitMQ的端口號factory.setUsername("root");//設(shè)置訪問用戶名factory.setPassword("root");//設(shè)置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";//創(chuàng)建隊列 ,名字為myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 開啟發(fā)送方確認模式channel.confirmSelect();long time=System.currentTimeMillis();//發(fā)送消息到指定隊列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.waitForConfirms();System.out.println(System.currentTimeMillis()-time);System.out.println("消息發(fā)送成功: "+message);channel.close();connection.close();} }方式二:channel.waitForConfirmsOrDie()批量確認模式;
public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創(chuàng)建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設(shè)置RabbitMQ的主機IPfactory.setPort(5672);//設(shè)置RabbitMQ的端口號factory.setUsername("root");//設(shè)置訪問用戶名factory.setPassword("root");//設(shè)置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";//創(chuàng)建隊列 ,名字為myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 開啟發(fā)送方確認模式channel.confirmSelect();long time=System.currentTimeMillis();//發(fā)送消息到指定隊列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.waitForConfirmsOrDie();System.out.println(System.currentTimeMillis()-time);System.out.println("消息發(fā)送成功: "+message);channel.close();connection.close();} }方式三:channel.addConfirmListener()異步監(jiān)聽發(fā)送方確認模式
public class Send {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創(chuàng)建鏈接工廠對象ConnectionFactory factory=new ConnectionFactory();factory.setHost("192.168.222.128");//設(shè)置RabbitMQ的主機IPfactory.setPort(5672);//設(shè)置RabbitMQ的端口號factory.setUsername("root");//設(shè)置訪問用戶名factory.setPassword("root");//設(shè)置訪問密碼Connection connection=null;//定義鏈接對象Channel channel=null;//定義通道對象connection=factory.newConnection();//實例化鏈接對象channel=connection.createChannel();//實例化通道對象String message ="Hello World!3";//創(chuàng)建隊列 ,名字為myQueuechannel.queueDeclare("myQueue", true, false, false, null);// 開啟發(fā)送方確認模式channel.confirmSelect();long time=System.currentTimeMillis();//發(fā)送消息到指定隊列for(int i=0;i<10000;i++){message="Hello World!"+i;channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));}channel.addConfirmListener(new ConfirmListener() {public void handleAck(long deliveryTag, boolean multiple) throws IOException {System.out.println("未確認消息,標識:" + deliveryTag+"----"+multiple);}public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("已確認消息,標識:"+deliveryTag+" ---多個消息:"+multiple);}});System.out.println(System.currentTimeMillis()-time);System.out.println("消息發(fā)送成功: "+message);channel.close();connection.close();} }5.6 消息的消費者確認模式
為了保證消息從隊列可靠地到達消費者,RabbitMQ提供消息確認機制(message acknowledgment)。消費者在聲明隊列時,可以指定noAck參數(shù),當(dāng)noAck=false時,RabbitMQ會等待消費者顯式發(fā)回ack信號后才從內(nèi)存(和磁盤,如果是持久化消息的話)中移去消息。否則,RabbitMQ會在隊列中消息被消費后立即刪除它。
在Consumer中Confirm模式中分為手動確認和自動確認。
手動確認主要并使用以下方法:
basicAck(): 用于肯定確認,multiple參數(shù)用于多個消息確認。
basicRecover():是路由不成功的消息可以使用recovery重新發(fā)送到隊列中。
basicReject():是接收端告訴服務(wù)器這個消息我拒絕接收,不處理,可以設(shè)置是否放回到隊列中還是丟掉,而且只能一次拒絕一個消息,官網(wǎng)中有明確說明不能批量拒絕消息,為解決批量拒絕消息才有了basicNack。
basicNack():可以一次拒絕N條消息,客戶端可以設(shè)置basicNack方法的multiple參數(shù)為true。
在01-rabbitmq-send-java項目中創(chuàng)建,
com.xxxx.rabbitmq.ack.Send類
在01-rabbitmq-receive-java項目中創(chuàng)建,
com.xxxx.rabbitmq.ack.Receive類
注意:
1、如果開啟了事務(wù)手動提交以后再開始事務(wù),如果事務(wù)執(zhí)行了回滾操作那么即使手動確認了消息那么消息也不會從隊列中移除,除非使用事務(wù)執(zhí)行提交以后才會移除。
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ消息发送和接收的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 面试和学习必备--Java多线程
- 下一篇: loader调用过程