An analysis of the NIO Selector and epoll implementation flow
I. Netty is built on top of Java NIO's Selector, which on Linux is implemented with epoll. select, poll and epoll are all I/O multiplexing mechanisms: through one of them, a single process can monitor many file descriptors at once and is notified as soon as any descriptor becomes ready (typically readable or writable), so the program can perform the corresponding read or write. Note, however, that select, poll and epoll are all still synchronous I/O: once the readiness event fires, the caller must perform the read or write itself, which means the copy of data from kernel space to user space blocks the caller. With asynchronous I/O, by contrast, the caller does not perform the read or write at all — the AIO implementation copies the data from kernel to user space on the caller's behalf and only then signals completion.
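To make that last distinction concrete, here is a minimal sketch of the completion model using the JDK's NIO.2 AsynchronousSocketChannel (this demo is ours, not from the original post; the host and port are assumptions chosen to match the Java example in section II). With a Selector, our thread calls channel.read() itself after the readiness notification; here the callback only runs after the kernel-to-user copy has already been done for us.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.StandardCharsets;

public class AsyncReadDemo {
    public static void main(String[] args) throws Exception {
        AsynchronousSocketChannel ch = AsynchronousSocketChannel.open();
        // assumed endpoint: the reactor server from section II listening on port 7020
        ch.connect(new InetSocketAddress("localhost", 7020)).get();
        ch.write(ByteBuffer.wrap("hello\r\n".getBytes(StandardCharsets.UTF_8))).get();

        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ch.read(buffer, null, new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer n, Void att) {
                // by the time this callback runs, the kernel-to-user copy is already done
                buffer.flip();
                System.out.println("read " + n + " bytes: " + StandardCharsets.UTF_8.decode(buffer));
            }

            @Override
            public void failed(Throwable exc, Void att) {
                exc.printStackTrace();
            }
        });
        Thread.sleep(3000); // keep the demo JVM alive long enough for the callback
        ch.close();
    }
}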
II. Test examples
1. C++
/* Server-side source code */
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <fcntl.h>
#include <unistd.h>
#include <cstring>
#include <strings.h>
#include <iostream>
#include <signal.h>
#include <sys/epoll.h>

#define MAXFDS 256
#define EVENTS 100
#define PORT   8888

int epfd;

bool setNonBlock(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    flags |= O_NONBLOCK;
    if (-1 == fcntl(fd, F_SETFL, flags))
        return false;
    return true;
}

int main(int argc, char *argv[]) {
    int fd, nfds, cfd, ret;
    int on = 1;
    char buffer[512];
    struct sockaddr_in saddr, caddr;
    struct epoll_event ev, events[EVENTS];

    // create the listening socket
    if (-1 == (fd = socket(AF_INET, SOCK_STREAM, 0))) {
        std::cout << "failed to create the socket" << std::endl;
        return -1;
    }

    // ignore SIGPIPE so writing to a closed peer does not kill the process
    struct sigaction sig;
    sigemptyset(&sig.sa_mask);
    sig.sa_handler = SIG_IGN;
    sig.sa_flags = 0;
    sigaction(SIGPIPE, &sig, NULL);

    epfd = epoll_create(MAXFDS);

    setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
    memset(&saddr, 0, sizeof(saddr));
    saddr.sin_family = AF_INET;
    saddr.sin_port = htons((short)PORT);
    saddr.sin_addr.s_addr = INADDR_ANY;
    if (-1 == bind(fd, (struct sockaddr *)&saddr, sizeof(saddr))) {
        std::cout << "failed to bind the socket to the server address" << std::endl;
        return -1;
    }
    if (-1 == listen(fd, 32)) {
        std::cout << "failed to listen on the socket" << std::endl;
        return -1;
    }

    // register the listening fd for readable (new-connection) events
    ev.data.fd = fd;
    ev.events = EPOLLIN;
    epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

    while (true) {
        nfds = epoll_wait(epfd, events, EVENTS, -1); // block until at least one fd is ready
        for (int i = 0; i < nfds; ++i) {
            if (fd == events[i].data.fd) {
                // new connection on the listening socket
                memset(&caddr, 0, sizeof(caddr));
                socklen_t clen = sizeof(caddr);
                cfd = accept(fd, (struct sockaddr *)&caddr, &clen);
                if (-1 == cfd) {
                    std::cout << "accept failed" << std::endl;
                    break;
                }
                setNonBlock(cfd);
                ev.data.fd = cfd;
                ev.events = EPOLLIN;
                epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
            } else if (events[i].events & EPOLLIN) {
                bzero(buffer, sizeof(buffer));
                std::cout << "reading the message sent by the client" << std::endl;
                ret = recv(events[i].data.fd, buffer, sizeof(buffer), 0);
                if (ret < 0) {
                    std::cout << "recv failed" << std::endl;
                    return -1;
                }
                std::cout << "received: " << buffer << std::endl;
                // switch this fd to write mode to send the response
                ev.data.fd = events[i].data.fd;
                ev.events = EPOLLOUT;
                epoll_ctl(epfd, EPOLL_CTL_MOD, events[i].data.fd, &ev);
            } else if (events[i].events & EPOLLOUT) {
                bzero(buffer, sizeof(buffer));
                bcopy("The Author@: magicminglee@Hotmail.com", buffer, sizeof("The Author@: magicminglee@Hotmail.com"));
                ret = send(events[i].data.fd, buffer, strlen(buffer), 0);
                if (ret < 0) {
                    std::cout << "failed to send the message to the client" << std::endl;
                    return -1;
                }
                ev.data.fd = events[i].data.fd;
                epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, &ev);
                close(ev.data.fd); // done with this connection
            }
        }
    }

    if (fd > 0) {
        shutdown(fd, SHUT_RDWR);
        close(fd);
    }
}

2. Java
package com.tpw.summaryday.nio;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * <h3>summaryday</h3>
 *
 * @author : lipengyao
 * @date : 2021-09-16 16:50:54
 */
@Slf4j
public class ServerReactor3 {
    public static final int workReactorNums = Runtime.getRuntime().availableProcessors() * 2;

    public static class WorkReactor {
        public static ExecutorService executorService = Executors.newFixedThreadPool(ServerReactor3.workReactorNums * 2);
        private Selector selector;
        private int reactorIndex;
        private int channelCnt;
        private Map<SocketChannel, ArrayDeque<String>> unWriteDataMap = new ConcurrentHashMap<>();
        private List<SocketChannel> waitRegisterChannels = new ArrayList<>();
        private Lock lock = new ReentrantLock();
        private int maxItemKeyCnt = 0;

        public WorkReactor(int reactorIndex) throws IOException {
            this.selector = Selector.open();
            this.reactorIndex = reactorIndex;
            this.channelCnt = 0;
            select();
            log.debug("register init channelCnt:{},reactorIndex:{},selectionKey:{}", this.channelCnt, this.reactorIndex, this.selector);
        }

        // Called from the accept-reactor thread: only queues the channel.
        public void register(SocketChannel socketChannel) {
            lock.lock();
            try {
                waitRegisterChannels.add(socketChannel);
                log.debug("register add socket channel waitRegisterChannels:{}", waitRegisterChannels.size());
            } finally {
                lock.unlock();
            }
        }

        // Runs on the select thread: drains the queue and does the real register().
        public void registerAllWaitChannels() {
            lock.lock();
            try {
                if (!this.waitRegisterChannels.isEmpty()) {
                    log.debug("registerAllWaitChannels will register begin waitRegisterChannels:{}", waitRegisterChannels.size());
                    for (int i = 0; i < this.waitRegisterChannels.size(); i++) {
                        this.interRegister(this.waitRegisterChannels.get(i));
                    }
                    this.waitRegisterChannels.clear();
                    log.debug("registerAllWaitChannels register end waitRegisterChannels:{}", waitRegisterChannels.size());
                }
            } finally {
                lock.unlock();
            }
        }

        public void interRegister(SocketChannel socketChannel) {
            try {
                selector.wakeup();
                SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ /*| SelectionKey.OP_WRITE*/);
                this.channelCnt++;
                unWriteDataMap.put(socketChannel, new ArrayDeque<String>());
                log.debug("register channelCnt:{},reactorIndex:{},selectionKey:{}", this.channelCnt, this.reactorIndex, selectionKey);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void select() throws ClosedChannelException {
            executorService.submit(() -> {
                log.debug("begin channelCnt:{},reactorIndex:{}", this.channelCnt, this.reactorIndex);
                while (true) {
                    try {
                        if (selector.select(10) <= 0) {
                            // TimeUnit.MILLISECONDS.sleep(10);
                            // log.debug(" has no event,continue,reactorIndex:{}", this.reactorIndex);
                            this.registerAllWaitChannels();
                            continue;
                        }
                        log.debug(" get some io event,will handle,reactorIndex:{}", this.reactorIndex);
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
                        Set<SelectionKey> allKeys = selector.keys();
                        if (allKeys.size() > maxItemKeyCnt) {
                            maxItemKeyCnt = allKeys.size();
                        }
                        log.debug(" get select key--> selectionKeys:{},channelCnt:{},reactorIndex:{},allKeys:{},maxItemKeyCnt:{}",
                                selectionKeys.size(), this.channelCnt, this.reactorIndex, allKeys.size(), maxItemKeyCnt);
                        while (selectionKeyIterator.hasNext()) {
                            SelectionKey t = selectionKeyIterator.next();
                            selectionKeyIterator.remove();
                            if (t.isReadable() && t.isValid()) {
                                SocketChannel socketChannel = (SocketChannel) t.channel();
                                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                int count = socketChannel.read(byteBuffer);
                                log.debug(" read socketHandler:{},count:{}", socketChannel.getRemoteAddress(), count);
                                if (count < 0) {
                                    log.debug(" read error ,will close channel remote Address:{}", socketChannel.getRemoteAddress());
                                    t.cancel();
                                    socketChannel.close();
                                    this.channelCnt--;
                                    continue;
                                }
                                byteBuffer.flip();
                                byte[] inputData = new byte[byteBuffer.limit()];
                                byteBuffer.get(inputData);
                                String body = new String(inputData);
                                log.debug(" read ok , ,reactorIndex:{},data:{}", reactorIndex, body);
                                if (socketChannel.isConnected() && socketChannel.isOpen()) {
                                    byteBuffer.clear();
                                    String response = "server rsp:" + body + "\r\n";
                                    log.debug(" read ok , ,response:{}", response);
                                    // unWriteDataMap.get(socketChannel).addLast(response);
                                    byte[] writeByets = response.getBytes("utf-8");
                                    byteBuffer.put(writeByets);
                                    byteBuffer.flip();
                                    socketChannel.write(byteBuffer);
                                }
                            } else if (t.isWritable()) {
                                SocketChannel socketChannel = (SocketChannel) t.channel();
                                while (!unWriteDataMap.get(socketChannel).isEmpty()) {
                                    String response = unWriteDataMap.get(socketChannel).pollFirst();
                                    log.debug(" write ok , ,response:{}", response);
                                    byte[] writeByets = response.getBytes("utf-8");
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    byteBuffer.put(writeByets);
                                    byteBuffer.flip();
                                    socketChannel.write(byteBuffer);
                                }
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    @Getter
    public static class SocketHandler {
        private String name;
        private Object channel;

        public SocketHandler(String name, Object channel) {
            this.name = name;
            this.channel = channel;
        }
    }

    public void start() throws IOException {
        ExecutorService executorService = Executors.newFixedThreadPool(100);
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        int port = 7020;
        serverSocketChannel.bind(new InetSocketAddress(port));
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT, new SocketHandler("acceptHandler", serverSocketChannel));
        log.debug(" start bind success port:{}", port);
        int clientIndex = 0;
        WorkReactor[] workReactors = new WorkReactor[workReactorNums];
        for (int i = 0; i < workReactorNums; i++) {
            workReactors[i] = new WorkReactor(i);
        }
        while (selector.select() > 0) {
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectionKeyIterator = selectionKeys.iterator();
            while (selectionKeyIterator.hasNext()) {
                SelectionKey t = selectionKeyIterator.next();
                selectionKeyIterator.remove();
                if (t.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) t.channel();
                    SocketHandler socketHandler = (SocketHandler) t.attachment();
                    // log.debug(" accept socketHandler:{}", socketHandler.getName());
                    SocketChannel socketChannel = serverSocketChannel1.accept();
                    socketChannel.configureBlocking(false);
                    int reactorNumIndex = clientIndex++ % workReactorNums;
                    log.debug(" accept new channel remote Address:{},reactorNumIndex:{}", socketChannel.getRemoteAddress(), reactorNumIndex);
                    workReactors[reactorNumIndex].register(socketChannel);
                }
            }
        }
    }

    public static void main(String[] args) {
        ServerReactor3 serverReactor = new ServerReactor3();
        try {
            serverReactor.start();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

1. The accept-reactor: a single selector that handles all incoming connections. It is the start method of the main class, which keeps accepting new connections and hands each freshly accepted socketChannel over to a work-reactor for registration.
2. The work-reactors: 2 × CPU-core selectors, backed by a pool of twice as many threads (workReactorNums = cores × 2, and its executor uses workReactorNums × 2 threads). All client connections are spread evenly over these reactors round-robin. The work-reactor is the WorkReactor class: each instance creates its own selector and serves its own share of the SocketChannel connections, splitting the load.
3. Note: all operations on a given selector — register, select and the rest — must run on the same thread, or a deadlock can occur. So when the accept-reactor wants to register a newly accepted socketChannel with a work-reactor's selector, it must not call register directly; it has to hand the channel over to the thread running the work-reactor's select loop, otherwise register and select race each other and deadlock. A distilled sketch of the hand-off follows.
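This is a minimal, self-contained sketch of the pattern WorkReactor implements above (the class and method names here are ours, invented for illustration): the accepting thread only enqueues and wakes; the select thread is the only one that ever touches the selector.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class RegistrationHandOff {
    private final Selector selector;
    private final Queue<SocketChannel> pending = new ConcurrentLinkedQueue<>();

    RegistrationHandOff(Selector selector) {
        this.selector = selector;
    }

    // Called from the accept-reactor thread.
    void registerLater(SocketChannel ch) {
        pending.add(ch);
        selector.wakeup(); // knock the select thread out of epoll_wait
    }

    // Runs on the work-reactor thread, the only thread that touches the selector.
    void selectLoop() throws IOException {
        while (selector.isOpen()) {
            selector.select(10);
            SocketChannel ch;
            while ((ch = pending.poll()) != null) {
                ch.register(selector, SelectionKey.OP_READ); // same thread as select()
            }
            // ... iterate selector.selectedKeys() and handle I/O here ...
        }
    }
}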
III. The Selector.open initialization flow
1. Everything starts with Selector.open(), which creates a Selector instance. Since NIO programs are normally deployed on Linux, we read the Linux JDK implementation (for example, by downloading the OpenJDK sources for Linux).
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

2. The provider is created by sun.nio.ch.DefaultSelectorProvider.create(). The create method picks the SelectorProvider based on the OS name; on Linux it creates sun.nio.ch.EPollSelectorProvider.
public static SelectorProvider create() {
    String osName = (String) AccessController.doPrivileged(new GetPropertyAction("os.name"));
    if (osName.equals("SunOS")) {
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    } else {
        return (SelectorProvider) (osName.equals("Linux")
                ? createProvider("sun.nio.ch.EPollSelectorProvider")
                : new PollSelectorProvider());
    }
}

3. Back in Selector.open(): once the SelectorProvider instance is obtained, openSelector() is called, which naturally takes us into EPollSelectorProvider.openSelector():
public AbstractSelector openSelector() throws IOException {
    return new EPollSelectorImpl(this);
}

4. EPollSelectorImpl is the Linux implementation; its inheritance chain is:
EPollSelectorImpl → SelectorImpl → AbstractSelector → Selector

5. Now let's see what EPollSelectorImpl's constructor does. EPollArrayWrapper encapsulates the three Linux epoll interfaces — epoll_create, epoll_ctl and epoll_wait.
EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    pollWrapper = new EPollArrayWrapper();
    pollWrapper.initInterrupt(fd0, fd1);
    fdToKey = new HashMap<>();
}

It creates fd0 and fd1 — the two ends of a pipe — and pollWrapper.initInterrupt adds these native descriptors to the epoll interest set, so that a wakeup() call can immediately knock a blocked select out of epoll_wait: the wait ends quickly, and the loop can exit or register new interest.
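A small runnable demonstration of what that pipe buys us (a sketch of ours, not from the original post): a select() with no registered channels would block forever, yet wakeup() from another thread returns it promptly, because the write end fd1 is written and epoll_wait sees fd0 become readable.

import java.io.IOException;
import java.nio.channels.Selector;

public class WakeupDemo {
    public static void main(String[] args) throws IOException, InterruptedException {
        Selector selector = Selector.open();
        Thread waker = new Thread(() -> {
            try {
                Thread.sleep(1000);
                selector.wakeup(); // writes one byte to the interrupt pipe
            } catch (InterruptedException ignored) {
            }
        });
        waker.start();
        long t0 = System.currentTimeMillis();
        selector.select(); // would block forever: no channels are registered
        System.out.println("woken after " + (System.currentTimeMillis() - t0) + " ms");
        selector.close();
    }
}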
void initInterrupt(int fd0, int fd1) {
    outgoingInterruptFD = fd1;
    incomingInterruptFD = fd0;
    epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}

6. The parent SelectorImpl constructor. The JDK models each registration on a Selector as a SelectionKey, representing the events the channel is interested in: Read, Write, Connect, Accept. The constructor initializes publicKeys and publicSelectedKeys on top of two plain sets (in this JDK version, a concurrent key set and a HashSet): the former holds all registered keys, the latter the keys that are ready. publicKeys and publicSelectedKeys are merely permission-controlling wrappers that still point at those same underlying sets, so the heap really holds only the two set objects. Here are the parent SelectorImpl's fields and constructor:
protected SelectorImpl(SelectorProvider sp) {
    super(sp);
    keys = ConcurrentHashMap.newKeySet();
    selectedKeys = new HashSet<>();
    publicKeys = Collections.unmodifiableSet(keys);
    publicSelectedKeys = Util.ungrowableSet(selectedKeys);
}
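As a quick aside, here is a tiny runnable check of the two views just described (our sketch, not from the original post): keys() is fully read-only, while selectedKeys() must allow remove() — the select loop removes keys it has handled — but rejects add(), which is exactly what Util.ungrowableSet provides.

import java.io.IOException;
import java.nio.channels.Selector;

public class KeySetViewsDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        try {
            selector.keys().clear(); // unmodifiable view
        } catch (UnsupportedOperationException e) {
            System.out.println("keys() is unmodifiable");
        }
        try {
            selector.selectedKeys().add(null); // removal allowed, growth rejected
        } catch (UnsupportedOperationException e) {
            System.out.println("selectedKeys() cannot grow");
        }
        selector.close();
    }
}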
7. EPollArrayWrapper builds the epoll file descriptor and wraps the Linux epoll system calls. It also owns the result of each selection operation: pollArray is the epoll_event array passed to int epoll_wait(int epfd, struct epoll_event* events, int maxevents, int timeout) — a native block of memory that the kernel fills with the ready I/O events.

EPollArrayWrapper() throws IOException {
    // creates the epoll file descriptor
    epfd = epollCreate();

    // the epoll_event array passed to epoll_wait
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();

    // eventsHigh needed when using file descriptors > 64k
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

8. eventsLow and eventsHigh both store the interest set registered for each socket's fd. eventsLow is indexed by fd, with the event byte as the value; eventsHigh is a map keyed by fd. fds below 64K live in the low array, larger ones in the high map (a standalone sketch of this two-tier layout follows item 9 below).
private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
private Map<Integer, Byte> eventsHigh;

9. A few more fields. updateCount is the number of pending registration changes, and updateDescriptors lists the fds whose registration changed. registered records whether an fd is already registered with the epoll handle; it is consulted when events are pushed to the kernel before epoll_wait: a first-time registration calls epoll_ctl(EPOLL_CTL_ADD), later changes call epoll_ctl(EPOLL_CTL_MOD), and deregistration calls epoll_ctl(EPOLL_CTL_DEL).

// number of file descriptors with registration changes pending
private int updateCount;

// file descriptors with registration changes pending
private int[] updateDescriptors = new int[INITIAL_PENDING_UPDATE_SIZE];

// Used by release and updateRegistrations to track whether a file
// descriptor is registered with epoll.
private final BitSet registered = new BitSet();
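Here is the promised sketch of the two-tier fd → events storage (class and method names are ours; MAX_UPDATE_ARRAY_SIZE mirrors the JDK's 64K constant). Small fds index straight into a flat byte array — compact and O(1) — and only unusually large fd numbers pay for a map entry.

import java.util.HashMap;
import java.util.Map;

class FdEventTable {
    static final int MAX_UPDATE_ARRAY_SIZE = 64 * 1024;

    private final byte[] eventsLow = new byte[MAX_UPDATE_ARRAY_SIZE];
    private final Map<Integer, Byte> eventsHigh = new HashMap<>();

    void put(int fd, byte events) {
        if (fd < MAX_UPDATE_ARRAY_SIZE) {
            eventsLow[fd] = events;     // the fd doubles as the array index
        } else {
            eventsHigh.put(fd, events); // rare: very large fd numbers
        }
    }

    byte get(int fd) {
        return fd < MAX_UPDATE_ARRAY_SIZE
                ? eventsLow[fd]
                : eventsHigh.getOrDefault(fd, (byte) 0);
    }
}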
IV. The channel.register registration flow

1. channel.register first goes through the base-class method java.nio.channels.spi.AbstractSelectableChannel.register.
2. That in turn calls selector.register, implemented in the SelectorImpl base class. It creates a SelectionKeyImpl that wraps the socketChannel and the current selector as its fields, and an arbitrary object may be attached to it as user data.
protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
    if (!(var1 instanceof SelChImpl)) {
        throw new IllegalSelectorException();
    } else {
        SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl) var1, this);
        var4.attach(var3);
        synchronized (this.publicKeys) {
            this.implRegister(var4);
        }
        var4.interestOps(var2);
        return var4;
    }
}

3. EPollSelectorImpl.implRegister puts the channel's fd (file descriptor) and its SelectionKey into the fdToKey map, which records the fd → key mapping.
It also adds the channel's fd to the EPollArrayWrapper, forcing the fd's initial events to 0 (forced, because a stale event may linger from a previously cancelled registration).
And it adds the selectionKey to the keys set.
4. EPollArrayWrapper's add method internally calls setUpdateEvents with the event argument set to 0, the initial value. In setUpdateEvents the fd serves as the array index and the event type is the value; if the fd is 64*1024 or larger, the fd/event pair is stored in eventsHigh instead — exactly the array/map pair of EPollArrayWrapper described above.
void add(int fd) {
    // force the initial update events to 0 as it may be KILLED by a
    // previous registration.
    synchronized (updateLock) {
        assert !registered.get(fd);
        setUpdateEvents(fd, (byte) 0, true);
    }
}

private void setUpdateEvents(int fd, byte events, boolean force) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        if ((eventsLow[fd] != KILLED) || force) {
            eventsLow[fd] = events;
        }
    } else {
        Integer key = Integer.valueOf(fd);
        if (!isEventsHighKilled(key) || force) {
            eventsHigh.put(key, Byte.valueOf(events));
        }
    }
}

Note that add first asserts the fd is not already present in registered: an fd must not be added twice.
5. Back in SelectorImpl.register from step 2: it calls SelectionKeyImpl.interestOps(var2) to apply the interest set, which lands in SelectionKeyImpl.nioInterestOps.
public SelectionKey nioInterestOps(int var1) {
    if ((var1 & ~this.channel().validOps()) != 0) {
        throw new IllegalArgumentException();
    } else {
        this.channel.translateAndSetInterestOps(var1, this);
        this.interestOps = var1;
        return this;
    }
}

6. The channel here is the socketChannel and var1 the interest set. sun.nio.ch.SocketChannelImpl.translateAndSetInterestOps translates the interest ops and applies them.
public void translateAndSetInterestOps(int var1, SelectionKeyImpl var2) {
    int var3 = 0;
    if ((var1 & 1) != 0) {
        var3 |= Net.POLLIN;
    }
    if ((var1 & 4) != 0) {
        var3 |= Net.POLLOUT;
    }
    if ((var1 & 8) != 0) {
        var3 |= Net.POLLCONN;
    }
    var2.selector.putEventOps(var2, var3);
}

The constants 1, 4 and 8 are SelectionKey.OP_READ, OP_WRITE and OP_CONNECT; the method converts them into the corresponding Net-level (poll/epoll) definitions.
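A quick runnable check of those values (our snippet, not from the original post): the interest ops are bit flags on SelectionKey, which is why translateAndSetInterestOps can test them with a bitwise AND.

import java.nio.channels.SelectionKey;

public class OpsDemo {
    public static void main(String[] args) {
        // prints "1 4 8 16": OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT
        System.out.println(SelectionKey.OP_READ + " " + SelectionKey.OP_WRITE + " "
                + SelectionKey.OP_CONNECT + " " + SelectionKey.OP_ACCEPT);
    }
}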
7. Next, EPollSelectorImpl.putEventOps records the interest ops:
public void putEventOps(SelectionKeyImpl ski, int ops) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    pollWrapper.setInterest(ch.getFDVal(), ops);
}

8. EPollArrayWrapper.setInterest then stores the interest for this fd. It first checks whether the updateDescriptors array is full; if so it grows the array by another 64 slots, then appends the fd. Finally it calls setUpdateEvents — analyzed above — to record the fd's events.
void setInterest(int fd, int mask) {
    synchronized (updateLock) {
        // record the file descriptor and events
        int oldCapacity = updateDescriptors.length;
        if (updateCount == oldCapacity) {
            int newCapacity = oldCapacity + INITIAL_PENDING_UPDATE_SIZE;
            int[] newDescriptors = new int[newCapacity];
            System.arraycopy(updateDescriptors, 0, newDescriptors, 0, oldCapacity);
            updateDescriptors = newDescriptors;
        }
        updateDescriptors[updateCount++] = fd;

        // events are stored as bytes for efficiency reasons
        byte b = (byte) mask;
        assert (b == mask) && (b != KILLED);
        setUpdateEvents(fd, b, false);
    }
}

setUpdateEvents stores the registered interest and its file descriptor in eventsLow or eventsHigh, which is what the native epoll_wait path later consumes. In parallel, SelectorImpl.register also sets the SelectionKey's interestOps field — that is the copy we application programmers read back.
So the fd whose interest just changed ends up queued in updateDescriptors.
V. The selector.select flow
1. The call first hits SelectorImpl.select in the abstract base class. The template-method pattern is used throughout: the base class implements the framework of the flow and defers the concrete work to subclasses.
public int select(long var1) throws IOException {
    if (var1 < 0L) {
        throw new IllegalArgumentException("Negative timeout");
    } else {
        return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
    }
}

private int lockAndDoSelect(long var1) throws IOException {
    synchronized (this) {
        synchronized (this.publicKeys) {
            synchronized (this.publicSelectedKeys) {
                return this.doSelect(var1);
            }
        }
    }
}

protected abstract int doSelect(long var1) throws IOException;

2. select blocks until at least one channel is selected (that is, one of its registered events has fired), unless the current thread is interrupted or the selector's wakeup method is called. Selector.select ultimately invokes EPollSelectorImpl's doSelect:
protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    processDeregisterQueue();
    int numKeysUpdated = updateSelectedKeys();
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}

Calling processDeregisterQueue removes cancelled selectionKeys from the selector; underneath, epoll_ctl is invoked to drop the file descriptor from epoll's watch list.
begin and end deal with thread interruption: they convert a thread interrupt into a Selector wakeup, so the thread is not left blocked inside the I/O operation.
fdToKey is used to look up the SelectionKey for each ready file descriptor, which is then updated.
3. On to the implementation of poll:
int poll(long timeout) throws IOException {
    updateRegistrations();
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    for (int i = 0; i < updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

4. The updateRegistrations() method pushes the fds and events already registered with this selector (held in eventsLow or eventsHigh) into the kernel by calling epollCtl(epfd, opcode, fd, events). The implementation:
private void updateRegistrations() {
    synchronized (updateLock) {
        int j = 0;
        while (j < updateCount) {
            int fd = updateDescriptors[j];
            byte events = getUpdateEvents(fd);
            boolean isRegistered = registered.get(fd);
            int opcode = 0;
            if (events != KILLED) {
                if (isRegistered) {
                    opcode = (events != 0) ? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
                } else {
                    opcode = (events != 0) ? EPOLL_CTL_ADD : 0;
                }
                if (opcode != 0) {
                    epollCtl(epfd, opcode, fd, events);
                    if (opcode == EPOLL_CTL_ADD) {
                        registered.set(fd);
                    } else if (opcode == EPOLL_CTL_DEL) {
                        registered.clear(fd);
                    }
                }
            }
            j++;
        }
        updateCount = 0;
    }
}

private byte getUpdateEvents(int fd) {
    if (fd < MAX_UPDATE_ARRAY_SIZE) {
        return eventsLow[fd];
    } else {
        Byte result = eventsHigh.get(Integer.valueOf(fd));
        // result should never be null
        return result.byteValue();
    }
}

It walks updateDescriptors to get each pending fd, fetches that fd's interest from eventsLow/eventsHigh, then uses registered to decide between EPOLL_CTL_ADD for a first registration and EPOLL_CTL_MOD otherwise (or EPOLL_CTL_DEL once the interest has dropped to zero), and applies the change to the epoll instance.
5. Then updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd) waits for I/O events. updated is the number of ready epoll_events, and the kernel has deposited the new events into pollArrayAddress (pollArray).
6. Back in doSelect from step 2: once I/O events have arrived, updateSelectedKeys merges them into EPollSelectorImpl's keys and selectedKeys sets.
private int updateSelectedKeys() {
    int entries = pollWrapper.updated;
    int numKeysUpdated = 0;
    for (int i = 0; i < entries; i++) {
        int nextFD = pollWrapper.getDescriptor(i);
        SelectionKeyImpl ski = fdToKey.get(Integer.valueOf(nextFD));
        // ski is null in the case of an interrupt
        if (ski != null) {
            int rOps = pollWrapper.getEventOps(i);
            if (selectedKeys.contains(ski)) {
                if (ski.channel.translateAndSetReadyOps(rOps, ski)) {
                    numKeysUpdated++;
                }
            } else {
                ski.channel.translateAndSetReadyOps(rOps, ski);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    selectedKeys.add(ski);
                    numKeysUpdated++;
                }
            }
        }
    }
    return numKeysUpdated;
}

7. pollWrapper.getDescriptor reads the file descriptor of the i-th epoll_event out of the pollArray member by index and offset, and pollWrapper.getEventOps reads the new events the same way. Since pollArray is a native object, both reads go through byte offsets.
int getEventOps(int i) {
    // struct epoll_event is packed on x86-64 Linux: events at offset 0, data (fd) at offset 4
    int offset = SIZE_EPOLLEVENT * i + EVENT_OFFSET;
    return pollArray.getInt(offset);
}

int getDescriptor(int i) {
    int offset = SIZE_EPOLLEVENT * i + FD_OFFSET;
    return pollArray.getInt(offset);
}

8. The fdToKey map resolves the fd to its selectionKey. Then, if EPollSelectorImpl's selectedKeys set already contains this key, the ready events are simply translated into application-level ops and stored on the SelectionKeyImpl; if not, the key must first be added to the set as well.
9. Next comes selectionKey.channel's — that is, the SocketChannel's — translateAndSetReadyOps:
// sun.nio.ch.SocketChannelImpl
public boolean translateAndSetReadyOps(int ops, SelectionKeyImpl ski) {
    return translateReadyOps(ops, 0, ski);
}

public boolean translateReadyOps(int ops, int initialOps, SelectionKeyImpl ski) {
    int intOps = ski.nioInterestOps();
    int oldOps = ski.nioReadyOps();
    int newOps = initialOps;

    if ((ops & Net.POLLNVAL) != 0) {
        // This should only happen if this channel is pre-closed while a
        // selection operation is in progress
        // ## Throw an error if this channel has not been pre-closed
        return false;
    }

    if ((ops & (Net.POLLERR | Net.POLLHUP)) != 0) {
        newOps = intOps;
        ski.nioReadyOps(newOps);
        return (newOps & ~oldOps) != 0;
    }

    boolean connected = isConnected();
    if (((ops & Net.POLLIN) != 0) &&
        ((intOps & SelectionKey.OP_READ) != 0) && connected)
        newOps |= SelectionKey.OP_READ;

    if (((ops & Net.POLLCONN) != 0) &&
        ((intOps & SelectionKey.OP_CONNECT) != 0) && isConnectionPending())
        newOps |= SelectionKey.OP_CONNECT;

    if (((ops & Net.POLLOUT) != 0) &&
        ((intOps & SelectionKey.OP_WRITE) != 0) && connected)
        newOps |= SelectionKey.OP_WRITE;

    ski.nioReadyOps(newOps);
    return (newOps & ~oldOps) != 0;
}

This translates the Net-level (that is, epoll-level) I/O events into SelectionKey events, and SelectionKeyImpl.nioReadyOps then stores them into the readyOps field — the key's ready set.
// sun.nio.ch.SelectionKeyImpl
public void nioReadyOps(int ops) {
    readyOps = ops;
}

VI. The selector.selectedKeys() flow
1. This method is simple: it directly returns SelectorImpl's publicSelectedKeys, which is the wrapper around EPollSelectorImpl's selectedKeys set discussed above — the very set that doSelect just updated, so there is nothing more to do here.
public Set<SelectionKey> selectedKeys() {
    if (!this.isOpen() && !Util.atBugLevel("1.4")) {
        throw new ClosedSelectorException();
    } else {
        return this.publicSelectedKeys;
    }
}

Parts of this post are adapted from the following article:
Original article: https://blog.csdn.net/TheLudlows/article/details/82931478