Dubbo原理和源码解析之服务引用
github新增倉(cāng)庫(kù) "dubbo-read"(點(diǎn)此查看),集合所有《Dubbo原理和源碼解析》系列文章,后續(xù)將繼續(xù)補(bǔ)充該系列,同時(shí)將針對(duì)Dubbo所做的功能擴(kuò)展也進(jìn)行分享。不定期更新,歡迎Follow。
?
一、框架設(shè)計(jì)
在官方《Dubbo 開發(fā)指南》框架設(shè)計(jì)部分,給出了引用服務(wù)時(shí)序圖:
?另外,在官方《Dubbo 用戶指南》集群容錯(cuò)部分,給出了服務(wù)引用的各功能組件關(guān)系圖:
?本文將根據(jù)以上兩張圖,分析服務(wù)引用的實(shí)現(xiàn)原理,并進(jìn)行詳細(xì)的代碼跟蹤與解析。
二、原理和源碼解析
2.1 創(chuàng)建代理
Dubbo 基于 Spring 的 Schema 擴(kuò)展實(shí)現(xiàn) XML 配置解析,DubboNamespaceHandler?會(huì)將 <dubbo:reference> 標(biāo)簽解析為 ReferenceBean,ReferenceBean?實(shí)現(xiàn)了 FactoryBean,因此當(dāng)它在代碼中有引用時(shí),會(huì)調(diào)用 ReferenceBean#getObject()?方法進(jìn)入節(jié)點(diǎn)注冊(cè)和服務(wù)發(fā)現(xiàn)流程。
ReferenceBean.java
public Object getObject() throws Exception {return get(); }ReferenceConfig.java
public synchronized T get() {if (destroyed){throw new IllegalStateException("Already destroyed!");}if (ref == null) {init();}return ref; }private void init() {//.......忽略ref = createProxy(map); }private T createProxy(Map<String, String> map) {//.....忽略invoker = refprotocol.refer(interfaceClass, urls.get(0));//.....忽略// 創(chuàng)建服務(wù)代理return (T) proxyFactory.getProxy(invoker); }?2.2 服務(wù)發(fā)現(xiàn)
因?yàn)橥ㄟ^(guò)注冊(cè)中心,因此在 ReferenceConfig.java#createProxy()?方法中,進(jìn)入 RegistryProtocol.java#refer()?方法。
RegistryProtocol.java
private Cluster cluster;public void setCluster(Cluster cluster) {this.cluster = cluster; }private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);directory.setRegistry(registry);directory.setProtocol(protocol);URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters());if (! Constants.ANY_VALUE.equals(url.getServiceInterface())&& url.getParameter(Constants.REGISTER_KEY, true)) {registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY,Constants.CHECK_KEY, String.valueOf(false)));}directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY));return cluster.join(directory); }RegistryDirectory 通過(guò) RegistryDirectory#subscribeUrl() 向 Zookeeper 訂閱服務(wù)節(jié)點(diǎn)信息并 watch 變更,這樣就實(shí)現(xiàn)了服務(wù)自動(dòng)發(fā)現(xiàn)。
2.3 Invoker選取
2.3.1 Cluster
上面我之所以把設(shè)置 Cluster 的代碼貼上,是因?yàn)榇颂幧婕暗揭粋€(gè) Dubbo 服務(wù)框架核心的概念——微內(nèi)核和插件機(jī)制(此處會(huì)單獨(dú)一篇文章詳細(xì)介紹):
?
有關(guān) Dubbo 的設(shè)計(jì)原則,請(qǐng)查看Dubbo《一些設(shè)計(jì)上的基本常識(shí)》。
Cluster 類的定義如下:
Cluster.java
@SPI(FailoverCluster.NAME) public interface Cluster {/*** Merge the directory invokers to a virtual invoker.*/@Adaptive<T> Invoker<T> join(Directory<T> directory) throws RpcException; }cluster 的類型是 Cluster$Adaptive,實(shí)際上是一個(gè)通用的代理類,它會(huì)根據(jù) URL 中的 cluster 參數(shù)值定位到實(shí)際的 Cluster 實(shí)現(xiàn)類(默認(rèn)是 FailoverCluster)。 由于 ExtensionLoader?在實(shí)例化對(duì)象時(shí),會(huì)在實(shí)例化完成之后自動(dòng)套上 Wrapper 類,而 MockerClusterWrapper?就是這樣一個(gè) Wrapper。
MockerClusterWrapper.java
public class MockClusterWrapper implements Cluster {private Cluster cluster;public MockClusterWrapper(Cluster cluster) {this.cluster = cluster;}public <T> Invoker<T> join(Directory<T> directory) throws RpcException {return new MockClusterInvoker<T>(directory, this.cluster.join(directory));} }也就是說(shuō),實(shí)例化出來(lái)的 FailoverCluster?會(huì)作為參數(shù)賦予 MockerClusterWrapper#cluster,而 MockClusterWrapper?會(huì)作為參數(shù)賦予 RegistryProtocol#cluster。因此 RegistryProtocol#doRefer()?中調(diào)用 cluster.join(directory)?實(shí)際上是調(diào)用的 MockClusterWrapper#join(directory)。 使用這種機(jī)制,可以把一些公共的處理放在 Wrapper 類中,實(shí)現(xiàn)代碼和功能收斂。
MockClusterInvoker.java
public Result invoke(Invocation invocation) throws RpcException {Result result = null;String value = directory.getUrl().getMethodParameter(invocation.getMethodName(),Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")){//no mockresult = this.invoker.invoke(invocation);} else if (value.startsWith("force")) {if (logger.isWarnEnabled()) {logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());}//force:direct mockresult = doMockInvoke(invocation, null);} else {//fail-mocktry {result = this.invoker.invoke(invocation);}catch (RpcException e) {if (e.isBiz()) {throw e;} else {if (logger.isWarnEnabled()) {logger.info("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);}//fail:mockresult = doMockInvoke(invocation, e);}}}return result; }這里還涉及到 Dubbo 另外一個(gè)核心機(jī)制——Mock。Mock 可以在測(cè)試中模擬服務(wù)調(diào)用的各種異常情況,還用來(lái)實(shí)現(xiàn)服務(wù)降級(jí)。 從 MockClusterWrapper.join()?方法可知,實(shí)際創(chuàng)建的 ClusterInvoker?是封裝了 FailoverClusterInvoker?的 MockerClusterInvoker。
在 MockerClusterInvoker?中,調(diào)用之前 Dubbo 會(huì)先檢查 URL 中是否有 mock 參數(shù)(通過(guò)服務(wù)治理后臺(tái) Consumer 端的屏蔽和容錯(cuò)進(jìn)行設(shè)置,或者直接動(dòng)態(tài)設(shè)置 mock 參數(shù)值),如果存在且以 force 開頭,則不發(fā)起遠(yuǎn)程調(diào)用直接執(zhí)行降級(jí)邏輯;如果存在且以 fail 開頭,則在遠(yuǎn)程調(diào)用異常時(shí)才會(huì)執(zhí)行降級(jí)邏輯。
因此,通過(guò) MockerClusterWrapper?成功地在 Invoker 中植入了 Mock 機(jī)制。
2.3.2 Directory
在 RegistryProtocol#doRefer()?中可以看到,服務(wù)發(fā)現(xiàn)過(guò)程是通過(guò) RegistryDirectory?向 Zookeeper 訂閱來(lái)實(shí)現(xiàn)的。 先看看 Directory 類之間的關(guān)系:
看下 Directory 接口的定義:
Directory.java
public interface Directory<T> extends Node {Class<T> getInterface();List<Invoker<T>> list(Invocation invocation) throws RpcException; }Directory 可以看做是對(duì)應(yīng) Interface 的 Invoker 列表,而這個(gè)列表可能是動(dòng)態(tài)變化的,比如注冊(cè)中心推送變更。
通過(guò) ReferenceConfig#createProxy()?方法可知,StaticDirectory?主要用于多注冊(cè)中心引用的場(chǎng)景,它的 invoker 列表是通過(guò)參數(shù)傳入的、固定的。在此不做更詳細(xì)的解析了。
RegistryDirectory?用于使用單注冊(cè)中心發(fā)現(xiàn)服務(wù)的場(chǎng)景。RegistryDirectory?沒有重寫 list() 方法,所以使用 AbstractDirectory#list()?方法:
AbstractDirectory.java
RegistryDirectory.java
/*** 獲取 invoker 列表*/ public List<Invoker<T>> doList(Invocation invocation) {if (forbidden) {throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "Forbid consumer " +NetUtils.getLocalHost() + " access service " + getInterface().getName() + " from registry " + getUrl().getAddress() + " use dubbo version " +Version.getVersion() + ", Please check registry access list (whitelist/blacklist).");}List<Invoker<T>> invokers = null;Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap; //本地緩存if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {String methodName = RpcUtils.getMethodName(invocation);//根據(jù)方法名從本地緩存中獲取invoker列表,此處略//…… }return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers; }/*** 節(jié)點(diǎn)變更通知*/ public synchronized void notify(List<URL> urls) {List<URL> invokerUrls = new ArrayList<URL>();List<URL> routerUrls = new ArrayList<URL>();List<URL> configuratorUrls = new ArrayList<URL>();for (URL url : urls) {String protocol = url.getProtocol();String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) {routerUrls.add(url);} else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {configuratorUrls.add(url);} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {invokerUrls.add(url);} else {logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());}}// configurators if (configuratorUrls != null && configuratorUrls.size() >0 ){this.configurators = toConfigurators(configuratorUrls);}// routersif (routerUrls != null && routerUrls.size() >0 ){List<Router> routers = toRouters(routerUrls);if(routers != null){ // null - do nothing setRouters(routers);}}List<Configurator> localConfigurators = this.configurators; // local reference// 合并override參數(shù)this.overrideDirectoryUrl = directoryUrl;if (localConfigurators != null && localConfigurators.size() > 0) {for (Configurator configurator : localConfigurators) {this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);}}// providers refreshInvoker(invokerUrls); }/*** 根據(jù)invokerURL列表轉(zhuǎn)換為invoker列表*/ private void refreshInvoker(List<URL> invokerUrls){//......Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉(zhuǎn)成Invoker列表//...... }/*** 合并url參數(shù) 順序?yàn)閛verride > -D >Consumer > Provider*/ private Map<String, Invoker<T>> toInvokers(List<URL> urls) {//......URL url = mergeUrl(providerUrl);//...... }在 dolist() 方法中,如果通過(guò)服務(wù)治理禁止 Consumer 訪問的話,此處直接拋出響應(yīng)的異常。
RegistryDirectory?實(shí)現(xiàn)了 NotifyListener,在 ZK 節(jié)點(diǎn)變化時(shí)能收到通知更新內(nèi)存緩存,其中 RegistryDirectory#mergeUrl()?方法中會(huì)按照優(yōu)先級(jí)合并參數(shù)(動(dòng)態(tài)配置在此處生效)。
服務(wù)引用時(shí)從內(nèi)存緩存中獲取并返回invoker列表,并根據(jù)路由規(guī)則再進(jìn)行一次過(guò)濾。
2.3.3 Router
Router 的作用就是從 Directory 的 invoker 列表中刷選出符合路由規(guī)則的 invoker 子集。目前 Dubbo 提供了基于IP、應(yīng)用名和協(xié)議等的靜態(tài)路由功能,功能和實(shí)現(xiàn)比較簡(jiǎn)單,在此不做過(guò)多解釋。
2.3.4 LoadBalance
通過(guò) Direcotry 和 Router 之后,返回的是可用的 invoker 子集;在發(fā)起服務(wù)調(diào)用時(shí),需要通過(guò) LoadBalance 選擇最終的一個(gè)目標(biāo) invoker。
在上面的 Cluster 章節(jié)中我們知道,調(diào)用時(shí)首先會(huì)經(jīng)過(guò) MockerClusterInvoker?攔截 Mock 設(shè)置,然后再根據(jù)配置調(diào)用實(shí)際的 Invoker(默認(rèn)是 FailoverClusterInvoker)。
FailoverClusterInvoker?繼承 AbstractClusterInvoker,在 AbstractClusterInvoker?中:
AbstractClusterInvoker.java
FailoverClusterInvoker.java
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {List<Invoker<T>> copyinvokers = invokers;checkInvokers(copyinvokers, invocation);int len = getUrl().getMethodParameter(invocation.getMethodName(), Constants.RETRIES_KEY, Constants.DEFAULT_RETRIES) + 1;if (len <= 0) {len = 1;}// retry loop.RpcException le = null; // last exception.List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyinvokers.size());Set<String> providers = new HashSet<String>(len);for (int i = 0; i < len; i++) {//重試時(shí),進(jìn)行重新選擇,避免重試時(shí)invoker列表已發(fā)生變化.//注意:如果列表發(fā)生了變化,那么invoked判斷會(huì)失效,因?yàn)閕nvoker示例已經(jīng)改變if (i > 0) {checkWheatherDestoried();copyinvokers = list(invocation);//重新檢查一下 checkInvokers(copyinvokers, invocation);}Invoker<T> invoker = select(loadbalance, invocation, copyinvokers, invoked);invoked.add(invoker);RpcContext.getContext().setInvokers((List)invoked);try {Result result = invoker.invoke(invocation);if (le != null && logger.isWarnEnabled()) {logger.warn("", le);}return result;} catch (RpcException e) {if (e.isBiz()) { // biz exception.throw e;}le = e;} catch (Throwable e) {le = new RpcException(e.getMessage(), e);} finally {providers.add(invoker.getUrl().getAddress());}}throw new RpcException(); }在調(diào)用或重試時(shí),每次都通過(guò) LoadBalance 選出一個(gè) Invoker 進(jìn)行調(diào)用。
至此,調(diào)用流程結(jié)束。
?
轉(zhuǎn)載于:https://www.cnblogs.com/cyfonly/p/9079228.html
總結(jié)
以上是生活随笔為你收集整理的Dubbo原理和源码解析之服务引用的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 期转现是指什么
- 下一篇: 退市的股票有重新上市的吗