dubbo-spi源码解析(1)

未来已来2018-11-25 11:02

此文已由作者赵计刚薪授权网易云社区发布。

欢迎访问网易云社区,了解更多网易技术产品运营经验。


 1 package com.alibaba.dubbo.demo.test;
 2 
 3 import com.alibaba.dubbo.common.extension.ExtensionLoader;
 4 import com.alibaba.dubbo.rpc.Protocol;
 5 
 6 public class TestExtension {
 7     public static void main(String[] args) {
 8         ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);
 9         final Protocol dubboProtocol = loader.getExtension("dubbo");
10         final Protocol adaptiveExtension = loader.getAdaptiveExtension();
11     }
12 }

讲解这三行代码的源码。

一  Protocol接口的定义

 1 package com.alibaba.dubbo.rpc;
 2 
 3 import com.alibaba.dubbo.common.URL;
 4 import com.alibaba.dubbo.common.extension.Adaptive;
 5 import com.alibaba.dubbo.common.extension.SPI;
 6 
 7 /**
 8  * Protocol. (API/SPI, Singleton, ThreadSafe)
 9  */
10 @SPI("dubbo")
11 public interface Protocol {
12 
13     /**
14      * 获取缺省端口,当用户没有配置端口时使用。
15      *
16      * @return 缺省端口
17      */
18     int getDefaultPort();
19 
20     /**
21      * 暴露远程服务:<br>
22      * 1. 协议在接收请求时,应记录请求来源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
23      * 2. export()必须是幂等的,也就是暴露同一个URL的Invoker两次,和暴露一次没有区别。<br>
24      * 3. export()传入的Invoker由框架实现并传入,协议不需要关心。<br>
25      *
26      * @param <T>     服务的类型
27      * @param invoker 服务的执行体
28      * @return exporter 暴露服务的引用,用于取消暴露
29      * @throws RpcException 当暴露服务出错时抛出,比如端口已占用
30      */
31     @Adaptive
32     <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
33 
34     /**
35      * 引用远程服务:<br>
36      * 1. 当用户调用refer()所返回的Invoker对象的invoke()方法时,协议需相应执行同URL远端export()传入的Invoker对象的invoke()方法。<br>
37      * 2. refer()返回的Invoker由协议实现,协议通常需要在此Invoker中发送远程请求。<br>
38      * 3. 当url中有设置check=false时,连接失败不能抛出异常,并内部自动恢复。<br>
39      *
40      * @param <T>  服务的类型
41      * @param type 服务的类型
42      * @param url  远程服务的URL地址
43      * @return invoker 服务的本地代理
44      * @throws RpcException 当连接服务提供方失败时抛出
45      */
46     @Adaptive
47     <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
48 
49     /**
50      * 释放协议:<br>
51      * 1. 取消该协议所有已经暴露和引用的服务。<br>
52      * 2. 释放协议所占用的所有资源,比如连接和端口。<br>
53      * 3. 协议在释放后,依然能暴露和引用新的服务。<br>
54      */
55     void destroy();
56 }

注意:这里有两个核心注解

  • @SPI:指定一个接口为SPI接口(可扩展接口)
    1 @Documented
    2 @Retention(RetentionPolicy.RUNTIME)
    3 @Target({ElementType.TYPE})
    4 public @interface SPI {
    5     /** 缺省扩展点名 */
    6     String value() default "";
    7 }

     

  • @Adaptive该注解可以注解在两个地方:

    • 接口上:例如AdaptiveExtensionFactory(该类不是工厂类,有特殊的逻辑)  AdaptiveCompiler(实际上也是工厂类,但是不能靠动态生成,否则会形成死循环)
    • 接口的方法上:会动态生成相应的动态类(实际上是一个工厂类,工厂设计模式),例如Protocol$Adapter

这个接口极其重要,后续的整个服务暴露和服务调用会用到该接口的两个方法。

 

二 获取ExtensionLoader

1 ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class);

ExtensionLoader可以类比为JDK-SPI中的ServiceLoader。

