架构师讲解Java中websocket的应用
這篇文章主要來介紹一下在java項目中,特別是java web項目中websocket的應用。
場景:我做了一個商城系統,跟大多數商城系統,分為客戶端和后臺,客戶端供客戶瀏覽,下單,購買,后臺主要管理商品,處理訂單,發貨等。我現在要實現的功能是,當客戶端有客戶下單,并且支付完成以后,主動推送消息給后臺,讓后臺的人知道,好去處理發貨等事宜。
首先,我們要知道websocket
是一個連接,這個連接是客戶端(頁面)與服務端之間的連接,所以我們要分兩部分來完成這個連接,服務端代碼和客戶端代碼。
1.首先,在pom.xml引入如下jar包。
<!-- websocket --><dependency><groupId>org.java-websocket</groupId><artifactId>Java-WebSocket</artifactId><version>1.3.0</version></dependency>2.然后我們要知道的是,websocket是客戶端和服務端之間建立了一個連接,建立完連接以后,會生成一個websocket對象,我們可以用這個對象來執行發送,接收等操作。
但是這只是一個存在于客戶端與服務器之間的鏈接,換句話說,系統只能識別到這個websocket連接是對應于哪個頁面(瀏覽器
),而這個頁面在系統中是對應哪個用戶(數據庫中的用戶,或者根本就沒有對應任何用戶,即未登錄,只是一個游客),我們是無法從這個websocket對象中獲取的。
所以我們需要創建一個Map對象,用于將websocket對象和實際的user對象進行關聯,這樣為我們后續向特定的用戶推送消息做鋪墊。
為此,我們創建一個WsPool,即websocket連接池的類,該類用于管理現實中的用戶和websocket對象之間的關聯。代碼如下。
package com.xdx.websocket;import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set;import org.java_websocket.WebSocket;public class WsPool {private static final Map<WebSocket, String> wsUserMap = new HashMap<WebSocket, String>();/*** 通過websocket連接獲取其對應的用戶* * @param conn* @return*/public static String getUserByWs(WebSocket conn) {return wsUserMap.get(conn);}/*** 根據userName獲取WebSocket,這是一個list,此處取第一個* 因為有可能多個websocket對應一個userName(但一般是只有一個,因為在close方法中,我們將失效的websocket連接去除了)* * @param user*/public static WebSocket getWsByUser(String userName) {Set<WebSocket> keySet = wsUserMap.keySet();synchronized (keySet) {for (WebSocket conn : keySet) {String cuser = wsUserMap.get(conn);if (cuser.equals(userName)) {return conn;}}}return null;}/*** 向連接池中添加連接* * @param inbound*/public static void addUser(String userName, WebSocket conn) {wsUserMap.put(conn, userName); // 添加連接}/*** 獲取所有連接池中的用戶,因為set是不允許重復的,所以可以得到無重復的user數組* * @return*/public static Collection<String> getOnlineUser() {List<String> setUsers = new ArrayList<String>();Collection<String> setUser = wsUserMap.values();for (String u : setUser) {setUsers.add(u);}return setUsers;}/*** 移除連接池中的連接* * @param inbound*/public static boolean removeUser(WebSocket conn) {if (wsUserMap.containsKey(conn)) {wsUserMap.remove(conn); // 移除連接return true;} else {return false;}}/*** 向特定的用戶發送數據* * @param user* @param message*/public static void sendMessageToUser(WebSocket conn, String message) {if (null != conn && null != wsUserMap.get(conn)) {conn.send(message);}}/*** 向所有的用戶發送消息* * @param message*/public static void sendMessageToAll(String message) {Set<WebSocket> keySet = wsUserMap.keySet();synchronized (keySet) {for (WebSocket conn : keySet) {String user = wsUserMap.get(conn);if (user != null) {conn.send(message);}}}}}3.接下來我們編寫websocket的主程序類
該類用于管理[websocket]的生命周期。該類繼承自WebSocketServer ,這是一個實現了runnable接口的類,他的構造函數需要傳入一個端口,所以我們需要為websocket服務指定一個端口,該類有四個要重載的方法,[onOpen()]方法在連接創建成功以后調用,onClose在連接關閉以后調用,[onError方法]在連接發生錯誤的時候調用(一般連接出錯以后觸發了onError,也會緊接著觸發onClose方法)。
onMessage方法在收到客戶端發來消息的時候觸發。我們可以在這個方法中處理客戶端所傳遞過來的消息。
package com.xdx.websocket;import java.net.InetSocketAddress;import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer;public class WsServer extends WebSocketServer {public WsServer(int port) {super(new InetSocketAddress(port));}public WsServer(InetSocketAddress address) {super(address);}@Overridepublic void onOpen(WebSocket conn, ClientHandshake handshake) {// ws連接的時候觸發的代碼,onOpen中我們不做任何操作}@Overridepublic void onClose(WebSocket conn, int code, String reason, boolean remote) {//斷開連接時候觸發代碼userLeave(conn); System.out.println(reason);}@Overridepublic void onMessage(WebSocket conn, String message) { System.out.println(message);if(null != message &&message.startsWith("online")){String userName=message.replaceFirst("online", message);//用戶名userJoin(conn,userName);//用戶加入}else if(null != message && message.startsWith("offline")){userLeave(conn);}}@Overridepublic void onError(WebSocket conn, Exception ex) {//錯誤時候觸發的代碼System.out.println("on error");ex.printStackTrace();}/*** 去除掉失效的websocket鏈接* @param conn*/private void userLeave(WebSocket conn){WsPool.removeUser(conn);}/*** 將websocket加入用戶池* @param conn* @param userName*/private void userJoin(WebSocket conn,String userName){WsPool.addUser(userName, conn);}}上述onMessage()方法中,我們接收到客戶端傳過來的一個message(消息),而這個客戶端對應的websocket連接也被當成一個參數一起傳遞過來,我們通過message中攜帶的信息來判定這條信息對應是什么操作,如果是以online開頭,則說明它是一條上線的是信息,我們就把該websocket和其對應的userName存入ws連接池中,如果是以offline開頭,則說明websocket斷開了,我們也沒有必要維護這個websocket對應的map鍵值對,把它去除掉就好了。
4.如何在服務端開啟這個socket呢
我們上面有說到[WsServer](的父類WebSocketServer 實現了一個runnable方法,由此可見我們需要在一個線程中運行這個WsServer,事實上,WebSocketServer 有個start()方法,其源碼如下。
public void start() {if( selectorthread != null )throw new IllegalStateException( getClass().getName() + " can only be started once." );new Thread( this ).start();;}很顯然,它開了一個線程。所以我們可以用下面這樣的方法來開啟一個websocket線程。(該方法只是針對普通的java項目
,如果是web項目需要在項目啟動的時候運行websocket線程,后面第7點會講)
我們運行這個main方法,就開啟了websocket的服務端。
到目前為止,我們還沒有編寫任何客戶端的代碼。我們如何測試已經開了websocket服務端呢?網上有一個免費的測試工具。測試地址如下:[http://www.blue-zero.com/WebSocket/]
點擊進去,寫上我們的websocket服務地址。點擊連接。如圖所示。
如果連接成功,他就會顯示“連接已建立,正在等待數據……”
我們再文本框中輸入onlinexdx,然后點回車試試。
這就是模擬客戶端向服務端發送onlinexdx請求,按前面的介紹,它會觸發服務端的onMessage方法,我們看一下服務端控制臺。除了我們希望看到的onlinexdx這個message,還有一些@heart,果然觸發了這個方法。
@heart是這個免費頁面發送過來的心跳檢測包,目的是讓websocket一直處于連接狀態。
5.好了,接下來我們要來完成客戶端部分的功能。
我想要實現的是后臺用戶登錄以后,進入到后臺主頁的時候,執行websocket連接工作(就類似于上一步在免費頁面點擊連接按鈕),然后向服務端發送[“online+userName”]這條消息,用以觸發服務端的[onMessage方法,就可以將該[userName加入到連接池了。我們將這些代碼封裝[js])中。
var websocket = ''; var ajaxPageNum = 1; var last_health; var health_timeout = 10; var tDates = [], tData = []; var rightIndex; if ($('body').attr('userName') != '' && $('body').attr('ws') == 'yes') {var userName = $('body').attr('userName');if (window.WebSocket) {websocket = new WebSocket(encodeURI('ws://' + document.domain + ':8887'));websocket.onopen = function() {console.log('已連接');websocket.send("online"+userName);heartbeat_timer = setInterval(function() {keepalive(websocket)}, 60000);};websocket.onerror = function() {console.log('連接發生錯誤');};websocket.onclose = function() {console.log('已經斷開連接');initWs();};// 消息接收websocket.onmessage = function(message) {console.log(message)showNotice("新訂單", "您有新的逸品訂單,請及時處理!")};} else {alert("該瀏覽器不支持下單提醒。<br/>建議使用高版本的瀏覽器,<br/>如 IE10、火狐 、谷歌 、搜狗等");}} var initWs = function() {if (window.WebSocket) {websocket = new WebSocket(encodeURI('ws://' + document.domain + ':8887'));websocket.onopen = function() {console.log('已連接');websocket.send("online"+userName);heartbeat_timer = setInterval(function() {keepalive(websocket)}, 60000);};websocket.onerror = function() {console.log('連接發生錯誤');};websocket.onclose = function() {console.log('已經斷開連接');initWs();};// 消息接收websocket.onmessage = function(message) {console.log(message)showNotice("新訂單", "您有新的逸品訂單,請及時處理!")};} else {alert("該瀏覽器不支持下單提醒。<br/>建議使用高版本的瀏覽器,<br/>如 IE10、火狐 、谷歌 、搜狗等");} } var vadioTimeOut; function showNotice(title, content) {if (!title && !content) {title = "新訂單";content = "您有新的訂單,請及時處理!";}var iconUrl = "http://www.wonyen.com/favicon.ico";$("#myaudio")[0].play();// 消息播放語音var playTime = 1;var audio = document.createElement("myaudio");clearTimeout(vadioTimeOut);audio.addEventListener('ended', function() {vadioTimeOut = setTimeout(function() {playTime = playTime + 1;playTime < 3 ? audio.play() : clearTimeout(vadioTimeOut);}, 500);})if (Notification.permission == "granted") {var notification = new Notification(title, {body : content,icon : iconUrl});notification.onclick = function() {notification.close();};}}// 心跳包 function keepalive(ws) {var time = new Date();if (last_health != -1 && (time.getTime() - last_health > health_timeout)) {// ws.close();} else {if (ws.bufferedAmount == 0) {ws.send('~HC~');}} }頁面的主要代碼如下。首先是引入上述js.這個js必須放在頁面最后,因為它需要加載完頁面以獲取body的attr。
<!-- websocket --> <script src="./static/js/OtherJs/ws.js" type="text/javascript"></script>其次是在頁面的body處加入[userName和ws屬性。作為參數傳遞到js里面。還需要加入語音附件。
<body userName=${adminName} ws="yes"> <!-- 消息提示音 --><audio id="myaudio" src="./static/new_order.wav"></audio>上述js代碼清楚地展示了
在頁面端的生命周期,需要注意的是,我們在onopen()方法中,先是向服務端發送online+userName]
進行上線處理,緊接著開始調用心跳包,避免websocket]
長時間閑置而失效。
[onmessage]方法中,我們處理收到服務端推送過來的消息,然后以語音和彈出窗的形式提醒客戶端。
當然,我們在服務端可以通過封裝[message這個參數,把它變成一個json對象,給這個[json](對象一個[msgType]屬性,這樣就可以根據[msgType的不同來執行不同的前端代碼,比如[msgType=newOrder]_表示有新的訂單,就行新訂單到來的代碼,msgType=newUser表示有新的用戶注冊,就執行新用戶注冊的代碼。
這邊我沒做區分,因為我在服務端只發送用戶購買訂單的消息,所以所有的消息,我都執行showNotice這個方法。
6.最后一步,我們來編寫從服務端向客戶端發送消息的方法。
一旦有訂單到來,我們就向所有的后臺用戶發送消息。我們可以模擬一下這個動作,寫一個[controller]方法,調用[WsPoo的[sendMessageToAll]方法。
@ResponseBody@RequestMapping("sendWs")public String sendWs(String message) {WsPool.sendMessageToAll(message);return message;}7.對了,如果是web項目,
我們還需要在項目啟動的時候開啟websocket服務端線程,可以把啟動的動作放在一個filter中,然后在web.xml里面配置這個filter,使它在項目啟動時候運行。
package com.xdx.filter;import java.io.IOException;import javax.servlet.Filter; import javax.servlet.FilterChain; import javax.servlet.FilterConfig; import javax.servlet.ServletException; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse;import org.java_websocket.WebSocketImpl;import com.xdx.websocket.WsServer;public class StartFilter implements Filter {public void destroy() {}public void doFilter(ServletRequest arg0, ServletResponse arg1,FilterChain arg2) throws IOException, ServletException {}public void init(FilterConfig arg0) throws ServletException {this.startWebsocketInstantMsg();}/*** 啟動即時聊天服務*/public void startWebsocketInstantMsg() {WebSocketImpl.DEBUG = false;WsServer s;s = new WsServer(8887);s.start();} }在web.xml配置。
<!-- filter --><filter><filter-name>startFilter</filter-name><filter-class>com.xdx.filter.StartFilter</filter-class></filter>至此,我們完成了所有的代碼。
8.測試,先運行起項目。然后登陸以后,進入后臺主頁,可以看到,已經連接成功了。
然后我們從服務端向后臺發送一條消息,執行sendWs這個方法。在瀏覽器輸入http://192.168.1.185:8080/warrior/sendWs?message=xxx
會播放語音,并且彈出提示。證明成功了。
上述只是websocket的一個簡單的應用,在此基礎上,我們還可以做很多擴展的工作,比如做聊天室,股票實時價格顯示等,只要掌握了原理就好做了
總結
以上是生活随笔為你收集整理的架构师讲解Java中websocket的应用的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 为什么不能申购300开头的股票?
- 下一篇: 港股通如何打新股