在PHP中如何使用消息列队
生活随笔
收集整理的這篇文章主要介紹了
在PHP中如何使用消息列队
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
/*** 消息列隊服務* @author zhou.tingze* @example* -----------------------------------Create----------------------------------------* $array = array('a','b','c','d');* $this->load->library('amqp_service');* $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');* $this->amqp_service->createMessageQueue($array);* -----------------------------------End-------------------------------------------* * -----------------------------------Get-------------------------------------------* $this->load->library('amqp_service');* $this->amqp_service->setSaveType('test_exchange', 'test_queue', 'test_router');* $message_queue = $this->amqp_service->getMessageQueue();* var_dump($message_queue)* -----------------------------------End-------------------------------------------*/class Amqp_service extends Base_service{public $conn;public $exchange;public $queue;public $router;function __construct(){parent:: __construct();//獲取系統配置$this->load->config('app_config', TRUE);$app_config = $this->config->item('app_config');$this->connect($app_config['amqp']);}/*** * 嘗試連接Amqp服務*/private function connect($amqp_args){ $this->conn = new AMQPConnection($amqp_args);$this->conn->connect();if (!$this->conn->isConnected()) {throw new Exception('Cannot connect to the broker.');}}/*** * 設定消息列隊保存方式* @param String $exchange_name 交換機名* @param String $queue_name 消息列隊名* @param String $router_name 路由名*/public function setSaveType($exchange_name, $queue_name, $router_name){$this->exchange = $exchange_name;$this->queue = $queue_name;$this->router = $router_name;}/*** * 創建消息列隊* @param Array $array*/public function createMessageQueue($array){//創建交換機 $channel = new AMQPChannel($this->conn);$ex = new AMQPExchange($channel);//交換機名 $ex->setName($this->exchange);$ex->setType(AMQP_EX_TYPE_DIRECT);$ex->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$ex->declare();//創建消息列隊$q = new AMQPQueue($channel);//隊列名$q->setName($this->queue);$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$q->declare();//綁定交換機與隊列,并指定路由鍵 $q->bind($this->exchange, $this->router);//消息發布$channel->startTransaction();$message = json_encode($array);$ex->publish($message, $this->router);$channel->commitTransaction();//$this->conn->disconnect();}/*** * 獲取消息列隊*/public function getMessageQueue(){try{//設置queue名稱,使用exchange,綁定routingkey$channel = new AMQPChannel($this->conn);$q = new AMQPQueue($channel);$q->setName($this->queue);$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$q->declare();$q->bind($this->exchange, $this->router); //消息獲取$messages = $q->get(AMQP_AUTOACK) ;$arr = array();if ($messages){$arr = json_decode($messages->getBody(), true );}}catch (Exception $e){throw new Exception($e->getMessage());}//$this->conn->disconnect();return $arr;}/*public function getAllMessageQueue(){//設置queue名稱,使用exchange,綁定routingkey$channel = new AMQPChannel($this->conn);$q = new AMQPQueue($channel);$q->setName($this->queue);$q->setFlags(AMQP_DURABLE | AMQP_AUTODELETE);$q->declare();$q->bind($this->exchange, $this->router); $this->conn->disconnect();//阻塞模式獲取消息列隊while(True){ $q->consume('processMessage'); //$q->consume('processMessage', AMQP_AUTOACK); //自動ACK應答 } }*/public function __destruct(){$this->conn->disconnect();}
}/*** 消費回調函數* 處理消息* @param Object $envelope* @param Object $queue*/
/*
function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg . '<br />';//手動發送ACK應答 $queue->ack($envelope->getDeliveryTag());
}
*/
轉載于:https://www.cnblogs.com/adtuu/p/4670229.html
總結
以上是生活随笔為你收集整理的在PHP中如何使用消息列队的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 钱趣多风控新举措:源头选择与物理隔离
- 下一篇: 【Android开发日记】第一个任务An