首先来看一下ExtensionLoader的类属性:

 1     /** 存放SPI文件的三个目录,其中META-INF/services/也是jdk的SPI文件的存放目录 */
 2     private static final String SERVICES_DIRECTORY = "META-INF/services/";
 3     private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
 4     private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";//这个是最终jar包中存放spi文件的位置
 5 
 6     private static final Pattern NAME_SEPARATOR = Pattern.compile("\\s*[,]+\\s*");
 7     /** key: SPI接口Class value: 该接口的ExtensionLoader */
 8     private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
 9     /** key: SPI接口Class value: SPI实现类的对象实例 */
10     private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();

注意:上述的都是类属性,即所有该类的实例都共享。而后边的实例属性就属于每一个类的实例私有。

再来看一下ExtensionLoader的实例属性:

 1     /** SPI接口Class */
 2     private final Class<?> type;
 3     /** SPI实现类对象实例的创建工厂 */
 4     private final ExtensionFactory objectFactory;
 5     /** key: ExtensionClass的Class value: SPI实现类的key */
 6     private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
 7     /** 存放所有的extensionClass */
 8     private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
 9 
10     private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
11     /** 缓存创建好的extensionClass实例 */
12     private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
13     /** 缓存创建好的适配类实例 */
14     private final Holder<Object> cachedAdaptiveInstance = new Holder<Object>();
15     /** 存储类上带有@Adaptive注解的Class */
16     private volatile Class<?> cachedAdaptiveClass = null;
17     /** 默认的SPI文件中的key */
18     private String cachedDefaultName;
19     /** 存储在创建适配类实例这个过程中发生的错误 */
20     private volatile Throwable createAdaptiveInstanceError;
21     /** 存放具有一个type入参的构造器的实现类的Class对象 */
22     private Set<Class<?>> cachedWrapperClasses;
23     /** key :实现类的全类名  value: exception, 防止真正的异常被吞掉 */
24     private Map<String, IllegalStateException> exceptions = new ConcurrentHashMap<String, IllegalStateException>();

来看一下getExtensionLoader(Class<T> type)的源码:

 1     /**
 2      * 1 校验入参type:非空 + 接口 + 含有@SPI注解
 3      * 2 根据type接口从全局缓存EXTENSION_LOADERS中获取ExtensionLoader,如果有直接返回;如果没有,则先创建,之后放入缓存,最后返回
 4      */
 5     public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
 6         if (type == null)
 7             throw new IllegalArgumentException("Extension type == null");
 8         if (!type.isInterface()) {
 9             throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
10         }
11         if (!withExtensionAnnotation(type)) {
12             throw new IllegalArgumentException("Extension type(" + type +
13                     ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
14         }
15 
16         ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
17         if (loader == null) {
18             EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
19             loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
20         }
21         return loader;
22     }

创建ExtensionLoader:

1     private ExtensionLoader(Class<?> type) {
2         this.type = type;
3         objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
4     }

当前创建的ExtensionLoader对象(我们取名为ExtensionLoader对象1)的type是com.alibaba.dubbo.rpc.Protocol,所以此时会执行:ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension()。

首先是创建ExtensionFactory,通过上边核心类部分ExtensionFactory接口的源码可以看出,此类也是一个SPI接口类,且没有指定默认的实现类的key。

1 ExtensionLoader.getExtensionLoader(ExtensionFactory.class)

下面的代码与上述的过程相似,只是此时创建的另外一个ExtensionLoader对象(我们取名为ExtensionLoader对象2)的type是com.alibaba.dubbo.common.extension.ExtensionFactory,而objectFactory是null。之后,这个ExtensionLoader对象2被放入EXTENSION_LOADERS缓存。这里给出ExtensionFactory的定义,该类也极其重要。//TODO

1 package com.alibaba.dubbo.common.extension;
2 
3 @SPI
4 public interface ExtensionFactory {
5     <T> T getExtension(Class<T> type, String name);
6 }

