漫漫的webim(一) web实现简易im功能
????????因為本人的工作需要,偶爾被要求實現一些市面上已經有的成熟接口功能。這里要轉折一下,不是說我實現的功能的穩定性和成熟度已經達到了可以商用的標準,只是被用作一個給客戶展示的demo而已,有點小尷尬。
????? ? 進入正題吧,用web實現im功能,目前主流的解決方案總結為如下兩點:
????????? ? 1.使用服務器輪詢技術實現。
????????? ? 2.使用websocket技術實現。
????? ? 恰巧以上兩種方式,我都沒研究過,輪詢技術大致懂,但是這樣做容易徒廢服務器資源,并且公司提供的demo服務器配置也很low,我怕給客戶演示的時候,它宕掉了,我特么的工作也就被宕掉了。再說到websocket,這個基本是市面上我了解的webim接扣的技術基礎,看了看websocket的php(對,我特么也是個光榮的php開發者~php是世界上最好的語言~輕噴)服務端轉發代碼,有點耽擱時間,遂拋棄了它。
????? ? 我百度了一圈,可悲的發現,沒有一個是對我有用的,可能說是沒有一個在我想耗費的時間內完成。這個時候大多牛逼程序員會靈光乍現,然后手速飛快的自己敲出了一套市面上沒有的新技術,看到這里,你們就懂了,我特么不是這樣的程序員,好吧,在我放棄了這些技術后,我想起來以前給一個電力能效的項目用mqtt(不懂mqtt的同學可以自行百度下)掛過長連接,實時反饋電力設備數據到web管理端,反正大家都是長連接,我用來干干,也是不礙事的吧。
? ??? ? 一、準備工作:
????????? ? 1.客戶端:mqttws31.min.js mqtt官方給的js客戶端。
????????? ? 2.服務端:phpMQtt.php mqttphp端腳本。
????????????????????????? ? ?recive.php 接收信息,實例化mqtt進行轉發腳本。
????? ? mqttws31.min.js可以再mqtt官網進行下載,php端代碼在這里粘貼出來,需要用的可以自取(因為我也是借鑒別人的)
<?php/*phpMQTTA simple php class to connect/publish/subscribe to an MQTT broker*//*LicenceCopyright (c) 2010 Blue Rhinos Consulting | Andrew Milstedandrew@bluerhinos.co.uk | http://www.bluerhinos.co.ukPermission is hereby granted, free of charge, to any person obtaining a copyof this software and associated documentation files (the "Software"), to dealin the Software without restriction, including without limitation the rightsto use, copy, modify, merge, publish, distribute, sublicense, and/or sellcopies of the Software, and to permit persons to whom the Software isfurnished to do so, subject to the following conditions:The above copyright notice and this permission notice shall be included inall copies or substantial portions of the Software.THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS ORIMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THEAUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHERLIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS INTHE SOFTWARE.*//* phpMQTT */ class phpMQTT {private $socket; /* holds the socket */private $msgid = 1; /* counter for message id */public $keepalive = 10; /* default keepalive timmer */public $timesinceping; /* host unix time, used to detect disconects */public $topics = array(); /* used to store currently subscribed topics */public $debug = false; /* should output debug messages */public $address; /* broker address */public $port; /* broker port */public $clientid; /* client id sent to brocker */public $will; /* stores the will of the client */private $username; /* stores username */private $password; /* stores password */function __construct($address, $port, $clientid){$this->broker($address, $port, $clientid);}/* sets the broker details */function broker($address, $port, $clientid){$this->address = $address;$this->port = $port;$this->clientid = $clientid; }function connect_auto($clean = true, $will = NULL, $username = NULL, $password = NULL){while($this->connect($clean, $will, $username, $password)==false){sleep(10);}return true;}/* connects to the broker inputs: $clean: should the client send a clean session flag */function connect($clean = true, $will = NULL, $username = NULL, $password = NULL){if($will) $this->will = $will;if($username) $this->username = $username;if($password) $this->password = $password;$address = gethostbyname($this->address); $this->socket = fsockopen($address, $this->port, $errno, $errstr, 60);if (!$this->socket ) {if($this->debug) error_log("fsockopen() $errno, $errstr \n");return false;}stream_set_timeout($this->socket, 5);stream_set_blocking($this->socket, 0);$i = 0;$buffer = "";$buffer .= chr(0x00); $i++;$buffer .= chr(0x06); $i++;$buffer .= chr(0x4d); $i++;$buffer .= chr(0x51); $i++;$buffer .= chr(0x49); $i++;$buffer .= chr(0x73); $i++;$buffer .= chr(0x64); $i++;$buffer .= chr(0x70); $i++;$buffer .= chr(0x03); $i++;//No Will$var = 0;if($clean) $var+=2;//Add will info to headerif($this->will != NULL){$var += 4; // Set will flag$var += ($this->will['qos'] << 3); //Set will qosif($this->will['retain']) $var += 32; //Set will retain}if($this->username != NULL) $var += 128; //Add username to headerif($this->password != NULL) $var += 64; //Add password to header$buffer .= chr($var); $i++;//Keep alive$buffer .= chr($this->keepalive >> 8); $i++;$buffer .= chr($this->keepalive & 0xff); $i++;$buffer .= $this->strwritestring($this->clientid,$i);//Adding will to payloadif($this->will != NULL){$buffer .= $this->strwritestring($this->will['topic'],$i); $buffer .= $this->strwritestring($this->will['content'],$i);}if($this->username) $buffer .= $this->strwritestring($this->username,$i);if($this->password) $buffer .= $this->strwritestring($this->password,$i);$head = " ";$head{0} = chr(0x10);$head{1} = chr($i);fwrite($this->socket, $head, 2);fwrite($this->socket, $buffer);$string = $this->read(4);if(ord($string{0})>>4 == 2 && $string{3} == chr(0)){if($this->debug) echo "Connected to Broker\n"; }else{ error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n", ord($string{0}),ord($string{3})));return false;}$this->timesinceping = time();return true;}/* read: reads in so many bytes */function read($int = 8192, $nb = false){// print_r(socket_get_status($this->socket));$string="";$togo = $int;if($nb){return fread($this->socket, $togo);}while (!feof($this->socket) && $togo>0) {$fread = fread($this->socket, $togo);$string .= $fread;$togo = $int - strlen($string);}return $string;}/* subscribe: subscribes to topics */function subscribe($topics, $qos = 0){$i = 0;$buffer = "";$id = $this->msgid;$buffer .= chr($id >> 8); $i++;$buffer .= chr($id % 256); $i++;foreach($topics as $key => $topic){$buffer .= $this->strwritestring($key,$i);$buffer .= chr($topic["qos"]); $i++;$this->topics[$key] = $topic; }$cmd = 0x80;//$qos$cmd += ($qos << 1);$head = chr($cmd);$head .= chr($i);fwrite($this->socket, $head, 2);fwrite($this->socket, $buffer, $i);$string = $this->read(2);$bytes = ord(substr($string,1,1));$string = $this->read($bytes);}/* ping: sends a keep alive ping */function ping(){$head = " ";$head = chr(0xc0); $head .= chr(0x00);fwrite($this->socket, $head, 2);if($this->debug) echo "ping sent\n";}/* disconnect: sends a proper disconect cmd */function disconnect(){$head = " ";$head{0} = chr(0xe0); $head{1} = chr(0x00);fwrite($this->socket, $head, 2);}/* close: sends a proper disconect, then closes the socket */function close(){$this->disconnect();fclose($this->socket); }/* publish: publishes $content on a $topic */function publish($topic, $content, $qos = 0, $retain = 0){$i = 0;$buffer = "";$buffer .= $this->strwritestring($topic,$i);//$buffer .= $this->strwritestring($content,$i);if($qos){$id = $this->msgid++;$buffer .= chr($id >> 8); $i++;$buffer .= chr($id % 256); $i++;}$buffer .= $content;$i+=strlen($content);$head = " ";$cmd = 0x30;if($qos) $cmd += $qos << 1;if($retain) $cmd += 1;$head{0} = chr($cmd); $head .= $this->setmsglength($i);fwrite($this->socket, $head, strlen($head));fwrite($this->socket, $buffer, $i);}/* message: processes a recieved topic */function message($msg){$tlen = (ord($msg{0})<<8) + ord($msg{1});$topic = substr($msg,2,$tlen);$msg = substr($msg,($tlen+2));$found = 0;foreach($this->topics as $key=>$top){if( preg_match("/^".str_replace("#",".*",str_replace("+","[^\/]*",str_replace("/","\/",str_replace("$",'\$',$key))))."$/",$topic) ){if(is_callable($top['function'])){call_user_func($top['function'],$topic,$msg);$found = 1;}}}if($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";}/* proc: the processing loop for an "allways on" client set true when you are doing other stuff in the loop good for watching something else at the same time */ function proc( $loop = true){if(1){$sockets = array($this->socket);$w = $e = NULL;$cmd = 0;//$byte = fgetc($this->socket);if(feof($this->socket)){if($this->debug) echo "eof receive going to reconnect for good measure\n";fclose($this->socket);$this->connect_auto(false);if(count($this->topics))$this->subscribe($this->topics); }$byte = $this->read(1, true);if(!strlen($byte)){if($loop){usleep(100000);}}else{ $cmd = (int)(ord($byte)/16);if($this->debug) echo "Recevid: $cmd\n";$multiplier = 1; $value = 0;do{$digit = ord($this->read(1));$value += ($digit & 127) * $multiplier; $multiplier *= 128;}while (($digit & 128) != 0);if($this->debug) echo "Fetching: $value\n";if($value)$string = $this->read($value,"fetch");if($cmd){switch($cmd){case 3:$this->message($string);break;}$this->timesinceping = time();}}if($this->timesinceping < (time() - $this->keepalive )){if($this->debug) echo "not found something so ping\n";$this->ping(); }if($this->timesinceping<(time()-($this->keepalive*2))){if($this->debug) echo "not seen a package in a while, disconnecting\n";fclose($this->socket);$this->connect_auto(false);if(count($this->topics))$this->subscribe($this->topics);}}return 1;}/* getmsglength: */function getmsglength(&$msg, &$i){$multiplier = 1; $value = 0 ;do{$digit = ord($msg{$i});$value += ($digit & 127) * $multiplier; $multiplier *= 128;$i++;}while (($digit & 128) != 0);return $value;}/* setmsglength: */function setmsglength($len){$string = "";do{$digit = $len % 128;$len = $len >> 7;// if there are more digits to encode, set the top bit of this digitif ( $len > 0 )$digit = ($digit | 0x80);$string .= chr($digit);}while ( $len > 0 );return $string;}/* strwritestring: writes a string to a buffer */function strwritestring($str, &$i){$ret = " ";$len = strlen($str);$msb = $len >> 8;$lsb = $len % 256;$ret = chr($msb);$ret .= chr($lsb);$ret .= $str;$i += ($len+2);return $ret;}function printstr($string){$strlen = strlen($string);for($j=0;$j<$strlen;$j++){$num = ord($string{$j});if($num > 31) $chr = $string{$j}; else $chr = " ";printf("%4d: %08b : 0x%02x : %s \n",$j,$num,$num,$chr);}} }?>????? ? 以上是mqttphp服務端的代碼。
????? ? 二、思路詳解
????????說了這么多,萌新同學可能有點疑惑,這里為萌新同學們講解下思路。
????? ? 百度百科的mqtt有6大特性,最主要也是我們最必須了解的特性就是:使用發布/訂閱消息模式,提供一對多的消息模式
????? ? 這個怎么解釋呢?,微信玩過吧,里面不是有很多公眾號嘛,假設你是A,你朋友是B,有個公眾號C,A和B都訂閱了C,此時C在微信后臺發布文章,A和B都能接收到。
????? ? 而我們要實現的IM的思路是:每個人都訂閱自己,什么意思呢?就是A訂閱A,B訂閱B,C訂閱C,當A要給B發送消息的時候,A從客戶端使用ajax給服務器異步一條數據,例如:
msg = {to : "B",con : "你好啊",from : "A",time : 2015698856 }msg里面記錄了接受者(to),發送內容(con),發送者(from),發送時間戳(time),當服務端接收到消息后,對消息進行解析,獲取到to后對B進行消息發送,例如
include_once("phpMQTT.php"); $clientid = mt_rand(0,10000); $mqtt = new phpMQTT("127.0.0.1", 1883, $clientid."a"); //127.0.0.1是連接本機,1883端口為服務端mqtt默認代理端口,$clinetid不唯一即可 if ($mqtt->connect(true, NULL,username, password)) { //mqtt服務進行連接,username,password只要跟客戶端設置一致即可$mqtt->publish($to,json_encode($msg), 0); //$to就是解析到的to,此時就是B,$msg就是原信息$mqtt->close();$msg = '發送成功'; } else {$msg = '連接失敗'; }B登陸客戶端后,用mqttjs端對自己進行訂閱,然后將接受到的信息進行解析:例如
function subscribe(user){ //開啟mqtt客戶端,并訂閱自己,user為自己var client = new Paho.MQTT.Client("服務端地址/IP或者域名", Number(9001), uuid); //web客戶端開啟的端口必須是9001,每個客戶端的uuid必須要不一樣client.connect({onSuccess:function(){console.log("connect success");client.subscribe(user);//接收訂閱的主題},onFailure:function() {}, timeout:100,userName:'xxxx', //mqtt服務端連接驗證,用戶名密碼只要與服務端設置相同即可password:'xxxx' //mqtt服務端連接驗證,用戶名密碼只要與服務端設置相同即可});client.onMessageArrived = function(message) {console.log(message); //消息接收到的處理}client.onConnectionLost = function(responseObject) {console.log(responseObject) //連接斷開的處理} }一條完整的路線走了下來,一個簡易的Im就做出來了,此時還只是文本聊天,如果你想進行表情,圖片,語音,地圖,請移步我下一篇文章。
????? ? 嚴重警告:一個客戶端最好只開一個mqtt,如果開多了,就會莫名的崩潰掉,盡量使用邏輯手段去實現功能,而不要呆板的以數量來實現。
????? ? 三、效果展示:
????? ? 視頻連接地址:
? ? ? ??點擊打開鏈接
????????
????????
總結
以上是生活随笔為你收集整理的漫漫的webim(一) web实现简易im功能的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 论使命的重要性
- 下一篇: 产品各类型之间的关系