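Reading the Hadoop 3.0.0 Source, Part 1: The IPC Client
I have been reading the Hadoop source code for a while and plan to write a proper summary. As a placeholder, I am first posting the code I annotated earlier. If anything is wrong, corrections are welcome.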
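1. RPC Basics
1.1 Basic concepts of RPC
RPC stands for Remote Procedure Call.
(1) It lets a program on one computer call a subroutine on another computer without caring about the underlying network communication, which stays transparent to the caller. For that reason it is widely used in distributed network communication.
The RPC protocol assumes that some transport protocol, such as TCP or UDP, exists to carry the data between the communicating programs. In the OSI network model, RPC spans the transport layer and the application layer. RPC makes it easier to develop applications, including distributed multi-program network applications.
(2) Hadoop's processes all talk to each other through RPC, for example between the NameNode and the DataNodes, or between the JobTracker and the TaskTrackers (Hadoop 1.x; under YARN the corresponding pair is the ResourceManager and the NodeManagers).
It is therefore fair to say that Hadoop runs on top of RPC.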
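1.2 Notable features of RPC
(1) Transparency: calling a program on a remote machine looks to the user just like calling a local method;
(2) High performance: the RPC server can handle requests from many clients concurrently;
(3) Controllability: the JDK already ships an RPC framework, RMI, but it is too heavyweight and offers few points of control, so Hadoop implements its own RPC framework.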
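1.3 Basic RPC workflow
(1) RPC uses a client/server (C/S) model;
(2) The client sends a request message, including the parameters, to the server;
(3) On receiving the request, the server calls the corresponding procedure with the parameters it was sent, then sends the computed result back to the client;
(4) The client receives the result and continues running.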
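The four steps above can be sketched with plain sockets. This is only an illustration under simplifying assumptions, not Hadoop code: the class `RpcFlowSketch`, the procedure name `add`, and the ad hoc wire format (a UTF string followed by two ints) are all made up for the example.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical sketch of the request/response flow; not Hadoop code.
final class RpcFlowSketch {

  // Step (3), server side: read one request, run the named procedure, reply.
  static void serveOnce(ServerSocket listener) throws IOException {
    try (Socket socket = listener.accept();
         DataInputStream in = new DataInputStream(socket.getInputStream());
         DataOutputStream out = new DataOutputStream(socket.getOutputStream())) {
      String procedure = in.readUTF();
      int a = in.readInt();
      int b = in.readInt();
      if (!"add".equals(procedure)) {
        throw new IOException("unknown procedure: " + procedure);
      }
      out.writeInt(a + b);
      out.flush();
    }
  }

  // Steps (2) and (4), client side: send the request, then block for the result.
  static int call(int port, String procedure, int a, int b) throws IOException {
    try (Socket socket = new Socket(InetAddress.getLoopbackAddress(), port);
         DataOutputStream out = new DataOutputStream(socket.getOutputStream());
         DataInputStream in = new DataInputStream(socket.getInputStream())) {
      out.writeUTF(procedure);
      out.writeInt(a);
      out.writeInt(b);
      out.flush();
      return in.readInt();
    }
  }

  // Starts a one-shot server on a free loopback port and calls it once.
  static int demo() {
    try (ServerSocket listener =
             new ServerSocket(0, 1, InetAddress.getLoopbackAddress())) {
      Thread server = new Thread(() -> {
        try {
          serveOnce(listener);
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });
      server.start();
      int result = call(listener.getLocalPort(), "add", 2, 3);
      server.join();
      return result;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("add(2, 3) via the RPC sketch = " + demo());
  }
}
```

From the caller's point of view, `call(...)` is just a method that returns an int; everything about sockets stays inside it, which is the transparency property described in 1.2.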
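1.4 The RPC mechanism in Hadoop (IPC)
Like other RPC frameworks, Hadoop RPC is divided into four parts:
(1) Serialization layer: the messages exchanged between client and server use the serialization classes Hadoop provides, or custom Writable types;
(2) Function call layer: Hadoop RPC implements function calls with dynamic proxies and Java reflection;
(3) Network transport layer: Hadoop RPC uses sockets over TCP/IP;
(4) Server-side framework layer: the RPC server uses Java NIO and an event-driven I/O model to improve its concurrent processing capacity.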
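To make the serialization layer concrete, here is a minimal sketch of a Writable-style round trip. It assumes nothing from Hadoop on the classpath: `SimpleWritable` is a local stand-in with the same two-method shape as Hadoop's `Writable` (`write(DataOutput)` and `readFields(DataInput)`), and `Heartbeat` is a hypothetical message type invented for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

final class WritableSketch {

  // Local stand-in mirroring the shape of Hadoop's Writable interface.
  interface SimpleWritable {
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
  }

  // Hypothetical message type, used only for this example.
  static final class Heartbeat implements SimpleWritable {
    String nodeName = "";
    long usedBytes;

    Heartbeat() {}

    Heartbeat(String nodeName, long usedBytes) {
      this.nodeName = nodeName;
      this.usedBytes = usedBytes;
    }

    @Override
    public void write(DataOutput out) throws IOException {
      out.writeUTF(nodeName);
      out.writeLong(usedBytes);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
      // Fields must be read in exactly the order they were written.
      nodeName = in.readUTF();
      usedBytes = in.readLong();
    }
  }

  // Sender side: turn the object into bytes.
  static byte[] serialize(SimpleWritable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      value.write(out);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Receiver side: fill an empty instance from the bytes.
  static <T extends SimpleWritable> T deserialize(byte[] data, T target) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      target.readFields(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return target;
  }

  public static void main(String[] args) {
    byte[] wire = serialize(new Heartbeat("dn-1", 4096L));
    Heartbeat copy = deserialize(wire, new Heartbeat());
    System.out.println(copy.nodeName + " " + copy.usedBytes);
  }
}
```

The receiver creates an empty instance first and then calls `readFields` on it, which is the same pattern the client code uses when it reads a response value.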
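1.5 Techniques used in the Hadoop RPC design
(1) Dynamic proxies
A dynamic proxy provides access to another object while hiding the details of the real object; the proxy object hides the real object from the client. The JDK supports dynamic proxies, but currently only for interfaces.
(2) Reflection: loading classes dynamically
(3) Serialization
(4) Non-blocking I/O (NIO)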
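Dynamic proxies and reflection, the first two techniques in the list above, can be shown together in a few lines using the JDK's `java.lang.reflect.Proxy`. This is a sketch only: `EchoProtocol`, `EchoServerImpl`, and `Invoker` are hypothetical names, and the handler dispatches to a local object where a real RPC client would serialize the call and send it to a server.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

final class ProxySketch {

  // Hypothetical protocol; dynamic proxies can only implement interfaces.
  interface EchoProtocol {
    String echo(String message);
  }

  // Stands in for the implementation that would live on the remote server.
  static final class EchoServerImpl implements EchoProtocol {
    @Override
    public String echo(String message) {
      return "echo: " + message;
    }
  }

  // Every call on the proxy is funnelled into invoke().
  static final class Invoker implements InvocationHandler {
    private final Object target;
    final List<String> invokedMethods = new ArrayList<>();

    Invoker(Object target) {
      this.target = target;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
      invokedMethods.add(method.getName());
      // A real RPC client would serialize method name and args here.
      // This sketch uses reflection to dispatch to a local object instead.
      return method.invoke(target, args);
    }
  }

  static EchoProtocol newProxy(Invoker invoker) {
    return (EchoProtocol) Proxy.newProxyInstance(
        EchoProtocol.class.getClassLoader(),
        new Class<?>[] {EchoProtocol.class},
        invoker);
  }

  public static void main(String[] args) {
    Invoker invoker = new Invoker(new EchoServerImpl());
    EchoProtocol client = newProxy(invoker);
    System.out.println(client.echo("hello"));
    System.out.println("intercepted: " + invoker.invokedMethods);
  }
}
```

The caller only ever sees `EchoProtocol`; because all calls pass through a single `invoke()` method, that method is the natural place to start reading a proxy-based RPC client.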
 
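RPC is something you have to deal with in any distributed system. When code on one machine needs to call a function on another machine, RPC makes that call look like a local function call. You do not need to worry about how it is implemented underneath, just as with TCP, where the upper layer does not care about what the lower layers do.
The overall flow of the client is all in the code below. The background you need is (1) dynamic proxies and (2) Java NIO.
Every RPC request is redirected to the proxy's invoke() method, and every request becomes a Call object. The Call is added to the connection's table of pending calls, the request is written out by a sender thread, and the connection thread reads the response and hands it back to the matching Call. The data transfer uses NIO. See the code comments for the details.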
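The idea of a table of pending calls keyed by call id can be sketched as follows. This is a simplified illustration and not the Hadoop implementation: `CallTableSketch`, `createCall`, `complete`, and `await` are hypothetical names, and the "receiver" is just a thread that completes the call directly instead of reading from a socket.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

final class CallTableSketch {

  // One in-flight request; the caller blocks in await() until it is completed.
  static final class Call {
    final int id;
    final String request;
    private String response;
    private boolean done;

    Call(int id, String request) {
      this.id = id;
      this.request = request;
    }

    synchronized void setResponse(String value) {
      response = value;
      done = true;
      notifyAll(); // wake the caller blocked in await()
    }

    synchronized String await() throws InterruptedException {
      while (!done) {
        wait();
      }
      return response;
    }
  }

  private final AtomicInteger callIdCounter = new AtomicInteger();
  private final ConcurrentHashMap<Integer, Call> calls = new ConcurrentHashMap<>();

  // Caller thread: register the call under a fresh id before sending it.
  Call createCall(String request) {
    Call call = new Call(callIdCounter.getAndIncrement(), request);
    calls.put(call.id, call);
    return call;
  }

  // Receiver thread: a response carrying callId arrived, so complete that call.
  void complete(int callId, String response) {
    Call call = calls.remove(callId);
    if (call != null) {
      call.setResponse(response);
    }
  }

  int pending() {
    return calls.size();
  }

  static String demo() {
    CallTableSketch table = new CallTableSketch();
    Call call = table.createCall("getBlockLocations");
    Thread receiver =
        new Thread(() -> table.complete(call.id, "reply to " + call.request));
    receiver.start();
    try {
      return call.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because responses are matched to calls by id, several requests can be outstanding on one connection at once, and the replies do not have to come back in the order the requests were sent.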
/*
 * Background you need: (1) dynamic proxies, (2) Java NIO.
 */

/*
 * Every method call on the client side is redirected to Invoker.invoke(), so the
 * analysis of IPC connection setup and method invocation starts from the Invoker
 * class. Every IPC proxy ends up calling this invoke() method.
 * @proxy  the protocol being proxied
 * @method the method to be invoked over IPC
 * @args   the arguments
 */
// ProtobufRpcEngine.Invoker.invoke()
public Object invoke(Object proxy, final Method method, Object[] args)
    throws ServiceException {
  long startTime = 0;
  if (args.length != 2) { // RpcController + Message
    throw new ServiceException("Too many parameters for request. Method: ["
        + method.getName() + "]" + ", Expected: 2, Actual: "
        + args.length);
  }
  if (args[1] == null) {
    throw new ServiceException("null param while calling Method: ["
        + method.getName() + "]");
  }

  // if Tracing is on then start a new span for this rpc.
  // guard it in the if statement to make sure there isn't
  // any extra string manipulation.
  Tracer tracer = Tracer.curThreadTracer();
  TraceScope traceScope = null;
  if (tracer != null) {
    traceScope = tracer.newScope(RpcClientUtil.methodToTraceString(method));
  }

  // Build the IPC request header, which carries the method name and the protocol
  RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
  // The request arguments
  Message theRequest = (Message) args[1];
  // The return value of the IPC call
  final RpcResponseWrapper val;
  try {
    // Send the RPC request and wait for the result
    val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
        new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId,
        fallbackToSimpleAuth);
  } finally {
    if (traceScope != null) traceScope.close();
  }

  if (Client.isAsynchronousMode()) {
    final AsyncGet<RpcResponseWrapper, IOException> arr
        = Client.getAsyncRpcResponse();
    final AsyncGet<Message, Exception> asyncGet
        = new AsyncGet<Message, Exception>() {
      @Override
      public Message get(long timeout, TimeUnit unit) throws Exception {
        return getReturnMessage(method, arr.get(timeout, unit));
      }

      @Override
      public boolean isDone() {
        return arr.isDone();
      }
    };
    ASYNC_RETURN_MESSAGE.set(asyncGet);
    return null;
  } else {
    return getReturnMessage(method, val);
  }
}

/**
 * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
 * <code>remoteId</code>, returning the rpc response.
 *
 * @param rpcKind
 * @param rpcRequest - contains serialized method and method parameters
 * @param remoteId - the target rpc server
 * @param fallbackToSimpleAuth - set to true or false during this method to
 *   indicate if a secure client falls back to simple auth
 * @returns the rpc response
 * Throws exceptions if there are network problems or if the remote code
 * threw an exception.
 */
// Client.call()
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT,
      fallbackToSimpleAuth);
}

/**
 * Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
 * <code>remoteId</code>, returning the rpc response.
 *
 * @param rpcKind
 * @param rpcRequest - contains serialized method and method parameters
 * @param remoteId - the target rpc server
 * @param serviceClass - service class for RPC
 * @param fallbackToSimpleAuth - set to true or false during this method to
 *   indicate if a secure client falls back to simple auth
 * @returns the rpc response
 * Throws exceptions if there are network problems or if the remote code
 * threw an exception.
 */
// Client.call()
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
    ConnectionId remoteId, int serviceClass,
    AtomicBoolean fallbackToSimpleAuth) throws IOException {
  // Create a Call instance for this request
  final Call call = createCall(rpcKind, rpcRequest);
  // Get a connection; this includes the handshake, which sends some basic information
  final Connection connection = getConnection(remoteId, call, serviceClass,
      fallbackToSimpleAuth);

  try {
    checkAsyncCall();
    try {
      connection.sendRpcRequest(call); // send the rpc request
    } // catch clauses omitted in this excerpt
  } // catch clause omitted in this excerpt

  if (isAsynchronousMode()) {
    final AsyncGet<Writable, IOException> asyncGet
        = new AsyncGet<Writable, IOException>() {
      @Override
      public Writable get(long timeout, TimeUnit unit)
          throws IOException, TimeoutException {
        boolean done = true;
        try {
          final Writable w = getRpcResponse(call, connection, timeout, unit);
          if (w == null) {
            done = false;
            throw new TimeoutException(call + " timed out "
                + timeout + " " + unit);
          }
          return w;
        } finally {
          if (done) {
            releaseAsyncCall();
          }
        }
      }

      @Override
      public boolean isDone() {
        synchronized (call) {
          return call.done;
        }
      }
    };
    ASYNC_RPC_RESPONSE.set(asyncGet);
    return null;
  } else {
    // Return the rpc response
    return getRpcResponse(call, connection, -1, null);
  }
}

/** Get a connection from the pool, or create a new one and add it to the
 * pool. Connections to a given ConnectionId are reused. */
// Client.getConnection()
private Connection getConnection(ConnectionId remoteId,
    Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
    throws IOException {
  Connection connection;
  /* we could avoid this allocation for each RPC by having a
   * connectionsId object and with set() method. We need to manage the
   * refs for keys in HashMap properly. For now its ok.
   */
  while (true) {
    // These lines below can be shorten with computeIfAbsent in Java8
    connection = connections.get(remoteId);
    if (connection == null) {
      // Initialize the connection information
      connection = new Connection(remoteId, serviceClass);
      // putIfAbsent
      /*
       * If the specified key is not already associated
       * with a value, associate it with the given value.
       * This is equivalent to
       * <pre>
       *   if (!map.containsKey(key))
       *     return map.put(key, value);
       *   else
       *     return map.get(key);</pre>
       * except that the action is performed atomically.
       * .....
       */
      // Put it into the connection pool; putIfAbsent makes this thread-safe
      Connection existing = connections.putIfAbsent(remoteId, connection);
      if (existing != null) {
        connection = existing;
      }
    }

    // Add the call to this connection; addCall() is synchronized
    if (connection.addCall(call)) {
      break;
    } else {
      // This connection is closed, should be removed. But other thread could
      // have already known this closedConnection, and replace it with a new
      // connection. So we should call conditional remove to make sure we only
      // remove this closedConnection.
      connections.remove(remoteId, connection);
    }
  }

  // If the server happens to be slow, the method below will take longer to
  // establish a connection.
  // Set up the connection's I/O streams
  connection.setupIOstreams(fallbackToSimpleAuth);
  return connection;
}

/**
 * Add a call to this connection's call queue and notify
 * a listener; synchronized.
 * Returns false if called during shutdown.
 * @param call to add
 * @return true if the call was added.
 */
/* Adds a call to this connection and wakes up the Connection thread waiting in run(). */
// Client.Connection.addCall()
private synchronized boolean addCall(Call call) {
  if (shouldCloseConnection.get())
    return false;
  // Add the call to the table of pending calls
  calls.put(call.id, call);
  // Wake up the connection thread
  notify();
  return true;
}

/** Connect to the server and set up the I/O streams. It then sends
 * a header to the server and starts
 * the connection thread that waits for responses.
 */
/* Sets up the I/O streams for this socket. */
// Client.Connection.setupIOstreams()
private synchronized void setupIOstreams(AtomicBoolean fallbackToSimpleAuth) {
  try {
    Span span = Tracer.getCurrentSpan();
    if (span != null) {
      span.addTimelineAnnotation("IPC client connecting to " + server);
    }
    short numRetries = 0;
    Random rand = null;
    while (true) {
      // Establish the socket connection
      setupConnection();
      // Get the input stream of this socket connection
      InputStream inStream = NetUtils.getInputStream(socket);
      // Get the output stream
      OutputStream outStream = NetUtils.getOutputStream(socket);
      // Write the rpc connection header
      /*
       * Write the connection header - this is sent when connection is established
       * +----------------------------------+
       * |  "hrpc" 4 bytes                  |
       * +----------------------------------+
       * |  Version (1 byte)                |
       * +----------------------------------+
       * |  Service Class (1 byte)          |
       * +----------------------------------+
       * |  AuthProtocol (1 byte)           |
       * +----------------------------------+
       */
      // First write: the connection header
      writeConnectionHeader(outStream);
      /*
      private void writeConnectionHeader(OutputStream outStream)
          throws IOException {
        DataOutputStream out = new DataOutputStream(
            new BufferedOutputStream(outStream));
        // Write out the header, version and authentication method
        // "hrpc"
        out.write(RpcConstants.HEADER.array());
        // version = 9
        out.write(RpcConstants.CURRENT_VERSION);
        out.write(serviceClass);
        out.write(authProtocol.callId);
        out.flush();
      }
      */

      // When pinging is enabled, wrap the input stream in a PingInputStream
      if (doPing) {
        inStream = new PingInputStream(inStream);
      }
      this.in = new DataInputStream(new BufferedInputStream(inStream));

      // SASL may have already buffered the stream
      if (!(outStream instanceof BufferedOutputStream)) {
        outStream = new BufferedOutputStream(outStream);
      }
      this.out = new DataOutputStream(outStream);

      // Second write: send the connection context to the server
      writeConnectionContext(remoteId, authMethod);

      // update last activity time
      touch();

      span = Tracer.getCurrentSpan();
      if (span != null) {
        span.addTimelineAnnotation("IPC client connected to " + server);
      }

      // start the receiver thread after the socket connection has been set
      // up
      // Start the connection thread; once there are calls pending, it receives their responses
      start();
      return;
    }
  } // catch clause omitted in this excerpt
}

/* Sends the rpc request: the third write. */
// Client.Connection.sendRpcRequest()
public void sendRpcRequest(final Call call)
    throws InterruptedException, IOException {
  if (shouldCloseConnection.get()) {
    return;
  }

  // Serialize the call to be sent. This is done from the actual
  // caller thread, rather than the sendParamsExecutor thread,
  // so that if the serialization throws an error, it is reported
  // properly. This also parallelizes the serialization.
  //
  // Format of a call on the wire:
  // 0) Length of rest below (1 + 2)
  // 1) RpcRequestHeader  - is serialized Delimited hence contains length
  // 2) RpcRequest
  //
  // Items '1' and '2' are prepared here.
  final DataOutputBuffer d = new DataOutputBuffer();
  // call.rpcKind identifies the rpc engine
  RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
      call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
      clientId);
  header.writeDelimitedTo(d);
  call.rpcRequest.write(d);

  // Synchronize on sendRpcRequestLock
  synchronized (sendRpcRequestLock) {
    // Hand the actual send over to the executor
    Future<?> senderFuture = sendParamsExecutor.submit(new Runnable() {
      // Sends the serialized call to the server
      @Override
      public void run() {
        try {
          synchronized (Connection.this.out) {
            if (shouldCloseConnection.get()) {
              return;
            }

            if (LOG.isDebugEnabled())
              LOG.debug(getName() + " sending #" + call.id);

            // Get the buffered data and its length
            byte[] data = d.getData();
            int totalLength = d.getLength();
            out.writeInt(totalLength); // Total Length
            out.write(data, 0, totalLength); // RpcRequestHeader + RpcRequest
            out.flush();
          }
        } catch (IOException e) {
          // exception at this point would leave the connection in an
          // unrecoverable state (eg half a call left on the wire).
          // So, close the connection, killing any outstanding calls
          markClosed(e);
        } finally {
          // the buffer is just an in-memory buffer, but it is still polite to
          // close early
          IOUtils.closeStream(d);
        }
      }
    });

    try {
      senderFuture.get();
    } // catch clauses omitted in this excerpt
  }
}

// Client.Connection.run()
public void run() {
  try {
    // Wait until there is a response to receive
    while (waitForWork()) { // wait here for work - read or close connection
      receiveRpcResponse();
    }
  } // catch clause omitted in this excerpt
  close();
}

private void receiveRpcResponse() {
  if (shouldCloseConnection.get()) {
    return;
  }
  // Update the last activity time
  touch();

  try {
    // Read the leading total-length field of the response
    int totalLen = in.readInt();
    /*
     * Protobuf type {@code hadoop.common.RpcResponseHeaderProto}
     *
     * <pre>
     *
     * Rpc Response Header
     * +------------------------------------------------------------------+
     * | Rpc total response length in bytes (4 bytes int)                 |
     * |  (sum of next two parts)                                         |
     * +------------------------------------------------------------------+
     * | RpcResponseHeaderProto - serialized delimited ie has len         |
     * +------------------------------------------------------------------+
     * | if request is successful:                                        |
     * |   - RpcResponse -  The actual rpc response bytes follow          |
     * |     the response header                                          |
     * |     This response is serialized based on RpcKindProto            |
     * | if request fails :                                               |
     * |   The rpc response header contains the necessary info            |
     * +------------------------------------------------------------------+
     *
     * Note that rpc response header is also used when connection setup fails.
     * Ie the response looks like a rpc response with a fake callId.
     * </pre>
     */
    RpcResponseHeaderProto header =
        RpcResponseHeaderProto.parseDelimitedFrom(in);
    checkResponse(header);

    int headerLen = header.getSerializedSize();
    headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);

    // Get the call id
    int callId = header.getCallId();
    if (LOG.isDebugEnabled())
      LOG.debug(getName() + " got value #" + callId);

    // Status of the rpc reply
    RpcStatusProto status = header.getStatus();
    // Check the status of the returned rpc response
    if (status == RpcStatusProto.SUCCESS) {
      Writable value = ReflectionUtils.newInstance(valueClass, conf);
      value.readFields(in); // read value
      // Remove this call id from the pending calls
      final Call call = calls.remove(callId);
      // Set the return value
      call.setRpcResponse(value);

      // verify that length was correct
      // only for ProtobufEngine where len can be verified easily
      if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) {
        ProtobufRpcEngine.RpcWrapper resWrapper =
            (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse();
        if (totalLen != headerLen + resWrapper.getLength()) {
          throw new RpcClientException(
              "RPC response length mismatch on rpc success");
        }
      }
    } else { // Rpc Request failed
      // Verify that length was correct
      if (totalLen != headerLen) {
        throw new RpcClientException(
            "RPC response length mismatch on rpc error");
      }

      final String exceptionClassName = header.hasExceptionClassName() ?
          header.getExceptionClassName() :
          "ServerDidNotSetExceptionClassName";
      final String errorMsg = header.hasErrorMsg() ?
          header.getErrorMsg() : "ServerDidNotSetErrorMsg";
      final RpcErrorCodeProto erCode =
          (header.hasErrorDetail() ? header.getErrorDetail() : null);
      if (erCode == null) {
        LOG.warn("Detailed error code not set by server on rpc error");
      }
      RemoteException re = new RemoteException(exceptionClassName, errorMsg, erCode);
      if (status == RpcStatusProto.ERROR) {
        final Call call = calls.remove(callId);
        call.setException(re);
      } else if (status == RpcStatusProto.FATAL) {
        // Close the connection
        markClosed(re);
      }
    }
  } catch (IOException e) {
    markClosed(e);
  }
}
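The two wire-level details in the code above, the 7-byte connection header and the 4-byte length prefix in front of each request, can be tried out in isolation. The sketch below uses only the JDK. `WireFormatSketch` and its method names are hypothetical; the version value 9 comes from the comment in the excerpt, while the service class and auth protocol values passed in `main` are placeholders chosen for the example.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

final class WireFormatSketch {

  // Layout from the excerpt: "hrpc" (4 bytes), version, service class, auth protocol.
  static byte[] connectionHeader(int version, int serviceClass, int authProtocol) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.write("hrpc".getBytes(StandardCharsets.US_ASCII));
      out.write(version);      // write(int) emits only the low 8 bits: one byte
      out.write(serviceClass); // one byte
      out.write(authProtocol); // one byte
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Sender side: prefix the already-serialized body with its length.
  static byte[] frame(byte[] body) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(body.length); // total length, 4 bytes, big-endian
      out.write(body, 0, body.length);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  // Receiver side: read the length first, then exactly that many bytes.
  static byte[] unframe(byte[] framed) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
      int totalLength = in.readInt();
      byte[] body = new byte[totalLength];
      in.readFully(body);
      return body;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] header = connectionHeader(9, 0, 0); // 0 and 0 are placeholder values
    System.out.println("connection header is " + header.length + " bytes");
    byte[] body = "header+request".getBytes(StandardCharsets.US_ASCII);
    System.out.println(new String(unframe(frame(body)), StandardCharsets.US_ASCII));
  }
}
```

The length prefix is what lets the receiver know where one message ends and the next begins on a stream socket, and it is also what the client checks against the header and body sizes when it reports a length mismatch.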