java网络通信:异步非阻塞I/O (NIO)
生活随笔
收集整理的這篇文章主要介紹了
java网络通信:异步非阻塞I/O (NIO)
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
首先是channel,是一個雙向的全雙工的通道,可同時讀寫,而輸入輸出流都是單工的,要么讀要么寫。Channel分為兩大類,分別是用于網絡數據的SelectableChannel和用于文件操作的FileChannel。
注意:在java NIO庫中,所有的數據都是用緩沖區處理,常用的是ByteBuffer。
多路復用器Selector:
Selector會不斷輪詢注冊在其上的Channel,如果某個Channel上又新的連接接入、讀和寫事件,這個Channel就處于就緒狀態,通過SelectorKey可以獲取就緒Channel的集合。底層使用了epoll()實現,沒有最大連接句柄的限制。
服務端代碼:
public class TimeServer {public static void main(String[] args) throws IOException {int port = 8080;MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);new Thread(timeServer, "NIO-MultiplexerTimeServer-001").start();} } public class MultiplexerTimeServer implements Runnable {private Selector selector;private ServerSocketChannel servChannel;private volatile boolean stop;public MultiplexerTimeServer(int port) {try {selector = Selector.open();servChannel = ServerSocketChannel.open();servChannel.configureBlocking(false);//設置非阻塞模式servChannel.socket().bind(new InetSocketAddress(port), 1024);servChannel.register(selector, SelectionKey.OP_ACCEPT);//將Channel注冊到selector,監聽accept事件System.out.println("The time server is start in port : " + port);} catch (IOException e) {e.printStackTrace();System.exit(1);}}public void stop() {this.stop = true;}@Overridepublic void run() {while (!stop) {try {selector.select(1000);//每隔一秒輪詢一次Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {//如果有新的客戶端接入key = it.next();it.remove();try {handleInput(key);//處理請求} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Throwable t) {t.printStackTrace();}}// 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊并關閉,所以不需要重復釋放資源if (selector != null)try {selector.close();} catch (IOException e) {e.printStackTrace();}}private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 處理新接入的請求消息if (key.isAcceptable()) {// 獲取客戶端的連接通道ServerSocketChannel ssc = (ServerSocketChannel) key.channel();SocketChannel sc = ssc.accept();sc.configureBlocking(false);//將新連接注冊到selector,并監聽讀事件 sc.register(selector, SelectionKey.OP_READ);}if (key.isReadable()) {//讀取通道數據寫入字節緩存SocketChannel sc = (SocketChannel) key.channel();ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);//字節緩存寫入字節數組String body = new String(bytes, "UTF-8");System.out.println("The time server receive order : " + body);String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)? new java.util.Date(System.currentTimeMillis()).toString(): "BAD ORDER";doWrite(sc, currentTime);//向socketchannel寫入數據} else if (readBytes < 0) {// 對端鏈路關閉 key.cancel();sc.close();} }}}private void doWrite(SocketChannel channel, String response)throws IOException {if (response != null && response.trim().length() > 0) {byte[] bytes = response.getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);writeBuffer.put(bytes);writeBuffer.flip();channel.write(writeBuffer);}} }客戶端:
public class TimeClient {public static void main(String[] args) {int port = 8080;new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();} } public class TimeClientHandle implements Runnable {private String host;private int port;private Selector selector;private SocketChannel socketChannel;private volatile boolean stop;public TimeClientHandle(String host, int port) {this.host = host == null ? "127.0.0.1" : host;this.port = port;try {selector = Selector.open();socketChannel = SocketChannel.open();socketChannel.configureBlocking(false);} catch (IOException e) {e.printStackTrace();System.exit(1);}}@Overridepublic void run() {try {doConnect();} catch (IOException e) {e.printStackTrace();System.exit(1);}while (!stop) {try {selector.select(1000);Set<SelectionKey> selectedKeys = selector.selectedKeys();Iterator<SelectionKey> it = selectedKeys.iterator();SelectionKey key = null;while (it.hasNext()) {key = it.next();it.remove();try {handleInput(key);} catch (Exception e) {if (key != null) {key.cancel();if (key.channel() != null)key.channel().close();}}}} catch (Exception e) {e.printStackTrace();System.exit(1);}}// 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊并關閉,所以不需要重復釋放資源if (selector != null)try {selector.close();} catch (IOException e) {e.printStackTrace();}}private void doConnect() throws IOException {// 如果直接連接成功,則注冊到多路復用器上,發送請求消息,讀應答if (!socketChannel.connect(new InetSocketAddress(host, port))) {socketChannel.register(selector, SelectionKey.OP_READ);doWrite(socketChannel);} else//沒有直接連接成功,不代表失敗,而是說明服務器還沒有返回TCP握手的應答消息,所以注冊OP_CONNECT事件,監聽消息。 socketChannel.register(selector, SelectionKey.OP_CONNECT); }private void handleInput(SelectionKey key) throws IOException {if (key.isValid()) {// 判斷是否連接成功SocketChannel sc = (SocketChannel) key.channel();if (key.isConnectable()) {//判斷是否有連接事件,是的話,說明未連接if (sc.finishConnect()) {//再連接 sc.register(selector, SelectionKey.OP_READ);doWrite(sc);} elseSystem.exit(1);// 連接失敗,進程退出 }if (key.isReadable()) {ByteBuffer readBuffer = ByteBuffer.allocate(1024);int readBytes = sc.read(readBuffer);if (readBytes > 0) {readBuffer.flip();byte[] bytes = new byte[readBuffer.remaining()];readBuffer.get(bytes);String body = new String(bytes, "UTF-8");System.out.println("Now is : " + body);this.stop = true;} else if (readBytes < 0) {// 對端鏈路關閉 key.cancel();sc.close();}}}}private void doWrite(SocketChannel sc) throws IOException {byte[] req = "QUERY TIME ORDER".getBytes();ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);writeBuffer.put(req);writeBuffer.flip();sc.write(writeBuffer);if (!writeBuffer.hasRemaining())System.out.println("Send order 2 server succeed.");} }?
轉載于:https://www.cnblogs.com/nazhizq/p/6538708.html
總結
以上是生活随笔為你收集整理的java网络通信:异步非阻塞I/O (NIO)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 常用PHP函数整理!
- 下一篇: Andriod SDK Manager国