rabbitmq队列php应用,RabbitMQ工作队列应用
工作隊列是說由一個生產者發送消息給隊列,而可能有一個或多個消費者等待從隊列接收并處理消息
循環調度
當有多個消費者時,隊列循環發送消息給每一個消費者,默認情況下,rabbitmq會按順序發送一條消息給下一個消費者,平均每個消費者會收到相同數量的消息
消息確認
消息確認是防止消息丟失,一個確認信息會從消費者返回,告訴rabbitmq該消費已經收到處理,這時rabbitmq才會釋放刪除消息。默認情況下消息確認是被關閉的,設置basic_consume()的第四個參數為false(true means no ack),且從消費者發送一個確認即可。
消息持久化
要確保消息不丟失我們需要標識隊列和消息都是可持久的,首先要確保rabbitmq不會丟失隊列,我們需要聲明隊列為可持久的,這里設置queue_declare的第三個參數為true即可
$channel->queue_declare('task_queue', false, true, false, false);
其次我們需要通過delivery_mode = 2參數標記消息是持久化的,如
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # make message persistent
);
公平調度
循環調度容易造成奇偶消息費者閑忙不等的情況,為了公平調度我們可以使用basic_qos方法,同時設置參數prefetch=1,如
$channel->basic_qos(null, 1, null);
生產者new_task.php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//第3個參數為true,使消息隊列持久化
$channel->queue_declare('task_queue', false, true, false, false);
$data = implode(' ', array_slice($argv, 1));
if(empty($data)) $data = "Hello World!";
$msg = new AMQPMessage($data,
array('delivery_mode' => 2) # 使消息持久化
);
$channel->basic_publish($msg, '', 'task_queue');
echo " [x] Sent ", $data, "\n";
$channel->close();
$connection->close();
消費者worker.php
require_once __DIR__ . '/../vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
//第3個參數為true,使消息隊列持久化
$channel->queue_declare('task_queue', false, true, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
sleep(substr_count($msg->body, '.'));
echo " [x] Done", "\n";
//發送回復確認
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
};
//合理分發消息,即等消費者回復確認之后不忙的時候再發送消息
$channel->basic_qos(null,1,null);
//第4個參數為false表示等待消費者回復再刪除消息
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while(count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
總結
以上是生活随笔為你收集整理的rabbitmq队列php应用,RabbitMQ工作队列应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: matlab摄像头录像保存在哪里,mat
- 下一篇: 空调支架生锈会不会断掉