消息队列rabitMq
rabbitmq
MQ全稱為Message Queue,?消息隊列(MQ)是一種應(yīng)用程序對應(yīng)用程序的通信方法。應(yīng)用程序通過讀寫出入隊列的消息(針對應(yīng)用程序的數(shù)據(jù))來通信,而無需專用連接來鏈接它們。消息傳遞指的是程序之間通過在消息中發(fā)送數(shù)據(jù)進行通信,而不是通過直接調(diào)用彼此來通信,直接調(diào)用通常是用于諸如遠程過程調(diào)用的技術(shù)。排隊指的是應(yīng)用程序通過 隊列來通信。隊列的使用除去了接收和發(fā)送應(yīng)用程序同時執(zhí)行的要求。
使用場景
在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節(jié)省了服務(wù)器的請求響應(yīng)時間,從而提高了系統(tǒng)的吞吐量。
含義
RabbitMQ是一個在AMQP基礎(chǔ)上完成的,可復(fù)用的企業(yè)消息系統(tǒng)。他遵循Mozilla Public License開源協(xié)議。
客戶端
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] args) throws.IOException{ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");channel.close();connection.close();} }消費者端
public class RabbitMQRecv { public static void main(String avg[]) throws.IOException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");QueueingConsumer consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, true, consumer);
while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}} }
幾個概念
Exchange:交換機,決定了消息路由規(guī)則; Queue:消息隊列; Channel:進行消息讀寫的通道; Bind:綁定了Queue和Exchange,意即為符合什么樣路由規(guī)則的消息,將會放置入哪一個消息隊列;RabbitMQ的結(jié)構(gòu)圖如下:
?
?
?
幾個概念說明: Broker:簡單來說就是消息隊列服務(wù)器實體。Exchange:消息交換機,它指定消息按什么規(guī)則,路由到哪個隊列。
Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。
Binding:綁定,它的作用就是把exchange和queue按照路由規(guī)則綁定起來。
Routing Key:路由關(guān)鍵字,exchange根據(jù)這個關(guān)鍵字進行消息投遞。
vhost:虛擬主機,一個broker里可以開設(shè)多個vhost,用作不同用戶的權(quán)限分離。
producer:消息生產(chǎn)者,就是投遞消息的程序。
consumer:消息消費者,就是接受消息的程序。
channel:消息通道,在客戶端的每個連接里,可建立多個channel,每個channel代表一個會話任務(wù)。 消息隊列的使用過程大概如下: (1)客戶端連接到消息隊列服務(wù)器,打開一個channel。
(2)客戶端聲明一個exchange,并設(shè)置相關(guān)屬性。
(3)客戶端聲明一個queue,并設(shè)置相關(guān)屬性。
(4)客戶端使用routing key,在exchange和queue之間建立好綁定關(guān)系。
(5)客戶端投遞消息到exchange。 exchange接收到消息后,就根據(jù)消息的key和已經(jīng)設(shè)置的binding,進行消息路由,將消息投遞到一個或多個隊列里。 exchange也有幾個類型,完全根據(jù)key進行投遞的叫做Direct交換機,例如,綁定時設(shè)置了routing key為”abc”,那么客戶端提交的消息,只有設(shè)置了key為”abc”的才會投遞到隊列。對key進行模式匹配后進行投遞的叫做Topic交換機,符號”#”匹配一個或多個詞,符號”*”匹配正好一個詞。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。還有一種不需要key的,叫做Fanout交換機,它采取廣播模式,一個消息進來時,投遞到與該交換機綁定的所有隊列。 RabbitMQ支持消息的持久化,也就是數(shù)據(jù)寫在磁盤上,為了數(shù)據(jù)安全考慮,我想大多數(shù)用戶都會選擇持久化。消息隊列持久化包括3個部分:
(1)exchange持久化,在聲明時指定durable => 1
(2)queue持久化,在聲明時指定durable => 1
(3)消息持久化,在投遞時指定delivery_mode => 2(1是非持久化) 如果exchange和queue都是持久化的,那么它們之間的binding也是持久化的。如果exchange和queue兩者之間有一個持久化,一個非持久化,就不允許建立綁定。
什么是MQ?
?????? MQ全稱為Message Queue,?消息隊列(MQ)是一種應(yīng)用程序?qū)?yīng)用程序的通信方法。MQ是消費-生產(chǎn)者模型的一個典型的代表,一端往消息隊列中不斷寫入消息,而另一端則可以讀取隊列中的消息。
????? RabbitMQ是MQ的一種。下面詳細介紹一下RabbitMQ的基本概念。
1、隊列、生產(chǎn)者、消費者
?? ?? 隊列是RabbitMQ的內(nèi)部對象,用于存儲消息。生產(chǎn)者(下圖中的P)生產(chǎn)消息并投遞到隊列中,消費者(下圖中的C)可以從隊列中獲取消息并消費。
?????
????? 多個消費者可以訂閱同一個隊列,這時隊列中的消息會被平均分攤給多個消費者進行處理,而不是每個消費者都收到所有的消息并處理。
?????
2、Exchange、Binding
????? 剛才我們看到生產(chǎn)者將消息投遞到隊列中,實際上這在RabbitMQ中這種事情永遠都不會發(fā)生。實際的情況是,生產(chǎn)者將消息發(fā)送到Exchange(交換器,下圖中的X),再通過Binding將Exchange與Queue關(guān)聯(lián)起來。
?????3、Exchange Type、Bingding key、routing key
????? 在綁定(Binding)Exchange與Queue的同時,一般會指定一個binding key。在綁定多個Queue到同一個Exchange的時候,這些Binding允許使用相同的binding key。
????? 生產(chǎn)者在將消息發(fā)送給Exchange的時候,一般會指定一個routing key,來指定這個消息的路由規(guī)則,生產(chǎn)者就可以在發(fā)送消息給Exchange時,通過指定routing key來決定消息流向哪里。
????? RabbitMQ常用的Exchange Type有三種:fanout、direct、topic。
????? fanout:把所有發(fā)送到該Exchange的消息投遞到所有與它綁定的隊列中。
????? direct:把消息投遞到那些binding key與routing key完全匹配的隊列中。
????? topic:將消息路由到binding key與routing key模式匹配的隊列中。
????? 附上一張RabbitMQ的結(jié)構(gòu)圖:
?????
????
最后來具體解析一下幾個問題:
1、可以自動創(chuàng)建隊列,也可以手動創(chuàng)建隊列,如果自動創(chuàng)建隊列,那么是誰負責創(chuàng)建隊列呢?是生產(chǎn)者?還是消費者??
????? 如果隊列不存在,當然消費者不會收到任何的消息。但是如果隊列不存在,那么生產(chǎn)者發(fā)送的消息就會丟失。所以,為了數(shù)據(jù)不丟失,消費者和生產(chǎn)者都可以創(chuàng)建隊列。那么如果創(chuàng)建一個已經(jīng)存在的隊列呢?那么不會有任何的影響。需要注意的是沒有任何的影響,也就是說第二次創(chuàng)建如果參數(shù)和第一次不一樣,那么該操作雖然成功,但是隊列屬性并不會改變。
??????隊列對于負載均衡的處理是完美的。對于多個消費者來說,RabbitMQ使用輪詢的方式均衡的發(fā)送給不同的消費者。
2、RabbitMQ的消息確認機制
????? 默認情況下,如果消息已經(jīng)被某個消費者正確的接收到了,那么該消息就會被從隊列中移除。當然也可以讓同一個消息發(fā)送到很多的消費者。
?? ?? 如果一個隊列沒有消費者,那么,如果這個隊列有數(shù)據(jù)到達,那么這個數(shù)據(jù)會被緩存,不會被丟棄。當有消費者時,這個數(shù)據(jù)會被立即發(fā)送到這個消費者,這個數(shù)據(jù)被消費者正確收到時,這個數(shù)據(jù)就被從隊列中刪除。
???? 那么什么是正確收到呢?通過ack。每個消息都要被acknowledged(確認,ack)。我們可以顯示的在程序中去ack,也可以自動的ack。如果有數(shù)據(jù)沒有被ack,那么:
???? RabbitMQ Server會把這個信息發(fā)送到下一個消費者。
?? ? 如果這個app有bug,忘記了ack,那么RabbitMQServer不會再發(fā)送數(shù)據(jù)給它,因為Server認為這個消費者處理能力有限。
??? 而且ack的機制可以起到限流的作用(Benefitto throttling):在消費者處理完成數(shù)據(jù)后發(fā)送ack,甚至在額外的延時后發(fā)送ack,將有效的均衡消費者的負載。
?
?二:代碼示例
2.1:首先引入rabbitMQ jar包
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>2.2:創(chuàng)建消費者Producer
/*** 消息生成者*/ public class Producer {public final static String QUEUE_NAME="rabbitMQ.test";public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ相關(guān)信息factory.setHost("localhost");//factory.setUsername("lp");//factory.setPassword("");// factory.setPort(2088);//創(chuàng)建一個新的連接Connection connection = factory.newConnection();//創(chuàng)建一個通道Channel channel = connection.createChannel();// 聲明一個隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello RabbitMQ";//發(fā)送消息到隊列中channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));System.out.println("Producer Send +'" + message + "'");//關(guān)閉通道和連接 channel.close();connection.close();} }注1:queueDeclare第一個參數(shù)表示隊列名稱、第二個參數(shù)為是否持久化(true表示是,隊列將在服務(wù)器重啟時生存)、第三個參數(shù)為是否是獨占隊列(創(chuàng)建者可以使用的私有隊列,斷開后自動刪除)、第四個參數(shù)為當所有消費者客戶端連接斷開時是否自動刪除隊列、第五個參數(shù)為隊列的其他參數(shù)
注2:basicPublish第一個參數(shù)為交換機名稱、第二個參數(shù)為隊列映射的路由key、第三個參數(shù)為消息的其他屬性、第四個參數(shù)為發(fā)送信息的主體
2.3:創(chuàng)建消費者
?
public class Customer {private final static String QUEUE_NAME = "rabbitMQ.test";public static void main(String[] args) throws IOException, TimeoutException {// 創(chuàng)建連接工廠ConnectionFactory factory = new ConnectionFactory();//設(shè)置RabbitMQ地址factory.setHost("localhost");//創(chuàng)建一個新的連接Connection connection = factory.newConnection();//創(chuàng)建一個通道Channel channel = connection.createChannel();//聲明要關(guān)注的隊列channel.queueDeclare(QUEUE_NAME, false, false, true, null);System.out.println("Customer Waiting Received messages");//DefaultConsumer類實現(xiàn)了Consumer接口,通過傳入一個頻道,// 告訴服務(wù)器我們需要那個頻道的消息,如果頻道中有消息,就會執(zhí)行回調(diào)函數(shù)handleDeliveryConsumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body)throws IOException {String message = new String(body, "UTF-8");System.out.println("Customer Received '" + message + "'");}};//自動回復(fù)隊列應(yīng)答 -- RabbitMQ中的消息確認機制channel.basicConsume(QUEUE_NAME, true, consumer);}前面代碼我們可以看出和生成者一樣的,后面的是獲取生產(chǎn)者發(fā)送的信息,其中envelope主要存放生產(chǎn)者相關(guān)信息(比如交換機、路由key等)body是消息實體。
2.4:運行結(jié)果
生產(chǎn)者:
?
消費者:
?三:實現(xiàn)任務(wù)分發(fā)
工作隊列
一個隊列的優(yōu)點就是很容易處理并行化的工作能力,但是如果我們積累了大量的工作,我們就需要更多的工作者來處理,這里就要采用分布機制了。
我們新創(chuàng)建一個生產(chǎn)者NewTask
public class NewTask {private static final String TASK_QUEUE_NAME="task_queue";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory=new ConnectionFactory();factory.setHost("localhost");Connection connection=factory.newConnection();Channel channel=connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME,true,false,false,null);//分發(fā)信息for (int i=0;i<10;i++){String message="Hello RabbitMQ"+i;channel.basicPublish("",TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());System.out.println("NewTask send '"+message+"'");}channel.close();connection.close();} }然后創(chuàng)建2個工作者Work1和Work2代碼一樣
public class Work1 {private static final String TASK_QUEUE_NAME = "task_queue";public static void main(String[] args) throws IOException, TimeoutException {final ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();final Channel channel = connection.createChannel();channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println("Worker1 Waiting for messages");//每次從隊列獲取的數(shù)量channel.basicQos(1);final Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body) throws IOException {String message = new String(body, "UTF-8");System.out.println("Worker1 Received '" + message + "'");try {throw new Exception();//doWork(message);}catch (Exception e){channel.abort();}finally {System.out.println("Worker1 Done");channel.basicAck(envelope.getDeliveryTag(),false);}}};boolean autoAck=false;//消息消費完成確認channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);}private static void doWork(String task) {try {Thread.sleep(1000); // 暫停1秒鐘} catch (InterruptedException _ignored) {Thread.currentThread().interrupt();}} }注:channel.basicQos(1);保證一次只分發(fā)一個 。autoAck是否自動回復(fù),如果為true的話,每次生產(chǎn)者只要發(fā)送信息就會從內(nèi)存中刪除,那么如果消費者程序異常退出,那么就無法獲取數(shù)據(jù),我們當然是不希望出現(xiàn)這樣的情況,所以才去手動回復(fù),每當消費者收到并處理信息然后在通知生成者。最后從隊列中刪除這條信息。如果消費者異常退出,如果還有其他消費者,那么就會把隊列中的消息發(fā)送給其他消費者,如果沒有,等消費者啟動時候再次發(fā)送。
?
?
參考:https://www.cnblogs.com/LipeiNet/p/5977028.html
轉(zhuǎn)載于:https://www.cnblogs.com/UncleWang001/p/9734651.html
總結(jié)
以上是生活随笔為你收集整理的消息队列rabitMq的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: T1330最少步数(#Ⅱ- 8)(广度优
- 下一篇: 转MQTT SERVER 性能测试报告