RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)
2019獨角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
發(fā)布/訂閱
在上篇教程中,我們搭建了一個工作隊列。每個任務(wù)之分發(fā)給一個工作者(worker)。在本篇教程中,我們要做的之前完全不一樣——分發(fā)一個消息給多個消費者(consumers)。這種模式被稱為“發(fā)布/訂閱”。
為了描述這種模式,我們將會構(gòu)建一個簡單的日志系統(tǒng)。它包括兩個程序——第一個程序負責(zé)發(fā)送日志消息,第二個程序負責(zé)獲取消息并輸出內(nèi)容。
在我們的這個日志系統(tǒng)中,所有正在運行的接收方程序都會接受消息。我們用其中一個接收者(receiver)把日志寫入硬盤中,另外一個接受者(receiver)把日志輸出到屏幕上。
最終,日志消息被廣播給所有的接受者(receivers)。
交換器(Exchanges)
前面的教程,我們發(fā)送消息到隊列并從中取出消息?,F(xiàn)在是時候介紹RabbitMq中完整的消息模型了。
讓我們簡單的概括一下之前的教程:
- 發(fā)布者(producer)是發(fā)布消息的應(yīng)用程序。
- 隊列(queue)用于消息存儲的緩沖。
- 消費者(consumer)是接收消息的應(yīng)用程序。
RabbitMQ消息模型的核心理念是:發(fā)布者(producer)不會直接發(fā)送任何消息給隊列。事實上,發(fā)布者(producer)甚至不知道消息是否已經(jīng)被投遞到隊列。
發(fā)布者(producer)只需要把消息發(fā)送給一個交換器(exchange)。交換器非常簡單,它一邊從發(fā)布者方接收消息,一邊把消息推入隊列。交換器必須知道如何處理它接收到的消息,是應(yīng)該推送到指定的隊列還是是多個隊列,或者是直接忽略消息。這些規(guī)則是通過exchange type來定義的。
有幾個可供選擇的交換器類型:AMQPEXTYPEDIRECT, AMQPEXTYPEFANOUT,AMQPEXTYPEHEADER or AMQPEXTYPETOPIC。我們在這里主要說明AMQPEXTYPE_FANOUT。先創(chuàng)建一個fanout類型的交換器,命名為logs:
$exchange->setName('logs'); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare();fanout交換器很簡單,你可能從名字上就能猜測出來,它把消息發(fā)送給它所知道的所有隊列。這正是我們的日志系統(tǒng)所需要的。
交換器列表
rabbitmqctl能夠列出服務(wù)器上所有的交換器:
$ sudo rabbitmqctl list_exchanges Listing exchanges ... logs fanout amq.direct direct amq.topic topic amq.fanout fanout amq.headers headers ...done.
這個列表中有一些叫做amq.*的交換器。這些都是默認創(chuàng)建的,不過這時候你還不需要使用他們。
匿名的交換器
前面的教程中我們對交換器一無所知,但仍然能夠發(fā)送消息到隊列中。因為我們使用了命名為空字符串(“”)默認的交換器。
回想我們之前是如何發(fā)布一則消息:
``` $exchange->publish($message, $routeKey);
```
exchange參數(shù)就是交換器的名稱??兆址砟J或者匿名交換器:消息將會根據(jù)指定的routing_key分發(fā)到指定的隊列。
在PHP的AMQP中如果exchange設(shè)置為匿名的話,是報錯的:PHP Fatal error: Uncaught exception ‘AMQPExchangeException’ with message ‘Invalid exchange name given, must be between 1 and 255 characters long.’
現(xiàn)在,我們就可以發(fā)送消息到一個具名交換器了:
$exchange->publish($message, '');臨時隊列
你還記得之前我們使用的隊列名嗎( hello和task_queue)?給一個隊列命名是很重要的——我們需要把工作者(workers)指向正確的隊列。如果你打算在發(fā)布者(producers)和消費者(consumers)之間共享同隊列的話,給隊列命名是十分重要的。
但是這并不適用于我們的日志系統(tǒng)。我們打算接收所有的日志消息,而不僅僅是一小部分。我們關(guān)心的是最新的消息而不是舊的。為了解決這個問題,我們需要做兩件事情。
首先,當(dāng)我們連接上RabbitMQ的時候,我們需要一個全新的、空的隊列。我們可以手動創(chuàng)建一個隨機的隊列名,或者讓服務(wù)器為我們選擇一個隨機的隊列名(推薦)。我們只要在調(diào)用$queue->declare();方法的時候,不提供queue參數(shù)就可以了:
$queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare();這時候我們可以通過$queue->getName();獲得已經(jīng)生成的隨機隊列名。它可能是這樣子的:amq.gen-U0srCoW8TsaXjNh73pnVAw==。
第二步,當(dāng)與消費者(consumer)斷開連接的時候,這個隊列應(yīng)當(dāng)被刪除。我們可以使用exclusive標(biāo)識。
$queue->setFlags(AMQP_EXCLUSIVE);綁定(Bindings)
我們已經(jīng)創(chuàng)建了一個fanout類型的交換器和一個隊列?,F(xiàn)在我們需要告訴交換器如何發(fā)送消息給我們的隊列。交換器和隊列之間的關(guān)系我們稱之為綁定(binding)。
$queue->bind($exchangeName, $queue->getName());現(xiàn)在,logs交換器將會把消息添加到我們的隊列中。
綁定列表。
你可以使用rabbitmqctl list_bindings隊列出所有存在的綁定。.
整合
發(fā)布日志消息的程序看起來和之前的沒有太大區(qū)別。最重要的改變就是我們把消息發(fā)送給logs交換器而不是匿名交換器。在發(fā)送的時候我們需要提供routingkey參數(shù),但是它的值會被fanout交換器忽略。以下是emitlog.php腳本:
<?php/*** PHP amqp(RabbitMQ) Demo-3* @author yuansir &lt;yuansir@live.cn/yuansir-web.com>*/$exchangeName = 'logs'; $message = empty($argv[1]) ? 'info:Hello World!' : ' '.$argv[1];$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n");$channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare();$exchange->publish($message, ''); var_dump("[x] Sent $message");$connection->disconnect();正如你看到的那樣,在連接成功之后,我們聲明了一個交換器,這一個是很重要的,因為不允許發(fā)布消息到不存在的交換器。
如果沒有綁定隊列到交換器,消息將會丟失。但這個沒有所謂,如果沒有消費者監(jiān)聽,那么消息就會被忽略。
receive_logs.php的代碼:
<?php/*** PHP amqp(RabbitMQ) Demo-3* @author yuansir &lt;yuansir@live.cn/yuansir-web.com>*/ $exchangeName = 'logs';$connection = new AMQPConnection(array('host' => '127.0.0.1', 'port' => '5672', 'vhost' => '/', 'login' => 'guest', 'password' => 'guest')); $connection->connect() or die("Cannot connect to the broker!\n"); $channel = new AMQPChannel($connection); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_FANOUT); $exchange->declare(); $queue = new AMQPQueue($channel); $queue->setFlags(AMQP_EXCLUSIVE); $queue->declare(); $queue->bind($exchangeName, '');var_dump('[*] Waiting for messages. To exit press CTRL+C'); while (TRUE) { $queue->consume('callback'); } $connection->disconnect();function callback($envelope, $queue) { $msg = $envelope->getBody();var_dump(" [x] Received:" . $msg);$queue->nack($envelope->getDeliveryTag()); }這樣我們就完成了。如果你想把日志保存到文件中,只需要打開控制臺輸入:
$ php receive_logs.php > logs_from_rabbit.log如果你想在屏幕中查看日志,那么打開一個新的終端然后運行:
$ php receive_logs.php當(dāng)然還要發(fā)送日志:
$ php emit_log.php使用rabbitmqctl listbindings你可確認已經(jīng)創(chuàng)建了隊列的綁定。你可以看到運行中的兩個receivelogs.php程序:
$ sudo rabbitmqctl list_bindings Listing bindings ... ... logs amq.gen-TJWkez28YpImbWdRKMa8sg== [] logs amq.gen-x0kymA4yPzAT6BoC/YP+zw== [] ...done.顯示結(jié)果很直觀:logs交換器把數(shù)據(jù)發(fā)送給兩個系統(tǒng)命名的隊列。這就是我們所期望的。
如何監(jiān)聽消息的子集呢?讓我們移步教程4
轉(zhuǎn)載于:https://my.oschina.net/u/1186749/blog/786460
總結(jié)
以上是生活随笔為你收集整理的RabbitMQ官方中文入门教程(PHP版) 第三部分:发布/订阅(Publish/Subscribe)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 【转载】数据库范式那些事
- 下一篇: com.android.ddmlib.S