websocket心跳链接代码_Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送
生活随笔
收集整理的這篇文章主要介紹了
websocket心跳链接代码_Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送
小編覺得挺不錯的,現(xiàn)在分享給大家,幫大家做個參考.
作者:八重櫻
來源:www.cnblogs.com/a609251438/p/12713467.html
介紹
基于 Hyperf+ WebSocket +RabbitMQ 實現(xiàn)的一個簡單大屏幕的消息推送。
思路
利用 WebSocket 協(xié)議讓客戶端和服務(wù)器端保持有狀態(tài)的長鏈接,
保存鏈接上來的客戶端 id。訂閱發(fā)布者發(fā)布的消息針對已保存的客戶端 id 進行廣播消息。
WebSocket 服務(wù)
composer?require?hyperf/websocket-server配置文件 [config/autoload/server.php]
<?phpreturn ?['mode'?=> SWOOLE_PROCESS,'servers'?=> [????????['name'?=>?'http','type'?=> Server::SERVER_HTTP,'host'?=>?'0.0.0.0','port'?=>?11111,'sock_type'?=> SWOOLE_SOCK_TCP,'callbacks'?=> [????????????????SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class,?'onRequest'],????????????],????????],????????['name'?=>?'ws','type'?=> Server::SERVER_WEBSOCKET,'host'?=>?'0.0.0.0','port'?=>?12222,'sock_type'?=> SWOOLE_SOCK_TCP,'callbacks'?=> [????????????????SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class,?'onHandShake'],????????????????SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class,?'onMessage'],????????????????SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class,?'onClose'],????????????],????????],????],WebSocket 服務(wù)器端代碼示例
<?phpdeclare (strict_types=1);/**?* This file is part of Hyperf.?*?*?@link?????https://www.hyperf.io?*?@document?https://doc.hyperf.io?*?@contact??group@hyperf.io?*?@license??https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE?*/namespace?AppController;use?HyperfContractOnCloseInterface;use?HyperfContractOnMessageInterface;use?HyperfContractOnOpenInterface;use?SwooleHttpRequest;use?SwooleServer;use?SwooleWebsocketFrame;use?SwooleWebSocketServer?as?WebSocketServer;class?WebSocketController?extends?Controller?implements?OnMessageInterface,?OnOpenInterface,?OnCloseInterface{/**?????* 發(fā)送消息?????*?@param?WebSocketServer $server?????*?@param?Frame $frame?????*/public?function?onMessage(WebSocketServer $server, Frame $frame):?void{//心跳刷新緩存????????$redis =?$this->container->get(Redis::class);//獲取所有的客戶端id????????$fdList = $redis->sMembers('websocket_sjd_1');//如果當(dāng)前客戶端在客戶端集合中,就刷新if?(in_array($frame->fd, $fdList)) {????????????$redis->sAdd('websocket_sjd_1', $frame->fd);????????????$redis->expire('websocket_sjd_1',?7200);????????}????????$server->push($frame->fd,?'Recv: '?. $frame->data);????}/**?????* 客戶端失去鏈接?????*?@param?Server $server?????*?@param?int $fd?????*?@param?int $reactorId?????*/public?function?onClose(Server $server, int $fd, int $reactorId):?void{//刪掉客戶端id????????$redis =?$this->container->get(Redis::class);//移除集合中指定的value????????$redis->sRem('websocket_sjd_1', $fd);????????var_dump('closed');????}/**?????* 客戶端鏈接?????*?@param?WebSocketServer $server?????*?@param?Request $request?????*/public?function?onOpen(WebSocketServer $server, Request $request):?void{//保存客戶端id????????$redis =?$this->container->get(Redis::class);????????$res1 = $redis->sAdd('websocket_sjd_1', $request->fd);????????var_dump($res1);????????$res = $redis->expire('websocket_sjd_1',?7200);????????var_dump($res);????????$server->push($request->fd,?'Opened');????}}WebSocket 前端代碼
function?WebSocketTest()?{if?("WebSocket"?in?window) {console.log("您的瀏覽器支持 WebSocket!");var?num =?0// 打開一個 web socketvar?ws =?new?WebSocket("ws://127.0.0.1:12222");????????ws.onopen =?function?()?{// Web Socket 已連接上,使用 send() 方法發(fā)送數(shù)據(jù)//alert("數(shù)據(jù)發(fā)送中...");//ws.send("發(fā)送數(shù)據(jù)");????????};window.setInterval(function?()?{?//每隔5秒鐘發(fā)送一次心跳,避免websocket連接因超時而自動斷開var?ping = {"type":?"ping"};????????????ws.send(JSON.stringify(ping));????????},?5000);???????ws.onmessage =?function?(evt)?{var?d =?JSON.parse(evt.data);console.log(d);if?(d.code ==?300) {????????????????$(".address").text(d.address)????????????}if?(d.code ==?200) {var?v = d.dataconsole.log(v);????????????????num++var?str =?`????????????????????????????????${v.recordOutTime}
???????????????????????????????${v.userOutName}
???????????????????????????????${v.userOutNum}
???????????????????????????????${v.doorOutName}
????????????????????????????`????????????????$(".tableHead").after(str)if?(num >?7) {???????????????????num--????????????????????$(".table .item:nth-last-child(1)").remove()????????????????}????????????}????????};????????ws.error =?function?(e)?{console.log(e)????????????alert(e)????????}????????ws.onclose =?function?()?{// 關(guān)閉 websocket????????????alert("連接已關(guān)閉...");????????};????}?else?{????????alert("您的瀏覽器不支持 WebSocket!");????}}AMQP 組件
composer?require?hyperf/amqp配置文件 [config/autoload/amqp.php]
<?phpreturn ?['default'?=> ['host'?=>?'localhost','port'?=>?5672,'user'?=>?'guest','password'?=>?'guest','vhost'?=>?'/','pool'?=> ['min_connections'?=>?1,'max_connections'?=>?10,'connect_timeout'?=>?10.0,'wait_timeout'?=>?3.0,'heartbeat'?=>?-1,????????],'params'?=> ['insist'?=>?false,'login_method'?=>?'AMQPLAIN','login_response'?=>?null,'locale'?=>?'en_US','connection_timeout'?=>?3.0,'read_write_timeout'?=>?6.0,'context'?=>?null,'keepalive'?=>?false,'heartbeat'?=>?3,????????],????],];MQ 消費者代碼<?phpdeclare (strict_types=1);namespace?AppAmqpConsumer;use?HyperfAmqpAnnotationConsumer;use?HyperfAmqpMessageConsumerMessage;use?HyperfAmqpResult;use?HyperfServerServer;use?HyperfServerServerFactory;/**?*?@Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)?*/class?DemoConsumer?extends?ConsumerMessage{/**?????* rabbmitMQ消費端代碼?????*?@param?$data?????*?@return?string?????*/public?function?consume($data):?string{????????print_r($data);//獲取集合中所有的value????????$redis =?$this->container->get(Redis::class);????????$fdList=$redis->sMembers('websocket_sjd_1');????????$server=$this->container->get(ServerFactory::class)->getServer()->getServer();foreach($fdList?as?$key=>$v){if(!empty($v)){????????????????$server->push((int)$v, $data);????????????}????????}return?Result::ACK;????}}控制器代碼
/**?* test?*?@return?array?*/public?function?test(){????$data =?array('code'?=>?200,'data'?=> ['userOutName'?=>?'ccflow','userOutNum'?=>?'9999','recordOutTime'?=> date("Y-m-d H:i:s", time()),'doorOutName'?=>?'教師公寓',????????]????);????$data = GuzzleHttpjson_encode($data);????$message =?new?DemoProducer($data);????$producer = ApplicationContext::getContainer()->get(Producer::class);????$result = $producer->produce($message);????var_dump($result);????$user =?$this->request->input('user',?'Hyperf');????$method =?$this->request->getMethod();return?['method'?=> $method,'message'?=>?"{$user}.",????];}最終效果
總結(jié)
以上是生活随笔為你收集整理的websocket心跳链接代码_Hyperf+RabbitMQ+WebSocket实现大屏幕消息推送的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: keras中lstm参数_如何使用Ker
- 下一篇: mybatis plus 日志打印_my