Kafka源码深度解析-序列3 -Producer -Java NIO
原文地址
在上一篇我們分析了Metadata的更新機制,其中涉及到一個問題,就是Sender如何跟服務器通信,也就是網絡層。同很多Java項目一樣,Kafka client的網絡層也是用的Java NIO,然后在上面做了一層封裝。
下面首先看一下,在Sender和服務器之間的部分:
可以看到,Kafka client基于Java NIO封裝了一個網絡層,這個網絡層最上層的接口是KakfaClient。其層次關系如下:?
在本篇中,先詳細對最底層的Java NIO進行講述。
NIO的4大組件
Buffer與Channel
Channel: 在通常的Java網絡編程中,我們知道有一對Socket/ServerSocket對象,每1個socket對象表示一個connection,ServerSocket用于服務器監聽新的連接。?
在NIO中,與之相對應的一對是SocketChannel/ServerSocketChannel。
下圖展示了SocketChannel/ServerSocketChannel的類繼承層次
public?interface?Channel?extends?Closeable?{public?boolean?isOpen();????public?void?close()?throws?IOException; }public?interface?ReadableByteChannel?extends?Channel?{public?int?read(ByteBuffer?dst)?throws?IOException; }public?interface?WritableByteChannel?extends?Channel?{public?int?write(ByteBuffer?src)?throws?IOException; }123456789101112從代碼可以看出,一個Channel最基本的操作就是read/write,并且其傳進去的必須是ByteBuffer類型,而不是普通的內存buffer。
Buffer:在NIO中,也有1套圍繞Buffer的類繼承層次,在此就不詳述了。只需知道Buffer就是用來封裝channel發送/接收的數據。
Selector
Selector的主要目的是網絡事件的 loop 循環,通過調用selector.poll,不斷輪詢每個Channel上讀寫事件
SelectionKey
SelectionKey用來記錄一個Channel上的事件集合,每個Channel對應一個SelectionKey。?
SelectionKey也是Selector和Channel之間的關聯,通過SelectionKey可以取到對應的Selector和Channel。
關于這4大組件的協作、配合,下面來詳細講述。
4種網絡IO模型
epoll與IOCP
在《Unix環境高級編程》中介紹了以下4種IO模型(實際不止4種,但常用的就這4種):
阻塞IO: read/write的時候,阻塞調用
非阻塞IO: read/write,沒有數據,立馬返回,輪詢
IO復用:read/write一次都只能監聽一個socket,但對于服務器來講,有成千上完個socket連接,如何用一個函數,可以監聽所有的socket上面的讀寫事件呢?這就是IO復用模型,對應linux上面,就是select/poll/epoll3種技術。
異步IO:linux上沒有,windows上對應的是IOCP。
Reactor模式 vs. Preactor模式
相信很多人都聽說過網絡IO的2種設計模式,關于這2種模式的具體闡述,可以自行google之。
在此處,只想對這2種模式做一個“最通俗的解釋“:
Reactor模式:主動模式,所謂主動,是指應用程序不斷去輪詢,問操作系統,IO是否就緒。Linux下的select/poll/epooll就屬于主動模式,需要應用程序中有個循環,一直去poll。?
在這種模式下,實際的IO操作還是應用程序做的。
Proactor模式:被動模式,你把read/write全部交給操作系統,實際的IO操作由操作系統完成,完成之后,再callback你的應用程序。Windows下的IOCP就屬于這種模式,再比如C++ Boost中的Asio庫,就是典型的Proactor模式。
epoll的編程模型--3個階段
在Linux平臺上,Java NIO就是基于epoll來實現的。所有基于epoll的框架,都有3個階段:?
注冊事件(connect,accept,read,write), 輪詢IO是否就緒,執行實際IO操作。
下面的代碼展示了在linux下,用c語言epoll編程的基本框架:
//階段1:?調用epoll_ctl(xx)?注冊事件for(?;?;?){nfds?=?epoll_wait(epfd,events,20,500);?????//階段2:輪詢所有的socketfor(i=0;i<nfds;++i)??//處理輪詢結果{????????????if(events[i].data.fd==listenfd)?//accept事件就緒{connfd?=?accept(listenfd,(sockaddr?*)&clientaddr,?&clilen);?//階段3:執行實際的IO操作,acceptev.data.fd=connfd;ev.events=EPOLLIN|EPOLLET;epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);?//回到階段1:重新注冊}????????????else?if(?events[i].events&EPOLLIN?)??//讀就緒{n?=?read(sockfd,?line,?MAXLINE))?<?0????//階段3:執行實際的io操作ev.data.ptr?=?md;?????ev.events=EPOLLOUT|EPOLLET;epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);?//回到階段1:重新注冊事件}????????????else?if(events[i].events&EPOLLOUT)?//寫就緒{????????????????struct?myepoll_data*?md?=?(myepoll_data*)events[i].data.ptr;????sockfd?=?md->fd;send(?sockfd,?md->ptr,?strlen((char*)md->ptr),?0?);????????//階段3:?執行實際的io操作ev.data.fd=sockfd;ev.events=EPOLLIN|EPOLLET;epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);?//回到階段1,重新注冊事件}????????????else{????????????????//其他的處理}}}12345678910111213141516171819202122232425262728293031323334353637同樣, NIO中的Selector同樣有以下3個階段,下面把Selector和epoll的使用做個對比:
可以看到,2者只是寫法不同,同樣的, 都有這3個階段。
下面的表格展示了connect, accept, read, write 這4種事件,分別在這3個階段對應的函數:
下面看一下Kafka client中Selector的核心實現:
????@Overridepublic?void?poll(long?timeout)?throws?IOException?{。。。clear();?//清空各種狀態if?(hasStagedReceives())timeout?=?0;long?startSelect?=?time.nanoseconds();int?readyKeys?=?select(timeout);??//輪詢long?endSelect?=?time.nanoseconds();currentTimeNanos?=?endSelect;this.sensors.selectTime.record(endSelect?-?startSelect,?time.milliseconds());if?(readyKeys?>?0)?{????????????Set<SelectionKey>?keys?=?this.nioSelector.selectedKeys();Iterator<SelectionKey>?iter?=?keys.iterator();while?(iter.hasNext())?{SelectionKey?key?=?iter.next();iter.remove();KafkaChannel?channel?=?channel(key);//?register?all?per-connection?metrics?at?oncesensors.maybeRegisterConnectionMetrics(channel.id());lruConnections.put(channel.id(),?currentTimeNanos);try?{if?(key.isConnectable())?{??//有連接事件channel.finishConnect();this.connected.add(channel.id());this.sensors.connectionCreated.record();}if?(channel.isConnected()?&&?!channel.ready())?channel.prepare();?//這個只有需要安全檢查的SSL需求,普通的不加密的channel,prepare()為空實現if?(channel.ready()?&&?key.isReadable()?&&?!hasStagedReceive(channel))?{?//讀就緒NetworkReceive?networkReceive;while?((networkReceive?=?channel.read())?!=?null)?addToStagedReceives(channel,?networkReceive);?//實際的讀動作}if?(channel.ready()?&&?key.isWritable())?{??//寫就緒Send?send?=?channel.write();?//實際的寫動作if?(send?!=?null)?{this.completedSends.add(send);this.sensors.recordBytesSent(channel.id(),?send.size());}}????????????????????/*?cancel?any?defunct?sockets?*/if?(!key.isValid())?{close(channel);this.disconnected.add(channel.id());}}?catch?(Exception?e)?{String?desc?=?channel.socketDescription();if?(e?instanceof?IOException)log.debug("Connection?with?{}?disconnected",?desc,?e);elselog.warn("Unexpected?error?from?{};?closing?connection",?desc,?e);close(channel);this.disconnected.add(channel.id());}}}addToCompletedReceives();long?endIo?=?time.nanoseconds();this.sensors.ioTime.record(endIo?-?endSelect,?time.milliseconds());maybeCloseOldestConnection();}1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071epoll和selector在注冊上的差別-LT&ET模式
LT & ET
我們知道,epoll里面有2種模式:LT(水平觸發)和 ET(邊緣觸發)。水平觸發,又叫條件觸發;邊緣觸發,又叫狀態觸發。這2種到底有什么區別呢?
在這里就要引入socket的“讀/寫緩沖區”的概念了:
水平觸發(條件觸發):讀緩沖區只要不為空,就一直會觸發讀事件;寫緩沖區只要不滿,就一直會觸發寫事件。這個比較符合編程習慣,也是epoll的缺省模式。
邊緣觸發(狀態觸發):讀緩沖區的狀態,從空轉為非空的時候,觸發1次;寫緩沖區的狀態,從滿轉為非滿的時候,觸發1次。比如你發送一個大文件,把寫緩存區塞滿了,之后緩存區可以寫了,就會發生一次從滿到不滿的切換。
通過分析,我們可以看出:?
對于LT模式,要避免“寫的死循環”問題:寫緩沖區為滿的概率很小,也就是“寫的條件“會一直滿足,所以如果你注冊了寫事件,沒有數據要寫,但它會一直觸發,所以在LT模式下,寫完數據,一定要取消寫事件;
對應ET模式,要避免“short read”問題:比如你收到100個字節,它觸發1次,但你只讀到了50個字節,剩下的50個字節不讀,它也不會再次觸發,此時這個socket就廢了。因此在ET模式,一定要把“讀緩沖區”的數據讀完。
另外一個關于LT和ET的區別是:LT適用于阻塞和非阻塞IO, ET只適用于非阻塞IO。
還有一個說法是ET的性能更高,但編程難度更大,容易出錯。到底ET的性能,是不是一定比LT高,這個有待商榷,需要實際的測試數據來說話。
上面說了,epoll缺省使用的LT模式,而Java NIO用的就是epoll的LT模式。下面就來分析一下Java NIO中connect/read/write事件的處理。
connect事件的注冊
//Selectorpublic?void?connect(String?id,?InetSocketAddress?address,?int?sendBufferSize,?int?receiveBufferSize)?throws?IOException?{????????if?(this.channels.containsKey(id))????????????throw?new?IllegalStateException("There?is?already?a?connection?for?id?"?+?id);SocketChannel?socketChannel?=?SocketChannel.open();。。。????????try?{socketChannel.connect(address);}?catch?(UnresolvedAddressException?e)?{socketChannel.close();????????????throw?new?IOException("Can't?resolve?address:?"?+?address,?e);}?catch?(IOException?e)?{socketChannel.close();????????????throw?e;}SelectionKey?key?=?socketChannel.register(nioSelector,?SelectionKey.OP_CONNECT);??//構造channel的時候,注冊connect事件KafkaChannel?channel?=?channelBuilder.buildChannel(id,?key,?maxReceiveSize);key.attach(channel);????????this.channels.put(id,?channel);}123456789101112131415161718192021connect事件的取消
//在上面的poll函數中,connect事件就緒,也就是指connect連接完成,連接簡歷if?(key.isConnectable())?{??//有連接事件channel.finishConnect();?...}?//PlainTransportLayerpublic?void?finishConnect()?throws?IOException?{socketChannel.finishConnect();??//調用channel的finishConnect()key.interestOps(key.interestOps()?&?~SelectionKey.OP_CONNECT?|?SelectionKey.OP_READ);?//取消connect事件,新加read事件組冊}1234567891011read事件的注冊
從上面也可以看出,read事件的注冊和connect事件的取消,是同時進行的
read事件的取消
因為read是要一直監聽遠程,是否有新數據到來,所以不會取消,一直監聽。并且因為是LT模式,只要“讀緩沖區”有數據,就會一直觸發。
write事件的注冊
//Selectorpublic?void?send(Send?send)?{KafkaChannel?channel?=?channelOrFail(send.destination());????????try?{channel.setSend(send);}?catch?(CancelledKeyException?e)?{????????????this.failedSends.add(send.destination());close(channel);}}//KafkaChannelpublic?void?setSend(Send?send)?{????????if?(this.send?!=?null)????????????throw?new?IllegalStateException("Attempt?to?begin?a?send?operation?with?prior?send?operation?still?in?progress.");????????this.send?=?send;????????this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);??//每調用一次Send,注冊一次Write事件}123456789101112131415161718Write事件的取消
//上面的poll函數里面????????????????????if?(channel.ready()?&&?key.isWritable())?{?//write事件就緒Send?send?=?channel.write();?//在這個write里面,取消了write事件????????????????????????if?(send?!=?null)?{this.completedSends.add(send);this.sensors.recordBytesSent(channel.id(),?send.size());}}private?boolean?send(Send?send)?throws?IOException?{????????send.writeTo(transportLayer);????????if?(send.completed())transportLayer.removeInterestOps(SelectionKey.OP_WRITE);??//取消write事件????????return?send.completed();}???????????????????1234567891011121314151617總結一下:?
(1)“事件就緒“這個概念,對于不同事件類型,還是有點歧義的
read事件就緒:這個最好理解,就是遠程有新數據到來,需要去read。這里因為是LT模式,只要讀緩沖區有數據,會一直觸發。
write事件就緒:這個指什么呢? 其實指本地的socket緩沖區有沒有滿。沒有滿的話,就會一直觸發寫事件。所以要避免”寫的死循環“問題,寫完,要取消寫事件。
connect事件就緒: 指connect連接完成
accept事件就緒:有新的連接進來,調用accept處理
(2)不同類型事件,處理方式是不一樣的:
connect事件:注冊1次,成功之后,就取消了。有且僅有1次
read事件:注冊之后不取消,一直監聽
write事件: 每調用一次send,注冊1次。send成功,取消注冊
轉載于:https://blog.51cto.com/redleafstudios/2068316
總結
以上是生活随笔為你收集整理的Kafka源码深度解析-序列3 -Producer -Java NIO的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 你也许不知道的Vuejs - 前言
- 下一篇: 数据结构免费课程