RabbitMQ 快速入门
1.簡介
RabbitMQ是一個由erlang開發(fā)的AMQP(Advanced Message Queue protocol)的開源實現(xiàn)。AMQP高級消息隊列,說白了就是一個開源的消息中間件。它能解決不同組件、模塊、系統(tǒng)間消息通信。
2.系統(tǒng)架構
RabbitMQ Server: 也叫broker server,存儲消息的地方
Producer:數(shù)據(jù)的發(fā)送方
Consumer:數(shù)據(jù)的接收方
Connection: 就是一個TCP的連接。Producer和Consumer都是通過TCP連接到RabbitMQ Server的。以后我們可以看到,程序的起始處就是建立這個TCP連接。
Channels: 虛擬連接。它建立在上述的TCP連接中。數(shù)據(jù)流動都是在Channel中進行的。也就是說,一般情況是程序起始建立TCP連接,第二步就是建立這個Channel。
?那么,為什么使用Channel,而不是直接使用TCP連接?
對于OS來說,建立和關閉TCP連接是有代價的,頻繁的建立關閉TCP連接對于系統(tǒng)的性能有很大的影響,而且TCP的連接數(shù)也有限制,這也限制了系統(tǒng)處理高并發(fā)的能力。但是,在TCP連接中建立Channel是沒有上述代價的。對于Producer或者Consumer來說,可以并發(fā)的使用多個Channel進行Publish或者Receive。
Message:消息。服務器和應用程序之間傳遞的數(shù)據(jù),本質(zhì)上就是一段數(shù)據(jù),由Properties和Body組成。
Exchange:交換機。接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊列。
Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key。
Routing key:一個虛擬地址,虛擬機可用它來確定如何路由一個特定消息。
Queue:也稱為Message Queue,消息隊列,保存消息并將它們轉(zhuǎn)發(fā)給消費者。
Virtual Host:其實是一個虛擬概念。類似于權限控制組,一個Virtual Host里面可以有若干個Exchange和Queue,可以用來隔離Exchange和Queue。,同一個Virtual Host里面不能有相同名稱的Exchange和Queue。但是權限控制的最小粒度是Virtual Host。(下面會講到)
3、圖解
? 1. 信息生產(chǎn)者將消息(message)發(fā)送到exchange
2. exchange接受消息之后,負責將其路由到具體的隊列中
3. Bindings負責連接exchange和隊列(queue)
4. 消息到達隊列(queue),然后等待被消息接收端處理
5. 消息接收端處理消息
Exchanges有三種類型:direct,?fanout,topic。 每個實現(xiàn)了不同的路由算法(routing algorithm)。
Direct exchange: 如果 routing key 匹配, 那么Message就會被傳遞到相應的queue中。其實在queue創(chuàng)建時,它會自動的以queue的名字作為routing key來綁定那個exchange。
Fanout exchange: 會向響應的queue廣播。
Topic exchange: 對key進行模式匹配,比如ab*可以傳遞到所有ab*的queue。
Consumer和Procuder都可以通過 queue.declare 創(chuàng)建queue。如果queue已經(jīng)存在,也不會報錯。如果沒有,要么發(fā)送不了消息,要么取不到消息,所以還是都創(chuàng)建吧。
Bindings就是將通過Exchange將queue和routing keys綁定。
總結:生產(chǎn)者將消息發(fā)送到Exchange交換機的,不是發(fā)送到Queue上的,生產(chǎn)者不知道消息是誰消費,有哪些消費者消費。Exchange根據(jù)一定的路由規(guī)則將消息轉(zhuǎn)發(fā)到Queue。
消費者是監(jiān)聽隊列的,不知道是哪個生產(chǎn)者發(fā)送的
4.具體代碼
添加maven 依賴
<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>3.6.5</version></dependency>消費者
package com.flying.rabbitmq.quickstart;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Consumer {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {//創(chuàng)建connectionFactory 并進行配置ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.通過連接工廠創(chuàng)建連接Connection connection=connectionFactory.newConnection();//3.通過connection 創(chuàng)建一個channel 通道Channel channel=connection.createChannel();//4 聲明(創(chuàng)建)一個隊列String queueName = "test001";channel.queueDeclare(queueName,true,false,false,null);//5.聲明一個消費者QueueingConsumer queueingConsumer=new QueueingConsumer(channel);//6.設置Channelchannel.basicConsume(queueName,true,queueingConsumer);while (true){//7.獲取消息QueueingConsumer.Delivery delivery=queueingConsumer.nextDelivery();String msg=new String(delivery.getBody());System.err.println("消費端"+msg);}} }?
生產(chǎn)端
package com.flying.rabbitmq.quickstart;import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;import java.io.IOException; import java.util.concurrent.TimeoutException;public class Producer {public static void main(String[] args) throws IOException, TimeoutException {//創(chuàng)建connectionFactory 并進行配置ConnectionFactory connectionFactory=new ConnectionFactory();connectionFactory.setHost("127.0.0.1");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2.通過連接工廠創(chuàng)建連接Connection connection=connectionFactory.newConnection();//3.通過connection 創(chuàng)建一個channel 通道Channel channel=connection.createChannel();//4.通過Channel 發(fā)送數(shù)據(jù)for(int i=0; i < 5;i++){String msg="Hello rabbitmq";channel.basicPublish("","test001",null,msg.getBytes());}//5 記得要關閉相關的連接 channel.close();connectionFactory.clone();} }運行結果
?
轉(zhuǎn)載于:https://www.cnblogs.com/lflying/p/11107150.html
總結
以上是生活随笔為你收集整理的RabbitMQ 快速入门的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: mysql运维相关
- 下一篇: [转]iis部署php项目