2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
一、起航
? ? ? ?本章節(jié),柯南君將從幾個(gè)層面,用官網(wǎng)例子講解一下RabbitMQ的實(shí)操經(jīng)典程序案例,讓大家重新回到經(jīng)典“Hello world!”(The simplest thing that does something )時(shí)代,RabbitMQ 支持N多種客戶端(client),這里無法一一講解,暫定java client,有時(shí)間的情況下,在彌補(bǔ)一下。
事先,先普及一下圖標(biāo)(我們會(huì)在下面的事例中,會(huì)大量用到,所以先普及一下,便于識(shí)別,最終更好理解事例的含義)
1、圖標(biāo)概念
① producting(生產(chǎn)者):在程序中 發(fā)送消息的一端,我們暫且稱之為 生產(chǎn)者,在這里用“p”表示
②?queue(隊(duì)列):隊(duì)列是一個(gè)郵箱的名字。它住在RabbitMQ。盡管消息流經(jīng)RabbitMQ和您的應(yīng)用程序,他們只可以存儲(chǔ)在一個(gè)隊(duì)列中。隊(duì)列是不受任何限制,它可以儲(chǔ)存盡可能多的信息(按你興趣來了),它本質(zhì)上是一個(gè)無限緩沖區(qū)。許多生產(chǎn)商可以發(fā)送消息到一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試接收數(shù)據(jù)從一個(gè)隊(duì)列。
③?consuming(消費(fèi)者):消費(fèi)者和生產(chǎn)者是對(duì)應(yīng)的,較為相似的意思;在這里,我用“C”表示
2、The Java client library
RabbitMQ 中 AMQP這是一個(gè)開放的、通用的協(xié)議消息。有許多客戶AMQP在許多不同的語言。我們將使用提供的Java客戶機(jī)RabbitMQ。 ?
我們需要下載(Download) client library package,并要核實(shí)每個(gè)jar包,解壓到相應(yīng)位置,如下圖所示:
第一步:點(diǎn)擊?http://www.rabbitmq.com/java-client.html,然后找到相應(yīng)的lib下載位置
第二步:選擇合適的下載,比如我下載了zip包,如圖所示:
第三步:Unzip it(解壓它) 到你的working directory(工作目錄)中 and grab (并且獲得)你的jar包文件
$ unzip rabbitmq-java-client-bin-*.zip $ cp rabbitmq-java-client-bin-*/*.jar ./
二、程序案例
1)?"Hello World"?
在這部分教程中我們將用Java寫兩個(gè)程序,發(fā)送一個(gè)消息的生產(chǎn)者,消費(fèi)者接收信息并打印出來。我們會(huì)掩蓋一些細(xì)節(jié)的Java API,集中在這個(gè)非常簡(jiǎn)單的東西開始。這是一個(gè)“Hello World”的消息。在下面的圖中,“P”是我們的生產(chǎn)和“C”是我們的消費(fèi)者。中間的框是一個(gè)隊(duì)列,消息緩沖區(qū)RabbitMQ保持代表消費(fèi)者。
?① sending (發(fā)送)
首先?讓The sender(消息發(fā)送者)發(fā)送消息并且讓我們的receiver (消息接收者)接收消息,The sender(消息發(fā)送者)將會(huì)connect to(連接)RabbitMQ,發(fā)送一個(gè)single message(單一的信息),然后exit(退出)。
- 在send.java 中,我們需要import一些class ,如下所示:
import com.rabbitmq.client.ConnectionFactory
;
import com.rabbitmq.client.Connection
;
import com.rabbitmq.client.Channel
; - set up(設(shè)置)類和queue的name
public
class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv)throws java.io.IOException {...}
} - then 我們create 一個(gè)connection (連接)到server(服務(wù)端)
onnectionFactory
factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel(); 備注 : - ?這個(gè)connection 是抽象的socket connection鏈接;
- ?負(fù)責(zé)協(xié)議版本(protocol version negotigation)和身份認(rèn)證(authentication?);
- 我們?cè)诒镜貦C(jī)器上連接到一個(gè)代理即 localhost ,如果我們想要連接到代理不同機(jī)器上我們簡(jiǎn)單的指定其名稱或者IP地址即可;
接下來,我們創(chuàng)建一個(gè)channel(通道),這個(gè)通道匯集了大多數(shù)的API服務(wù)! 為了發(fā)送,我們必須先聲明一個(gè)為我們發(fā)送queue(隊(duì)列),然后,往queue里發(fā)送一個(gè)message channel
.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'"); 消息內(nèi)容是一個(gè)字節(jié)數(shù)組,所以你可以編碼任何你喜歡的。最后,我們關(guān)閉通道和連接; channel
.close();connection.close(); 問題 : ?如果 sending doesn‘t work! 我們將怎么辦?why? 如果這是你第一次使用RabbitMQ并且你看不到“發(fā)送的”消息,那么你可能抓耳撓 腮沒有足夠的空閑磁盤空間(默認(rèn)情況下它需要至少1 gb免費(fèi)),因此拒絕接受消息。檢查代理日志文件確認(rèn),如果有必要減少限制。配置文件的文檔將向您展示如何設(shè)置disk_free_limit。 接下來的是send.java所有源代碼:
[java]? view plain copy print ?
import?com.rabbitmq.client.ConnectionFactory;?? import?com.rabbitmq.client.Connection;?? import?com.rabbitmq.client.Channel;?? ?? public?class?Send?{?? ?????? ??private?final?static?String?QUEUE_NAME?=?"hello";?? ?? ??public?static?void?main(String[]?argv)?throws?Exception?{?? ???????????????? ????ConnectionFactory?factory?=?new?ConnectionFactory();?? ????factory.setHost("localhost");?? ????Connection?connection?=?factory.newConnection();?? ????Channel?channel?=?connection.createChannel();?? ?? ????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);?? ????String?message?=?"Hello?World!";?? ????channel.basicPublish("",?QUEUE_NAME,?null,?message.getBytes());?? ????System.out.println("?[x]?Sent?'"?+?message?+?"'");?? ?????? ????channel.close();?? ????connection.close();?? ??}?? }?? ?? ?② Receiving (接收)
這就是我們的發(fā)送者。我們的接收器是將消息從RabbitMQ,所以不像發(fā)送方發(fā)布一個(gè)消息,我們將保持運(yùn)行監(jiān)聽消息并打印出來
- The code (in?Recv.java) has almost the same imports as?Send:
import com.rabbitmq.client.ConnectionFactory
;
import com.rabbitmq.client.Connection
;
import com.rabbitmq.client.Channel
;
import com.rabbitmq.client.QueueingConsumer
; 額外的QueueingConsumer是一個(gè)類,我們將使用緩沖消息推到我們的服務(wù)器。設(shè)置發(fā)送者一樣,我們打開一個(gè)連接和一個(gè)通道,并宣布我們將使用的隊(duì)列。注意這與隊(duì)列,發(fā)送發(fā)布。 public
class Recv {private final static String QUEUE_NAME = "hello";public static void main(String[] argv)throws java.io.IOException,java.lang.InterruptedException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");...}
} 注意,我們?cè)谶@里聲明隊(duì)列。因?yàn)槲覀兛赡軙?huì)在發(fā)送方之前
開始啟動(dòng)接收方,我們要確保隊(duì)列存在之前我們嘗試使用消息。我們要告訴服務(wù)器提供我們從隊(duì)列的消息。因?yàn)樗鼘惒较?我們提供一個(gè)回調(diào)對(duì)象的形式,將緩沖的消息,直到我們準(zhǔn)備使用它們。 QueueingConsumer要做什么呢? QueueingConsumer
consumer = new QueueingConsumer(channel);channel.basicConsume(QUEUE_NAME, true, consumer);while (true) {QueueingConsumer.Delivery delivery = consumer.nextDelivery();String message = new String(delivery.getBody());System.out.println(" [x] Received '" + message + "'");}
QueueingConsumer.nextDelivery()塊,直到另一個(gè)來自服務(wù)器的消息交付。??
下面是Recv.java 源代碼:
[java]? view plain copy print ?
import?com.rabbitmq.client.ConnectionFactory;?? import?com.rabbitmq.client.Connection;?? import?com.rabbitmq.client.Channel;?? import?com.rabbitmq.client.QueueingConsumer;?? ?? public?class?Recv?{?? ?????? ????private?final?static?String?QUEUE_NAME?=?"hello";?? ?? ????public?static?void?main(String[]?argv)?throws?Exception?{?? ?? ????ConnectionFactory?factory?=?new?ConnectionFactory();?? ????factory.setHost("localhost");?? ????Connection?connection?=?factory.newConnection();?? ????Channel?channel?=?connection.createChannel();?? ?? ????channel.queueDeclare(QUEUE_NAME,?false,?false,?false,?null);?? ????System.out.println("?[*]?Waiting?for?messages.?To?exit?press?CTRL+C");?? ?????? ????QueueingConsumer?consumer?=?new?QueueingConsumer(channel);?? ????channel.basicConsume(QUEUE_NAME,?true,?consumer);?? ?????? ????while?(true)?{?? ??????QueueingConsumer.Delivery?delivery?=?consumer.nextDelivery();?? ??????String?message?=?new?String(delivery.getBody());?? ??????System.out.println("?[x]?Received?'"?+?message?+?"'");?? ????}?? ??}?? } ?
轉(zhuǎn)載于:https://my.oschina.net/zhanghaiyang/blog/599484
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ学习总结(3)——入门实例教程详解的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網(wǎng)站內(nèi)容還不錯(cuò),歡迎將生活随笔推薦給好友。