Hadoop中RPC机制
Hadoop中RPC機制
RPC(Remote Procedure Call Protocol)遠程過程調用協議,它是一種通過網絡從遠程計算機程序上請求服務,而不需要了解底層網絡技術的協議。Hadoop底層的交互都是通過rpc進行的。例如:datanode和namenode 、tasktracker和jobtracker、secondary namenode和namenode之間的通信都是通過rpc實現的。下面是rpc交互過程圖:?
1.客服端調用的總過程:
Hadoop的RPC客戶端代碼其實就一個類:org.apache.hadoop.ipc.Client。這個類使用Java的動態代理技術,生成服務器的業務接口的代理,通過socket將調用的業務方法和參數傳送到服務器端,并且等待服務器端的響應。
客戶端調用的序列圖如下:
?
?例如:TaskTracker請求與JobTracker的通信:
TaskTracker通過:
????? this.jobClient = (InterTrackerProtocol)
????????? RPC.waitForProxy(InterTrackerProtocol.class, InterTrackerProtocol.versionID,
?????????????????????? jobTrackAddr, this.fConf);
中的InterTrackerProtocol 去和JobTracker通信。
在RPC中通過:
?????? VersionedProtocol proxy =
????????????(VersionedProtocol) Proxy.newProxyInstance(
??????????? protocol.getClassLoader(), new Class[] { protocol },
??????????? new Invoker(addr, ticket, conf, factory));
產生一個動態代理完成JobTracker和TaskTracker之間的心跳交流。
2.客服端向服務器發送連接
??? 客戶端(C)要發起對服務端(S)方法的調用主要通過:
????public Writable call(Writable param, InetSocketAddress addr,
?????????????????????? Class<?> protocol, UserGroupInformation ticket)?
?????????????????????? throws InterruptedException, IOException 實現
2.1首先在該方法調用中:
??? Call call = new Call(param); //將param轉換成Call對象? 其實就是將Invocation(用來序列化和反序列化RPC客戶端的調用信息,包括方法名和參數信息)轉化為Call 。
2.2 客服端創建一個通向服務端的連接connection,Connection connection = getConnection(addr, protocol, ticket, call);然后將此次調用放入CallList里,這樣客戶端就可以同時發生很多調用,每個調用用ID來識別。
(1) 根據RPC服務端的地址和接口從連接池中獲取一個,如果取到Connection則直接返回。
(2) 否則新建一個Connection,并將它放入到連接池中
(3) 然后通過SocketFactory創建一個Socket,并建立到RPC服務端的連接,如果連接不成功,則重試。
(4) 創建和關聯輸入和輸出流對象。
2.3 發送調用參數connection.sendParam(call)。調用參數是Client的調用方(比如NameNode,DataNode等)指定的,一般就是一個Invocation對象,里面包含要調用的方法和參數。
2.4等待調用結果.Client.Connection是個線程類,啟動了之后唯一做的事情就是等待調用結果。
??? synchronized (call) {???
??????????while (!call.done){?? //done就是服務器端返回該call的結果,判斷該call是否處理
????????? ……………………???????????
????????? ………………
?????? }
??? }
3.服務器端對客服端的call請求處理
對于服務器端,其有一個方法start指定了啟動服務器開始監聽,這個start被四個類調用,分別是TaskTracker.initialize,Namenode.initialize,Jobtracker.offerService,Datanode.startDatanode,顯然,任何兩者之間的通信都是考這個client-server模型實現的。
3.1 server start后,干了三件事,就是啟動三個線程
1.啟動listener,監聽客戶端Call
2.啟動response,隨時準備將處理結果發回client
3.啟動10個handler,處理具體的請求。
3.2上面三個線程的具體工作過程
3.2.1. Listener線程
該線程負責監聽客戶端請求以及數據的接收,然后將接收到的數據組成一個Call實例,放到請求隊列里面。具體做法如下:
1) Listener線程循環等待RPC客戶端的發送數據過來
2) 當有數據可以接收時,調用Connection的readAndProcess()方法。readAndProces()又調用processData()方法
3) Connection邊接收邊對數據進行處理,如果接收到一個完整的Call包,則構建一個Call對象。readAndProcess()調用processData()方法把Call對象加入到Call隊列的,并將這個Call對象PUSH到Call隊列中,由Handler線程來處理Call隊列中的所有Call。
3.2.2.Handler線程
該線程負責從請求隊列中取出調用請求,通過調用抽象方法
1) Handler線程循環監聽Call隊列,如果Call隊列為空,則進入wait狀態,否則按FIFO規則從Call隊列取出Call
2) 將Call交給RPC.Server處理(調用RPC.Server的public Writable call(Class<?> protocol, Writable param, long receivedTime)
??? throws IOException ),因為RPC對Server的一些功能進行了實現
3) 借助java里的反射機制,完成對目標方法的調用
4) 返回響應。由于響應需要通過SOCKET返回給RPC客戶端,所以響應的類型必須是Writable。
3.2.2. Response線程???
?Response也監視responselist,如果responselist中某個call需要將結果寫入客戶端就寫出,當某個call的結果被發送完畢,從responselist中刪除該call對象。
注意:handler完成call之后就開始向客戶端寫call結果,但是結果可能太多,無法通過一次性發送完畢,而發送之后還要等待client接受完畢才能再發,如果現在handler在那里等待客戶端接受完畢,然后再發,效率不高。解決辦法是handler處理完畢之后,只向client發送一次處理結果。如果這一次將處理結果發送完畢,接下來就沒有response的事情了,如果沒有發送完畢,接下來response負責將剩下的處理結果發送給客戶端。這樣handler的并發量會大一些。
服務器實現中大量利用監視隊列,比如handler就直觀堅持calllist,一旦發現數據就開始處理,而response就監視responselist,發現數據需要發送就開始發送。
參考:
1.http://bbs.hadoopor.com/thread-329-1-2.html
2.http://jackosn-liao.iteye.com/blog/851914?
3. http://blog.csdn.net/wuixiaobao/article/details/6549781
4.http://bbs.hadoopor.com/thread-329-1-2.html
?
?
?
轉載于:https://www.cnblogs.com/MGGOON/archive/2012/02/24/2367231.html
總結
以上是生活随笔為你收集整理的Hadoop中RPC机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 手游服务器源码 https,python
- 下一篇: linux下svn常用指令