rabbitmq java文档_《RabbitMQ官方文档》订阅与发布
之前的教程中,我們創(chuàng)建了一個(gè)工作隊(duì)列。在一個(gè)工作隊(duì)列背后的假設(shè)是將每個(gè)任務(wù)都準(zhǔn)確地交付給一個(gè)工作人員。在這個(gè)環(huán)節(jié)我們要做些完全不同的事情—我們將要把一個(gè)消息傳遞給多個(gè)消費(fèi)者。這種模式被稱為“發(fā)布/訂閱”。
為了闡述這種模式,我們打算構(gòu)建一個(gè)簡(jiǎn)單的日志系統(tǒng)。它由兩個(gè)程序組成—第一個(gè)發(fā)出日志消息,第二個(gè)接收消息并打印出來(lái)。
在我們的日志系統(tǒng)中,每次運(yùn)行接收者程序的副本都將收到消息。這使我們能夠運(yùn)行一個(gè)接收者程序,將日志導(dǎo)向磁盤;同時(shí)我們可以運(yùn)行另一個(gè)接收者程序查看屏幕上的日志。
實(shí)際上,已發(fā)布的日志消息將被廣播到所有接收者。
交換
在教程的前面部分中,我們發(fā)送和接受消息都來(lái)自一個(gè)隊(duì)列,現(xiàn)在是時(shí)候引進(jìn)一個(gè)完整的Rabbit消息模型了。
讓我們快速地回顧一下在之前的教程里介紹的內(nèi)容:
生產(chǎn)者是發(fā)送消息的用戶程序。
隊(duì)列是儲(chǔ)存消息的緩存。
消費(fèi)者是接收消息的用戶程序。
在Rabbit消息模型中核心的思想是,生產(chǎn)者從不直接向一個(gè)隊(duì)列發(fā)送任何消息。事實(shí)上,通常生產(chǎn)者甚至不知道將一個(gè)消息是否傳遞到了某一隊(duì)列。
相反的,生產(chǎn)者只能向交換器(exchange)發(fā)送消息。交換器是一個(gè)非常簡(jiǎn)單的東西。它在一端從生產(chǎn)者接收消息,在另一端將消息發(fā)布到隊(duì)列。交換器必須準(zhǔn)確地知道它收到的消息要做什么。它應(yīng)該被添加到一個(gè)特殊的隊(duì)列?它應(yīng)該被添加到多個(gè)隊(duì)列?或者它應(yīng)該被廢棄?這些規(guī)則由交換類型定義。
有許多交換類型是可用的:direct,tpic,headers和fanout。我們來(lái)關(guān)注最后一種–fanout,我們來(lái)創(chuàng)建一個(gè)這種類型的交換器命名為logs:
channel.exchangeDeclare("logs", "fanout");
這個(gè)fanout交換器非常簡(jiǎn)單。就像你可以從它的名字中猜到的,它僅僅是將它收到的所有消息傳遞到它所知的隊(duì)列中去。這正是我們的日志系統(tǒng)所需要的。
列出交換器
你可以用有用的rabbitmqctl列出交換器:
sudo rabbitmqctl list_exchanges
在列表里會(huì)有一些名字為 amq.*的默認(rèn)(未命名的),但是現(xiàn)在你可能不需要使用它們。
匿名交換器
在教程的前面部分中,我們還不知道交換器,但仍能夠?qū)⑾l(fā)送到對(duì)了,這是因?yàn)槲覀兛赡苡昧艘粋€(gè)默認(rèn)的交換器,我們用空字符串(””)來(lái)識(shí)別它。
想想我們之前是怎樣發(fā)布一條消息的:
channel.basicPublish("", "hello", null, message.getBytes());
第一個(gè)參數(shù)是交換器的名字。空字符串指明是默認(rèn)或匿名的交換器:消息通過(guò)“routingKey”選擇指定的名稱路由到隊(duì)列,如果它存在的話。
現(xiàn)在我們可以發(fā)布到我們命名的交換器了。
channel.basicPublish( "logs", "", null, message.getBytes());
臨時(shí)隊(duì)列
也許你記得我們之前使用的隊(duì)列都有一個(gè)指定的名字(記得hello和task——queue嗎?)。能夠?yàn)橐粋€(gè)隊(duì)列指定名字對(duì)于我們來(lái)說(shuō)是很重要的—我們需要將工作者指向相同的隊(duì)列。當(dāng)你想要在生產(chǎn)者和消費(fèi)者之間同用一個(gè)隊(duì)列的時(shí)候,給一個(gè)隊(duì)列賦予名字是很重要的。
但這不適用于我們的日志系統(tǒng)。我們想要監(jiān)聽(tīng)所有的消息,而不只是其中的一部分。我們也僅只對(duì)當(dāng)前活動(dòng)的消息感興趣而不是舊的。為了解決這個(gè)問(wèn)題我們需要兩個(gè)東西。
首先,每當(dāng)我們連接Rabbit我們需要一個(gè)刷新過(guò)的空隊(duì)列。我們可以創(chuàng)建一個(gè)隊(duì)列并使用一個(gè)隨機(jī)的名字,甚至更好的做法是讓服務(wù)器為我們選擇一個(gè)隨機(jī)的名字:
其次,一旦我們使消費(fèi)者從隊(duì)列斷開(kāi)后,隊(duì)列應(yīng)該自動(dòng)被刪掉。
在Java客戶端,當(dāng)我們使用一個(gè)不帶參數(shù)的queueDeclare()時(shí),我們使用自動(dòng)生成的名字創(chuàng)建了一個(gè),不持久的,專用的,自動(dòng)刪除的隊(duì)列。
String queueName = channel.queueDeclare().getQueue();
此時(shí)queueName包含一個(gè)隨機(jī)的隊(duì)列名字。例如它可能是amq.gen-JzTY20BRgKO-HjmUJj0wLg.
綁定
我們已經(jīng)創(chuàng)建了一個(gè)fanout交換器和一個(gè)隊(duì)列。現(xiàn)在我們需要讓交換器將消息發(fā)送到我們的隊(duì)列里。交換器和一個(gè)隊(duì)列之間的關(guān)系稱為一個(gè)綁定。
channel.queueBind(queueName, "logs", "");
現(xiàn)在開(kāi)始logs交換器會(huì)添加消息到我們的隊(duì)列。
列出綁定
你可以列出所有存在的綁定,你可以猜到這條命令:
rabbitmqctl list_bindings
將他們聯(lián)系在一起
用于發(fā)出消息的生產(chǎn)者程序,看起來(lái)和之前的教程里沒(méi)有太大差異。最重要的改變是現(xiàn)在我們要發(fā)布消息到我們的logs交換器而不是匿名的。我們需要在發(fā)送的時(shí)候提供一個(gè)routingKey,但對(duì)于fanout交換器來(lái)說(shuō),這個(gè)值會(huì)被忽略掉。
以下是EmitLog.java程序的代碼:
import java.io.IOException;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv)
throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//...
}
正如你所見(jiàn),在建立一個(gè)連接后我們聲明了一個(gè)交換器,這個(gè)步驟是必須的因?yàn)榘l(fā)布到一個(gè)不存在的交換器是禁止的。
如果沒(méi)有隊(duì)列綁定到交換器上消息將丟失,但這對(duì)于我們來(lái)說(shuō)沒(méi)有影響;如果沒(méi)有消費(fèi)者在監(jiān)聽(tīng),我們可以安全地丟棄消息。
這是ReceiveLogs.java:的代碼:
import com.rabbitmq.client.*;
import java.io.IOException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(queueName, true, consumer);
}
}
像我們之前那樣編譯它。
javac -cp $CP EmitLog.java ReceiveLogs.java
如果你想要把日志保存到文件,只需打開(kāi)控制臺(tái)并輸入:
java -cp $CP ReceiveLogs > logs_from_rabbit.log
如果你想要在屏幕里看到日志,spawn一個(gè)新的終端并且運(yùn)行:
java -cp?$CP ReceiveLogs
當(dāng)然,要發(fā)出日志類型:
java -cp $CP EmitLog
使用rabbitmqctl list_bindings 你可以驗(yàn)證代碼實(shí)際上是根據(jù)需要?jiǎng)?chuàng)建綁定和隊(duì)列。在兩個(gè)ReceiveLogs.java程序中你應(yīng)該可以看到這些東西:
sudo rabbitmqctl list_bindings
# => Listing bindings …
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => …done.
結(jié)果解釋很直接:數(shù)據(jù)從交換器logs而來(lái)去到兩個(gè)系統(tǒng)分配名字的隊(duì)列。這正是我們想要的。
要了解如何監(jiān)聽(tīng)消息的一個(gè)子集,我們繼續(xù)閱讀教程4。
總結(jié)
以上是生活随笔為你收集整理的rabbitmq java文档_《RabbitMQ官方文档》订阅与发布的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: java只有值传递_面试官:为什么 Ja
- 下一篇: 官媒发文质疑增高针滥用,长春高新股价跌停