Dubbo源码分析(三)Dubbo中的SPI和自适应扩展机制
前言
我們在往期文章中,曾經深入分析過Java的SPI機制,它是一種服務發現機制。具體詳見:深入理解JDK的SPI機制
在繼續深入Dubbo之前,我們必須先要明白Dubbo中的SPI機制。因為有位大神(佚名)曾這樣說過:
要想理解Dubbo,必須要先搞明白Dubbo SPI機制,不然會非常暈。
一、背景
1、來源
Dubbo 的擴展點加載從 JDK 標準的 SPI (Service Provider Interface) 擴展點發現機制加強而來。但還有所不同,它改進了JDK標準的 SPI的以下問題:
-
JDK 標準的 SPI 會一次性實例化擴展點所有實現,如果有擴展實現初始化很耗時,但如果沒用上也加載,會很浪費資源。
-
如果擴展點加載失敗,連擴展點的名稱都拿不到了。比如:JDK 標準的 ScriptEngine,通過 getName() 獲取腳本類型的名稱,但如果 RubyScriptEngine 因為所依賴的 jruby.jar 不存在,導致 RubyScriptEngine 類加載失敗,這個失敗原因被吃掉了,和 ruby 對應不起來,當用戶執行 ruby 腳本時,會報不支持 ruby,而不是真正失敗的原因。
-
增加了對擴展點 IoC 和 AOP 的支持,一個擴展點可以直接 setter 注入其它擴展點。
2、約定
在擴展類的 jar 包內,放置擴展點配置文件?META-INF/dubbo/接口全限定名,內容為:配置名=擴展實現類全限定名,多個實現類用換行符分隔。
3、配置文件
Dubbo SPI 所需的配置文件需放置在 META-INF/dubbo 路徑下,幾乎所有的功能都有擴展點實現。
我們以Protocol接口為例,它里面有很多實現。
二、Dubbo SPI
通過上圖我們可以看到,Dubbo SPI和JDK SPI配置的不同,在Dubbo SPI中可以通過鍵值對的方式進行配置,這樣就可以按需加載指定的實現類。 Dubbo SPI的相關邏輯都被封裝到ExtensionLoader類中,通過ExtensionLoader我們可以加載指定的實現類,一個擴展接口就對應一個ExtensionLoader對象,在這里我們把它親切的稱為:擴展點加載器。
我們先看下它的屬性:
public class ExtensionLoader<T> {//擴展點配置文件的路徑,可以從3個地方加載到擴展點配置文件private static final String SERVICES_DIRECTORY = "META-INF/services/";private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/"; //擴展點加載器的集合private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();//擴展點實現的集合private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();//擴展點名稱和實現的映射緩存private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();//拓展點實現類集合緩存private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();//擴展點名稱和@Activate的映射緩存private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();//擴展點實現的緩存private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>(); } 復制代碼ExtensionLoader會把不同的擴展點配置和實現都緩存起來。同時,Dubbo在官網上也給了我們提醒:擴展點使用單一實例加載(請確保擴展實現的線程安全性),緩存在 ExtensionLoader中。下面我們看幾個重點方法。
1、獲取擴展點加載器
我們首先通過ExtensionLoader.getExtensionLoader() 方法獲取一個 ExtensionLoader 實例,它就是擴展點加載器。然后再通過 ExtensionLoader 的 getExtension 方法獲取拓展類對象。這其中,getExtensionLoader 方法用于從緩存中獲取與拓展類對應的 ExtensionLoader,若緩存未命中,則創建一個新的實例。
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {if (type == null)throw new IllegalArgumentException("Extension type == null");if (!type.isInterface()) {throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");}if (!withExtensionAnnotation(type)) {throw new IllegalArgumentException("Extension type(" + type +") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");}ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);if (loader == null) {EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);}return loader; } 復制代碼比如你可以通過下面這樣,來獲取Protocol接口的ExtensionLoader實例: ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class); 就可以拿到擴展點加載器的對象實例: com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]
2、獲取擴展類對象
上一步我們已經拿到加載器,然后可以根據加載器實例,通過擴展點的名稱獲取擴展類對象。
public T getExtension(String name) {//校驗擴展點名稱的合法性if (name == null || name.length() == 0)throw new IllegalArgumentException("Extension name == null");// 獲取默認的拓展實現類if ("true".equals(name)) {return getDefaultExtension();}//用于持有目標對象Holder<Object> holder = cachedInstances.get(name);if (holder == null) {cachedInstances.putIfAbsent(name, new Holder<Object>());holder = cachedInstances.get(name);}Object instance = holder.get();if (instance == null) {synchronized (holder) {instance = holder.get();if (instance == null) {instance = createExtension(name);holder.set(instance);}}}return (T) instance; } 復制代碼它先嘗試從緩存中獲取,未命中則創建擴展對象。那么它的創建過程是怎樣的呢?
private T createExtension(String name) {//從配置文件中獲取所有的擴展類,Map數據結構//然后根據名稱獲取對應的擴展類Class<?> clazz = getExtensionClasses().get(name);if (clazz == null) {throw findException(name);}try {//通過反射創建實例,然后放入緩存T instance = (T) EXTENSION_INSTANCES.get(clazz);if (instance == null) {EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());instance = (T) EXTENSION_INSTANCES.get(clazz);}//注入依賴injectExtension(instance);Set<Class<?>> wrapperClasses = cachedWrapperClasses;if (wrapperClasses != null && !wrapperClasses.isEmpty()) {// 包裝為Wrapper實例for (Class<?> wrapperClass : wrapperClasses) {instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));}}return instance;} catch (Throwable t) {throw new IllegalStateException("Extension instance(name: " + name + ", class: " +type + ") could not be instantiated: " + t.getMessage(), t);} } 復制代碼這里的重點有兩個,依賴注入和Wrapper包裝類,它們是Dubbo中IOC 與 AOP 的具體實現。
2.1、依賴注入
向拓展對象中注入依賴,它會獲取類的所有方法。判斷方法是否以 set 開頭,且方法僅有一個參數,且方法訪問級別為 public,就通過反射設置屬性值。所以說,Dubbo中的IOC僅支持以setter方式注入。
private T injectExtension(T instance) {try {if (objectFactory != null) {for (Method method : instance.getClass().getMethods()) {if (method.getName().startsWith("set")&& method.getParameterTypes().length == 1&& Modifier.isPublic(method.getModifiers())) {Class<?> pt = method.getParameterTypes()[0];try {String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";Object object = objectFactory.getExtension(pt, property);if (object != null) {method.invoke(instance, object);}} catch (Exception e) {logger.error("fail to inject via method " + method.getName()+ " of interface " + type.getName() + ": " + e.getMessage(), e);}}}}} catch (Exception e) {logger.error(e.getMessage(), e);}return instance; } 復制代碼2.2、Wrapper
它會將當前 instance 作為參數傳給 Wrapper 的構造方法,并通過反射創建 Wrapper 實例。 然后向 Wrapper 實例中注入依賴,最后將 Wrapper 實例再次賦值給 instance 變量。說起來可能比較繞,我們直接看下它最后生成的對象就明白了。 我們以DubboProtocol為例,它包裝后的對象為:
綜上所述,如果我們獲取一個擴展類對象,最后拿到的就是這個Wrapper類的實例。 就像這樣:
ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class); Protocol extension = extensionLoader.getExtension("dubbo"); System.out.println(extension); 復制代碼輸出為:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper@4cdf35a9
3、獲取所有的擴展類
在我們通過名稱獲取擴展類對象之前,首先需要根據配置文件解析出所有的擴展類。 它是一個擴展點名稱和擴展類的映射表Map<String, Class<?>>
首先,還是從緩存中cachedClasses獲取,如果沒有就調用loadExtensionClasses從配置文件中加載。配置文件有三個路徑:
META-INF/services/ META-INF/dubbo/ META-INF/dubbo/internal/
先嘗試從緩存中獲取。
private Map<String, Class<?>> getExtensionClasses() {//從緩存中獲取Map<String, Class<?>> classes = cachedClasses.get();if (classes == null) {synchronized (cachedClasses) {classes = cachedClasses.get();if (classes == null) {//加載擴展類classes = loadExtensionClasses();cachedClasses.set(classes);}}}return classes; } 復制代碼如果沒有,就調用loadExtensionClasses從配置文件中讀取。
private Map<String, Class<?>> loadExtensionClasses() {//獲取 SPI 注解,這里的 type 變量是在調用 getExtensionLoader 方法時傳入的final SPI defaultAnnotation = type.getAnnotation(SPI.class);if (defaultAnnotation != null) {String value = defaultAnnotation.value();if ((value = value.trim()).length() > 0) {String[] names = NAME_SEPARATOR.split(value);if (names.length > 1) {throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()+ ": " + Arrays.toString(names));}//設置默認的擴展名稱,參考getDefaultExtension 方法//如果名稱為true,就是調用默認擴贊類if (names.length == 1) cachedDefaultName = names[0];}}//加載指定路徑的配置文件Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);loadDirectory(extensionClasses, DUBBO_DIRECTORY);loadDirectory(extensionClasses, SERVICES_DIRECTORY);return extensionClasses; } 復制代碼以Protocol接口為例,獲取到的實現類集合如下,我們就可以根據名稱加載具體的擴展類對象。
{registry=class com.alibaba.dubbo.registry.integration.RegistryProtocolinjvm=class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocolthrift=class com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocolmock=class com.alibaba.dubbo.rpc.support.MockProtocoldubbo=class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocolhttp=class com.alibaba.dubbo.rpc.protocol.http.HttpProtocolredis=class com.alibaba.dubbo.rpc.protocol.redis.RedisProtocolrmi=class com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol } 復制代碼三、自適應擴展機制
在Dubbo中,很多拓展都是通過 SPI 機制進行加載的,比如 Protocol、Cluster、LoadBalance 等。這些擴展并非在框架啟動階段就被加載,而是在擴展方法被調用的時候,根據URL對象參數進行加載。 那么,Dubbo就是通過自適應擴展機制來解決這個問題。
自適應拓展機制的實現邏輯是這樣的: 首先 Dubbo 會為拓展接口生成具有代理功能的代碼。然后通過 javassist 或 jdk 編譯這段代碼,得到 Class 類。最后再通過反射創建代理類,在代理類中,就可以通過URL對象的參數來確定到底調用哪個實現類。
1、Adaptive注解
在開始之前,我們有必要先看一下與自適應拓展息息相關的一個注解,即 Adaptive 注解。
@Documented @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE, ElementType.METHOD}) public @interface Adaptive {String[] value() default {}; } 復制代碼從上面的代碼中可知,Adaptive 可注解在類或方法上。
- 標注在類上 Dubbo 不會為該類生成代理類。
- 標注在方法上 Dubbo 則會為該方法生成代理邏輯,表示當前方法需要根據 參數URL 調用對應的擴展點實現。
2、獲取自適應拓展類
getAdaptiveExtension 方法是獲取自適應拓展的入口方法。
public T getAdaptiveExtension() {// 從緩存中獲取自適應拓展Object instance = cachedAdaptiveInstance.get();if (instance == null) {if (createAdaptiveInstanceError == null) {synchronized (cachedAdaptiveInstance) {instance = cachedAdaptiveInstance.get();//未命中緩存,則創建自適應拓展,然后放入緩存if (instance == null) {try {instance = createAdaptiveExtension();cachedAdaptiveInstance.set(instance);} catch (Throwable t) {createAdaptiveInstanceError = t;throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);}}}}}return (T) instance; } 復制代碼getAdaptiveExtension方法首先會檢查緩存,緩存未命中,則調用 createAdaptiveExtension方法創建自適應拓展。
private T createAdaptiveExtension() {try {return injectExtension((T) getAdaptiveExtensionClass().newInstance());} catch (Exception e) {throw new IllegalStateException("Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);} } 復制代碼這里的代碼較少,調用 getAdaptiveExtensionClass方法獲取自適應拓展 Class 對象,然后通過反射實例化,最后調用injectExtension方法向拓展實例中注入依賴。 獲取自適應擴展類過程如下:
private Class<?> getAdaptiveExtensionClass() {//獲取當前接口的所有實現類//如果某個實現類標注了@Adaptive,此時cachedAdaptiveClass不為空getExtensionClasses();if (cachedAdaptiveClass != null) {return cachedAdaptiveClass;}//以上條件不成立,就創建自適應拓展類return cachedAdaptiveClass = createAdaptiveExtensionClass(); } 復制代碼在上面方法中,它會先獲取當前接口的所有實現類,如果某個實現類標注了@Adaptive,那么該類就被賦值給cachedAdaptiveClass變量并返回。如果沒有,就調用createAdaptiveExtensionClass創建自適應拓展類。
3、創建自適應拓展類
createAdaptiveExtensionClass方法用于生成自適應拓展類,該方法首先會生成自適應拓展類的源碼,然后通過 Compiler 實例(Dubbo 默認使用 javassist 作為編譯器)編譯源碼,得到代理類 Class 實例。
private Class<?> createAdaptiveExtensionClass() {//構建自適應拓展代碼String code = createAdaptiveExtensionClassCode();ClassLoader classLoader = findClassLoader();// 獲取編譯器實現類 這個Dubbo默認是采用javassist Compiler compiler =ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();//編譯代碼,返回類實例的對象return compiler.compile(code, classLoader); } 復制代碼在生成自適應擴展類之前,Dubbo會檢查接口方法是否包含@Adaptive。如果方法上都沒有此注解,就要拋出異常。
if (!hasAdaptiveAnnotation){throw new IllegalStateException("No adaptive method on extension " + type.getName() + ", refuse to create the adaptive class!"); } 復制代碼我們還是以Protocol接口為例,它的export()和refer()方法,都標注為@Adaptive。destroy和 getDefaultPort未標注 @Adaptive注解。Dubbo 不會為沒有標注 Adaptive 注解的方法生成代理邏輯,對于該種類型的方法,僅會生成一句拋出異常的代碼。
package com.alibaba.dubbo.rpc; import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.Adaptive; import com.alibaba.dubbo.common.extension.SPI;@SPI("dubbo") public interface Protocol {int getDefaultPort();@Adaptive<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;@Adaptive<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;void destroy(); } 復制代碼所以說當我們調用這兩個方法的時候,會先拿到URL對象中的協議名稱,再根據名稱找到具體的擴展點實現類,然后去調用。下面是生成自適應擴展類實例的源代碼:
package com.viewscenes.netsupervisor.adaptive;import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.common.extension.ExtensionLoader; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Protocol; import com.alibaba.dubbo.rpc.RpcException;public class Protocol$Adaptive implements Protocol {public void destroy() {throw new UnsupportedOperationException("method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");}public int getDefaultPort() {throw new UnsupportedOperationException("method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");}public Exporter export(Invoker invoker)throws RpcException {if (invoker == null) {throw new IllegalArgumentException("Invoker argument == null");}if (invoker.getUrl() == null) {throw new IllegalArgumentException("Invoker argument getUrl() == null");}URL url = invoker.getUrl();String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null) {throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");}Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);return extension.export(invoker);}public Invoker refer(Class clazz,URL ur)throws RpcException {if (ur == null) {throw new IllegalArgumentException("url == null");}URL url = ur;String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null) {throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");}Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);return extension.refer(clazz, url);} } 復制代碼綜上所述,當我們獲取某個接口的自適應擴展類,實際就是一個Adaptive類實例。
ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class); Protocol adaptiveExtension = extensionLoader.getAdaptiveExtension(); System.out.println(adaptiveExtension); 復制代碼輸出為: com.alibaba.dubbo.rpc.Protocol$Adaptive@47f6473
四、實例
我們看完以上流程之后,如果想寫一套自己的邏輯替換Dubbo中的流程,就變得很簡單。
Dubbo默認使用dubbo協議來暴露服務。我們可以搞一個自定義的協議來替換它。
1、實現類
首先,我們創建一個MyProtocol類,它實現Protocol接口。
package com.viewscenes.netsupervisor.protocol;import com.alibaba.dubbo.common.URL; import com.alibaba.dubbo.rpc.Exporter; import com.alibaba.dubbo.rpc.Invoker; import com.alibaba.dubbo.rpc.Protocol; import com.alibaba.dubbo.rpc.RpcException; import com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol;public class MyProtocol extends DubboProtocol implements Protocol{public int getDefaultPort() {return 28080;}public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException { URL url = invoker.getUrl();System.out.println("自定義協議,進行服務暴露:"+url); return super.export(invoker);}public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {return super.refer(type, url);}public void destroy() {} } 復制代碼2、擴展點配置文件
然后,在自己的項目中META-INF/services創建com.alibaba.dubbo.rpc.Protocol文件,文件內容為: myProtocol=com.viewscenes.netsupervisor.protocol.MyProtocol
3、修改Dubbo配置文件
最后修改生產者端的配置文件:
<!-- 用自定義協議在20880端口暴露服務 --> <dubbo:protocol name="myProtocol" port="20880"/> 復制代碼這樣在我們啟動生產者端項目的時候,Dubbo在進行服務暴露的過程中,就會調用到我們自定義的MyProtocol類,完成相應的邏輯處理。
轉載于:https://juejin.im/post/5c909949e51d450fae18deb8
總結
以上是生活随笔為你收集整理的Dubbo源码分析(三)Dubbo中的SPI和自适应扩展机制的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Git cherry-pick 使用总结
- 下一篇: 阿里巴巴开源OpenJDK长期支持版本,