怎么看调用的接口_Hadoop RPC调用实例分析
以ClientProtocol接口中的rename RPC調用進行一次實例分析。
rename方法在ClientProtocol接口中定義,它的兩個參數是String類型的,不能直接通過網絡傳輸。
我們看誰實現了ClientProtocol接口并重寫rename方法。
看到是ClientNamenodeProtocolTranslatorPB這個類。看下這個類如何實現的rename方法,代碼如下:
??@Override??public?boolean?rename(String?src,?String?dst)?throws?IOException?{
??????//把String類型的參數進行包裝,變成可通過網絡傳輸的序列化類型
????RenameRequestProto?req?=?RenameRequestProto.newBuilder()
????????.setSrc(src)
????????.setDst(dst).build();
????try?{
????????//通過調用rpcProxy的rename方法實現rpc調用,接下來我們就看rpcProxy這個對象
??????return?rpcProxy.rename(null,?req).getResult();
????}?catch?(ServiceException?e)?{
??????throw?ProtobufHelper.getRemoteException(e);
????}
??}
rpcProxy是ClientNamenodeProtocolTranslatorPB這個類的成員變量,類型是ClientNamenodeProtocolPB。
那我們就繼續點進去rpcProxy.rename方法。發現點不進去了,因為rpcProxy的類型是ClientNamenodeProtocolPB。這個類繼承了protoBuf生成的類,我們還沒有編譯所以點不進去。
但是可以看看相關的.proto文件。找到ClientNamenodeProtocol.proto文件,搜索rename。能夠看到protobuf定義的參數和返回值。參數就是把原本的兩個字符串類型包裝成新的RenameRequestProto類型。
似乎我們是沒法往下看了。感覺也沒什么收獲?那你感覺錯了。我們還可以看看ClientNamenodeProtocolTranslatorPB的構造函數,看看是怎么初始化的rpcProxy對象。
繼續追,看看誰調用了這個構造函數。
這三處地方調用了。我們進入第一個方法:
我們繼續看這個proxy是怎么生成的。
這是一個鏈式調用。先調用了RPC.getProtocolProxy,然后調用了getProxy()。
調用getProtocolProxy得到一個ProtocolProxy< ClientNamenodeProtocolPB>對象,這個對象包含了用于和服務端通信的代理對象和服務端支持的方法集合。在這個對象的基礎上調用getProxy方法得到那個代理對象。
RPC.getProtocolProxy()的代碼如下:
可以看到首先調用了getProtocolEngine獲得通信所使用的RpcEngine,然后又調用了getProxy獲得一個客戶端-side 的代理對象。
RpcEngine.getProxy()的實現如下(我們進入的是ProtobufRpcEngine的實現):
進到這里一切豁然開朗,可以看到這里使用了JDK的動態代理。我們關注這句代碼:
return?new?ProtocolProxy(protocol,?(T)?Proxy.newProxyInstance(????????protocol.getClassLoader(),?new?Class[]{protocol},?invoker),?false);
new ProtocolProxy()的第二個參數代表這個對象里保存的代理對象。
(T)?Proxy.newProxyInstance(????????protocol.getClassLoader(),?new?Class[]{protocol},?invoker)
生成代理對象使用JDK中Proxy類的靜態方法newProxyInstance。第三個參數是InvocationHandler的實現,真正的方法調用就在這個InvocationHandler里的invoke里調用。我們看到傳入的invoke是new的ProtobufRpcEngine的內部類Invoker。這個Invoker最終會在invoke方法中去調用Client.call方法向RPC Server發送RPC請求,這個過程對客戶端是完全透明的。如下圖所示, 已用紅框標出。
至此我們是明白了客戶端是如何把RPC請求發送到服務器了。總結一下(采用正序的方式)
因為我們調用方法一般是使用DFSClient這個類的對象去調用,所以在DFSClient里搜rename方法。可以看到這個方法內部調用namenode.rename()方法。那么我們就需要知道那么namenode是怎么構造出來的。
namenode的聲明如下:
它是一個ClientProtocol類型的變量,因此可以調用所有客戶端對NN進行操作的方法。接下來找構造函數,看看如何給namenode成員變量賦值的。
DFSClient在自己的構造函數里會調用 NameNodeProxiesClient.createProxyWithClientProtocol()來獲得一個支持ClientProtocol協議的代理對象。最后把NameNode的代理賦值給namenode成員。這樣我們就可以通過namenode對象來調用各種方法操作了。
創建這個代理的方法是如下代碼:
紅色箭頭所指分別代表創建non-HA的和HA的代理。如果配置了dfs.client.failover.proxy.provider.$nameservice屬性,就認為是開啟HA的集群。因為現在集群幾乎都是HA的,所以我們看下面這個箭頭的創建HA代理對象的方法createHAProxy。記住傳入的最后兩個參數ClientProtocol.class和failoverProxyProvider。后面我們還會用到這兩個參數,到時候可以再回來看。
createHAProxy方法代碼如下:
??public?static??ProxyAndInfo?createHAProxy(??????Configuration?conf,?URI?nameNodeUri,?Class?xface,
??????AbstractNNFailoverProxyProvider?failoverProxyProvider)?{
????Preconditions.checkNotNull(failoverProxyProvider);//?HA?case
????DfsClientConf?config?=?new?DfsClientConf(conf);//我們主要看這個,因為你看方法的返回值是Proxy和Info,我們只需要關注Proxy就好了。
????T?proxy?=?(T)?RetryProxy.create(xface,?failoverProxyProvider,
????????RetryPolicies.failoverOnNetworkException(
????????????RetryPolicies.TRY_ONCE_THEN_FAIL,?config.getMaxFailoverAttempts(),
????????????config.getMaxRetryAttempts(),?config.getFailoverSleepBaseMillis(),
????????????config.getFailoverSleepMaxMillis()));
????Text?dtService;if?(failoverProxyProvider.useLogicalURI())?{
??????dtService?=?HAUtilClient.buildTokenServiceForLogicalUri(nameNodeUri,
??????????HdfsConstants.HDFS_URI_SCHEME);
????}?else?{
??????dtService?=?SecurityUtil.buildTokenService(
??????????DFSUtilClient.getNNAddress(nameNodeUri));
????}return?new?ProxyAndInfo<>(proxy,?dtService,
????????DFSUtilClient.getNNAddressCheckLogical(conf,?nameNodeUri));
??}
因為一般我們都會有retry機制,所以這里使用了RetryProxy去create代理。跟到RetryProxy.create中。注意create的前兩個參數就是剛剛讓大家記住的兩個。分別是:ClientProtocol.class和failoverProxyProvider。
進到RetryProxy.create源碼:
此時這個泛型T,其實就是ClientProtocol。這個方法里使用JDK的動態代理生成代理對象,這塊我們應該很熟悉。newProxyInstance的三個參數,我們看第三個參數,因為在代理對象調用方法最終都會由這個RetryInvocationHandler的invoke方法接管。所以我們去看看RetryInvocationHandler的實現。點進去。
可以看到,RetryInvocationHandler的構造方法中還是有我們之前讓大家記住的參數failoverProxyProvider。
因為JDK的動態代理最終方法會走到invoke。所以我們直接去看RetryInvocationHandler的invoke方法。
這里主要是通過循環去實現retry。循環中關鍵方法是invokeOnce()。點進去。我們關注正常情況下的處理,先不關注異常情況。正常情況下就是再調用invoke()
再點進去:
繼續點進invokeMethod()方法中:
再點進invokeMethod中,終于到頭了。不能再繼續往下點invoke了,那就是JDK的方法了。框起來的代碼表示在代理對象上調用此Method底層代表的方法(本例是rename),并把args作為參數傳給它。
那這個代理對象是誰呢,proxyDescriptor.getProxy()返回的是什么呢?這時候想想剛才讓大家記住的那個參數。
所以proxyDescriptor里面的proxy就是proxyProvider中保存的代理對象信息。而創建這個proxyProvider的時機正是在DFSClient調用的createProxyWithClientProtocol方法里。而且,從下面的的代碼可以看到,if (failoverProxyProvider == null)還會再這樣我們就可以唱:“又回到最初的起點,呆呆的站在代碼前”。
點進去看看怎么創建的failoverProxyProvider。經過一系列重載,最終走到下圖這個地方。可以看到是通過反射機制,去new的FailoverProxyProvider對象。newInstance的第三個參數xface就是我們的代理類的類型,所以最終這個failoverProxyprovider里面會有我們需要的代理對象。到此分析結束。
這里面還有一個細節。就是如何得到的FailoverProxyProvider的Class對象。通過跟蹤getFailoverProxyProviderClass方法。
上圖紅框中圈出的configKey = “dfs.client.failover.proxy.provider”+host。這個host是在core-site.xml中fs.defaultFS這個屬性里配置的,把hdfs的schema去掉就是host。這個配置正好對應了官網hdfs-site.xml中的:
那這個host就是nameservice ID。為什么這么說呢,因為官網配置解釋就這么說的:
那我們看看一般這個類是什么,可以看到,一般是ConfiguredFailoverProxyProvider這個類。我們通過這個配置項拿到了類的全路徑名。
最后通過Class.forName就加載了這個類。
大功告成。
創建完namenode這個代理對象之后,就可以通過它調用各種方法了。比如我們之前舉例的rename。我們點進去,進入了ClientProtocol接口。那最終肯定調用的是它的實現類。這就回到了我們文章開頭。
至此Hadoop RPC在客戶端一側的調用流程已經全部分析完畢。
總結
以上是生活随笔為你收集整理的怎么看调用的接口_Hadoop RPC调用实例分析的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: parseInt(string, rad
- 下一篇: 技术人的未来在哪里?