之后执行ExtensionLoader对象2的getAdaptiveExtension()方法。

 1     /**
 2      * 首先从cachedAdaptiveInstance缓存中获取AdaptiveExtension实例
 3      * 如果不为null, 直接返回;
 4      * 如果为null, 先创建AdaptiveExtension实例, 之后放入cachedAdaptiveInstance缓存中,最后返回
 5      */
 6     public T getAdaptiveExtension() {
 7         Object instance = cachedAdaptiveInstance.get();
 8         if (instance == null) {
 9             if (createAdaptiveInstanceError == null) {
10                 synchronized (cachedAdaptiveInstance) {
11                     instance = cachedAdaptiveInstance.get();
12                     if (instance == null) {
13                         try {
14                             instance = createAdaptiveExtension();
15                             cachedAdaptiveInstance.set(instance);
16                         } catch (Throwable t) {
17                             createAdaptiveInstanceError = t;
18                             throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
19                         }
20                     }
21                 }
22             } else {
23                 throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
24             }
25         }
26 
27         return (T) instance;
28     }

来看createAdaptiveExtension()创建AdaptiveExtension的源码:

 1     /**
 2      * createAdaptiveExtension()
 3      * --getAdaptiveExtensionClass()
 4      *   //从dubbo-spi配置文件中获取AdaptiveExtensionClass
 5      *   --getExtensionClasses()
 6      *     --loadExtensionClasses()
 7      *       --loadFile(Map<String, Class<?>> extensionClasses, String dir)
 8      *   //创建动态代理类
 9      *   --createAdaptiveExtensionClass()
10      *
11      * --injectExtension(T instance)  //dubbo-ioc
12      */
13     private T createAdaptiveExtension() {
14         try {
15             return injectExtension((T) getAdaptiveExtensionClass().newInstance());
16         } catch (Exception e) {
17             throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
18         }
19     }

调用层级看注释。injectExtension(T instance)方法只对objectFactory有用,如果objectFactory==null,则直接返回T instance。所以这里返回的是getAdaptiveExtensionClass().newInstance()

来看getAdaptiveExtensionClass()的源码:

 1     /**
 2      * 获取ExtensionClasses和适配类
 3      * 如果实现类上带有@Adaptive注解,直接创建修饰类
 4      * 如果方法上带有@Adaptive注解,动态生成代理类
 5      */
 6     private Class<?> getAdaptiveExtensionClass() {
 7         getExtensionClasses();
 8         if (cachedAdaptiveClass != null) {
 9             return cachedAdaptiveClass;
10         }
11         return cachedAdaptiveClass = createAdaptiveExtensionClass();
12     }

现在来看getExtensionClasses():

 1     /**
 2      * 先从cachedClasses缓存中获取所有的ExtensionClass,如果有,直接返回;
 3      * 如果没有,通过loadExtensionClasses()从SPI文件中去读取,之后写入缓存
 4      */
 5     private Map<String, Class<?>> getExtensionClasses() {
 6         Map<String, Class<?>> classes = cachedClasses.get();
 7         if (classes == null) {
 8             synchronized (cachedClasses) {
 9                 classes = cachedClasses.get();
10                 if (classes == null) {
11                     classes = loadExtensionClasses();
12                     cachedClasses.set(classes);
13                 }
14             }
15         }
16         return classes;
17     }

现在来看loadExtensionClasses()

 1     /**
 2      * 1 从@SPI注解中将默认值解析出来,并缓存到cachedDefaultName中
 3      * 2 从SPI文件中获取extensionClass并存储到extensionClasses中,最后返回extensionClasses
 4      * 注意:此方法已经getExtensionClasses方法同步过。
 5      */
 6     private Map<String, Class<?>> loadExtensionClasses() {
 7         final SPI defaultAnnotation = type.getAnnotation(SPI.class);
 8         if (defaultAnnotation != null) {
 9             String value = defaultAnnotation.value();
10             if (value != null && (value = value.trim()).length() > 0) {
11                 String[] names = NAME_SEPARATOR.split(value);
12                 if (names.length > 1) {
13                     throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
14                             + ": " + Arrays.toString(names));
15                 }
16                 if (names.length == 1) cachedDefaultName = names[0];
17             }
18         }
19 
20         Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
21         loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
22         loadFile(extensionClasses, DUBBO_DIRECTORY);
23         loadFile(extensionClasses, SERVICES_DIRECTORY);
24         return extensionClasses;
25     }


免费体验云安全(易盾)内容安全、验证码等服务

更多网易技术、产品、运营经验分享请点击