二、网络编程
一、linux網絡I/O模型
1、同步和異步、阻塞和非阻塞
1.1、同步和異步
關注的是消息的通知機制
- 同步
調用方主動等待結果的返回 - 異步
調用方不需要主動等待結果的返回,通過其他方式:回調函數、狀態通知等通知
1.2 阻塞和非阻塞
關注的是等待結果返回調用方的狀態
- 阻塞
結果返回之前,當前線程被掛起,不做任務事情 - 非阻塞
結果返回之前,當前線程不阻塞,可做其他事情
1.3 組合含義
- 同步阻塞
主動等待調用返回,期間不做任何事情,調用方被掛起 - 同步非阻塞
主動等待調用方返回,期間可以做其他事情,但是還需要輪訓去查看調用結果是否返回 - 異步阻塞
不需要主動等待調用方返回,但是阻塞。很少用到 - 異步非阻塞
不需要主動等待調用方返回,期間可以做其他事情。
2、linux5中I/O模型
2.1 阻塞I/O
應用程序調用一個 IO 函數,導致應用程序阻塞,等待數據準備好。 如果數 據沒有準備好,一直等待…數據準備好了,從內核拷貝到用戶空間,IO 函數返回 成功指示。
2.2 非阻塞I/O
我們把一個 SOCKET 接口設置為非阻塞就是告訴內核,當所請求的 I/O 操作 無法完成時,不要將進程睡眠,而是返回一個錯誤。這樣我們的 I/O 操作函數將 不斷的測試數據是否已經準備好,如果沒有準備好,繼續測試,直到數據準備好 為止。在這個不斷測試的過程中,會大量的占用 CPU 的時間
2.3 I/O復用
主要是 select 和 epoll 兩個系統調用;對一個 IO 端口,兩次調用,兩 次返回,比阻塞 IO 并沒有什么優越性;關鍵是能實現同時對多個 IO 端口進行監聽;
當用戶進程調用了 select,那么整個進程會被 block;而同時,kernel 會“監 視”所有 select 負責的 socket;當任何一個 socket 中的數據準備好了,select 就會返回
2.4 信號驅動I/O
首先我們允許套接口進行信號驅動 I/O,并安裝一個信號處理函數,進程繼續 運行并不阻塞。當數據準備好時,進程會收到一個 SIGIO 信號,可以在信號處理 函數中調用 I/O 操作函數處理數據。
2.5 異步I/O
當一個異步過程調用發出后,調用者不能立刻得到結果。實際處理這個調用 的部件在完成后,通過狀態、通知和回調來通知調用者的輸入輸出操作
2.6 5中I/O模型比較
3、select poll epoll區別
3.1 所能打開的文件描述符個數(最大連接數)
- select
單個進程所能打開的最大連接數有 FD_SETSIZE 宏定義,其大 小是 32 個整數的大小(在 32 位的機器上,大小就是 3232,同 理 64 位機器上 FD_SETSIZE 為 3264) - poll
poll 本質上和 select 沒有區別,但是它沒有最大連接數的限 制,原因是它是基于鏈表來存儲的 - epoll
能達到系統允許打開的最大文件描述符數,1G 內存的機器上可以打開 10 萬左右的連接,2G 內存的機器可以打開 20 萬左右的連接
3.2 性能方法
- 使用效率
select、poll每次都將所有的文件描述符(就緒的和未就緒的)返回,所以應用程序檢索就緒文件描述符的時間復雜度為O(n),epoll通過events參數返回所有就緒的文件描述符,應用程序檢索就緒文件描述符的時間復雜度為O(1) - 內核效率
select和poll采用輪詢的方式:即每次都需要掃描整個注冊的文件描述符集合,并將其中就緒的文件描述符返回給用戶程序,因此,內核中檢測就緒文件描述符的算法時間復雜度為O(n), epoll則采取回調的方式,內核檢測到就緒文件描述符,就觸發回調函數,將文件描述符及發生的事件插入內核就緒事件隊列,因此,epoll在內核中檢測就緒文件描述符的算法時間復雜度為O(1) - 數據拷貝
epoll 通過內核和用戶空間共享一塊內存來實現的
總結
- 表面上看 epoll 的性能最好,但是在連接數少并且連接都十分活躍的情況 下,select 和 poll 的性能可能比 epoll 好,畢竟 epoll 的通知機制需要很多函數回 調。
- select 低效是因為每次它都需要輪詢。但低效也是相對的,視情況而定, 也可通過良好的設計改善
4、java語言下的網絡編程demo(BIO)
4.1 Service
package cn.enjoyedu.bio;import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket;public class Server {public static void main(String[] args) throws IOException {/*服務器必備*/ServerSocket serverSocket = new ServerSocket();/*綁定監聽端口*/serverSocket.bind(new InetSocketAddress(10001));System.out.println("Server start.......");while(true){new Thread(new ServerTask(serverSocket.accept())).start();}}private static class ServerTask implements Runnable{private Socket socket = null;public ServerTask(Socket socket) {this.socket = socket;}@Overridepublic void run() {/*拿和客戶端通訊的輸入輸出流*/try(ObjectInputStream inputStream= new ObjectInputStream(socket.getInputStream());ObjectOutputStream outputStream= new ObjectOutputStream(socket.getOutputStream())){/*服務器的輸入*/String userName = inputStream.readUTF();System.out.println("Accept clinet message:"+userName);outputStream.writeUTF("Hello,"+userName);outputStream.flush();}catch (Exception e){e.printStackTrace();}finally {try {socket.close();} catch (IOException e) {e.printStackTrace();}}}}}4.2 Client
package cn.enjoyedu.bio;import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.Socket;public class Client {public static void main(String[] args) throws IOException {//客戶端啟動必備Socket socket = null;//實例化與服務端通信的輸入輸出流ObjectOutputStream output = null;ObjectInputStream input = null;//服務器的通信地址InetSocketAddress addr= new InetSocketAddress("127.0.0.1",10001);try{socket = new Socket();/*連接服務器*/socket.connect(addr);output = new ObjectOutputStream(socket.getOutputStream());input = new ObjectInputStream(socket.getInputStream());/*向服務器輸出請求*/output.writeUTF("Mark");output.flush();//接收服務器的輸出System.out.println(input.readUTF());}finally{if (socket!=null) socket.close();if (output!=null) output.close();if (input!=null) input.close();}}}5、BIO實現一個RPC框架
5.1 RPC實現需要的步驟
- 代理問題
動態代理-見代碼 - 序列化問題
Serializable - 通訊問題
BIO通訊 - 登記的服務的實例化問題
反射
5.2 client
1、客戶端通過動態代理調用
package cn.enjoyedu.rpc.client;import cn.enjoyedu.rpc.service.SendSms; import cn.enjoyedu.rpc.service.StockService; import cn.enjoyedu.rpc.vo.UserInfo; import cn.enjoyedu.rpc.client.rpc.RpcClientFrame;/***@author 王樂**類說明:rpc的客戶端,調用遠端服務*/ public class RpcClient {public static void main(String[] args) {UserInfo userInfo= new UserInfo("Mark","Mark@xiangxue.com");SendSms sendSms = RpcClientFrame.getRemoteProxyObj(SendSms.class,"127.0.0.1",9189);System.out.println("Send mail: "+ sendSms.sendMail(userInfo));} }2、客戶端動態代理類
package cn.enjoyedu.rpc.client.rpc;import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket;/***@author 王樂***類說明:rpc框架的客戶端代理部分*/ public class RpcClientFrame {//遠程代理對象public static <T> T getRemoteProxyObj(final Class<?> serviceInterface,String hostname,int port){final InetSocketAddress addr= new InetSocketAddress(hostname,port);return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(),new Class<?>[]{serviceInterface},new DynProxy(serviceInterface,addr));}//動態代理類private static class DynProxy implements InvocationHandler {private final Class<?> serviceInterface;private final InetSocketAddress addr;public DynProxy(Class<?> serviceInterface, InetSocketAddress addr) {this.serviceInterface = serviceInterface;this.addr = addr;}@Overridepublic Object invoke(Object proxy, Method method, Object[] args)throws Throwable {Socket socket = null;ObjectOutputStream output = null;ObjectInputStream input = null;try{socket = new Socket();socket.connect(addr);output = new ObjectOutputStream(socket.getOutputStream());output.writeUTF(serviceInterface.getName());//方法所在的類output.writeUTF(method.getName());//方法的名output.writeObject(method.getParameterTypes());//方法的入參類型output.writeObject(args);output.flush();input = new ObjectInputStream(socket.getInputStream());return input.readObject();}finally{if (socket!=null) socket.close();if (output!=null) output.close();if (input!=null) input.close();}}}}5.3 服務端
package cn.enjoyedu.rpc.server;import cn.enjoyedu.rpc.service.SendSms; import cn.enjoyedu.rpc.service.impl.SendSmsImpl; import cn.enjoyedu.rpc.server.rpc.RpcServerFrame;/***@author 王樂**類說明:rpc的服務端,提供服務*/ public class SmsRpcServer {public static void main(String[] args) {new Thread(new Runnable() {public void run() {try{RpcServerFrame serviceServer = new RpcServerFrame(9189);serviceServer.registerSerive(SendSms.class.getName(),SendSmsImpl.class);serviceServer.startService();}catch(Exception e){e.printStackTrace();}}}).start();} } package cn.enjoyedu.rpc.server.rpc;import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors;/***@author 王樂**類說明:rpc框架的服務端部分*/ public class RpcServerFrame {private static ExecutorService executorService= Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());//服務的注冊中心private static final Map<String,Class> serviceHolder= new HashMap<>();//服務的端口號private int port;public RpcServerFrame(int port) {this.port = port;}//服務注冊public void registerSerive(String className,Class impl){serviceHolder.put(className,impl);}//處理服務請求任務private static class ServerTask implements Runnable{private Socket client = null;public ServerTask(Socket client){this.client = client;}public void run() {try(ObjectInputStream inputStream =new ObjectInputStream(client.getInputStream());ObjectOutputStream outputStream =new ObjectOutputStream(client.getOutputStream())){//方法所在類名接口名String serviceName = inputStream.readUTF();//方法的名字String methodName = inputStream.readUTF();//方法的入參類型Class<?>[] parmTypes = (Class<?>[]) inputStream.readObject();//方法入參的值Object[] args = (Object[]) inputStream.readObject();Class serviceClass = serviceHolder.get(serviceName);if (serviceClass == null){throw new ClassNotFoundException(serviceName+" Not Found");}Method method = serviceClass.getMethod(methodName,parmTypes);Object result = method.invoke(serviceClass.newInstance(),args);outputStream.writeObject(result);outputStream.flush();}catch(Exception e){e.printStackTrace();}finally {try {client.close();} catch (IOException e) {e.printStackTrace();}}}}//啟動RPC服務public void startService() throws IOException{ServerSocket serverSocket = new ServerSocket();serverSocket.bind(new InetSocketAddress(port));System.out.println("RPC server on:"+port+":運行");try{while(true){executorService.execute(new ServerTask(serverSocket.accept()));}}finally {serverSocket.close();}}}client運行截圖
service運行截圖
總結
- 上一篇: 32位程序和64位程序这些区别你知道吗?
- 下一篇: 腾讯技术团队整理,年度好文轻松彻底入门