dubbo-基于SPI的自适应扩展机制

复习总结:

1、 自适应扩展机制是为了能够在支持多协议的dubbo框架中根据配置灵活的加载对象,主要使用到了@SPI@Adaptive两个注解

2、@SPI注解在扩展点接口之上,可以指定默认的实现,例如@SPI("dubbo")

3、@Adaptive用于指定扩展点接口的自适应代理类,这个代理类可以是静态的(注解在类上),也可以是代码中生成的(注解在扩展点接口方法上),注意这里代理类的生成是生成一个string类型的code源码,再通过javasisit编译成class。

4、自适应代理类中并没有真的实现接口方法,而是创建出合适的扩展点实现,将调用委托给具体的扩展点实例。


源码分析入口:

获取扩展类实例的调用语句如下

ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol")

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// ExtensionLoader.class
private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>(64);

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
if (type == null) { //传入的接口类型为null 抛异常
throw new IllegalArgumentException("Extension type == null");
}
if (!type.isInterface()) {// 传入的类型不是接口,抛异常
throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");
}
if (!withExtensionAnnotation(type)) {// 传入的接口没有注解@SPI,抛异常
throw new IllegalArgumentException("Extension type (" + type +
") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");
}

ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
if (loader == null) {
EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
}
return loader;
}

可以看到dubbo将所有的接口的ExtensionLoader都缓存到了一个hashmap中,获取Loader时先尝试从map中获取,没有获取到在创建一个存入map中并返回。

关系:每个被@SPI注解的接口都会对应一个ExtensionLoader

取到loader后,使用loader.getExtension获取指定名称的扩展:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public T getExtension(String name) {
...
if ("true".equals(name)) {
return getDefaultExtension();
}
final Holder<Object> holder = getOrCreateHolder(name);
Object instance = holder.get();
if (instance == null) {
synchronized (holder) {
instance = holder.get();
if (instance == null) {
instance = createExtension(name); // >>
holder.set(instance);
}
}
}
return (T) instance;
}

关系:每一条扩展(key->value)对应一个Holder,由于不同名称的扩展返回不同的类型的对象,所以此处用Holder封装返回的扩展对象,将具体的扩展类型泛型化,这样就可以统一返回类型。

获取扩展的逻辑:

1、 先获取缓存中的Holder,没有就创建一个空的holder放入缓存。

2、 从holder中获取扩展类型实例,没有获取到(刚创建没有实例),则创建实例,放入holder中并返回实例。

跟进创建扩展实例的方法:

createExtension(name)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
private T createExtension(String name) {
Class<?> clazz = getExtensionClasses().get(name);// 根据协议名称获取类对象
if (clazz == null) {
throw findException(name);
}
try { // 缓存实例
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance); // 依赖注入,若实例化的扩展类中有属性也是扩展点,则实例化该扩展点使用反射方式注入当前实例
Set<Class<?>> wrapperClasses = cachedWrapperClasses;
if (CollectionUtils.isNotEmpty(wrapperClasses)) {
for (Class<?> wrapperClass : wrapperClasses) {
instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
}
}
initExtension(instance); //
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance (name: " + name + ", class: " +
type + ") couldn't be instantiated: " + t.getMessage(), t);
}
}

方法getExtensionClasses()获取当前ExtensionLoader的所有扩展(key->value),ExtensionLoader是和接口绑定的,比如说此时正在加载Protocol.class接口的扩展,那么该方法将获取所有的Protocol接口的扩展。见下图:

返回的是Map<String,Class<?>>类型结果,既然是Class那么这些扩展类都已加载到了虚拟机中,之前提到的JDK标准的SPI其中一点不足就是全部加载,那么dubbo这里也是全部加载啊,优化在哪里?

这里说的加载实现类,其实指的的实例化扩展类。JDK的SPI在读取配置文件之后,将扩展类加载到虚拟机,并立即执行实例化操作。而dubbo只是将类加载进虚拟机,并且设计了一套自适应机制,在运行时实例化扩展类,也就是只有配置的扩展类才会被实例化。

在回到源码中:获取到这个map之后,根据name值myProtocal,获取到本次需要扩展点类型,是MyProtocol.class

接下来,尝试从缓存中获取扩展类型的实例,获取不到则使用clazz.newInstance()创建一个实例,放入到缓存。

到此为止,已经处出现了两处使用缓存的场景

  1. 缓存所有扩展点接口对应的ExtensionLoader 使用 ConcurrentMap<Class<?>, ExtensionLoader<?>>存储,Key是扩展点接口类型
  2. 缓存所有实例化后的扩展类对象 使用ConcurrentMap<Class<?>, Object>存储,Key是具体的扩展类型

接下来是injectExtension()这个方法,用来实现依赖注入,如果被加载的扩展类实例中,有成员属性本身也是一个扩展点,则会通过反射的方式进行注入。

注意:依赖注入是dubbo-spi的另一个优化点,突出的就是懒加载策略,包括后文将要分析的自适应扩展

最后是initExtension(instance)方法,如果扩展类型本身是一个Lifecycle类型,那么执行lifecycle.initialize()方法

至此,扩展类的实例就获取到了。

1. Dubbo的默认扩展

getExtension(name)方法中有一段代码

1
2
3
if ("true".equals(name)) {
return getDefaultExtension();
}

当name=true时,将获取默认的扩展,默认扩展是哪个呢?

跟进去看

1
2
3
4
5
6
7
public T getDefaultExtension() {
getExtensionClasses();
if (StringUtils.isBlank(cachedDefaultName) || "true".equals(cachedDefaultName)) {
return null;
}
return getExtension(cachedDefaultName);
}

根据 cachedDefaultName 默认的扩展名称获取扩展实例,那么默认的扩展名称在哪里初始化的呢?

在代码中搜索cachedDefaultName ,找到了 cacheDefaultExtensionName()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void cacheDefaultExtensionName() {
final SPI defaultAnnotation = type.getAnnotation(SPI.class);
if (defaultAnnotation == null) {
return;
}

String value = defaultAnnotation.value();
if ((value = value.trim()).length() > 0) {
String[] names = NAME_SEPARATOR.split(value);
if (names.length > 1) {
throw new IllegalStateException("More than 1 default extension name on extension " + type.getName()
+ ": " + Arrays.toString(names));
}
if (names.length == 1) {
cachedDefaultName = names[0];
}
}
}

原来是从@SPI注解中获取的默认扩展名称,再看Protocol接口:

1
2
3
4
@SPI("dubbo")
public interface Protocol {
...
}

那么Protocol接口默认的实现就是 dubbo也就是org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol

总结:

扩展点接口上的注解@SPI可以写带value,这个value指定了该扩展点接口的默认扩展名称,当我们使用“true”作为名称获取扩展类对象时,获取到的就是这个指定的默认扩展类型对象。

2. 自适应扩展

为什么需要自适应扩展?

有些扩展并不想在框架启动阶段被实例化,而是希望在某些方法被调用时,根据运行时参数再决定实例化哪个具体的扩展类。

什么叫自适应扩展点?

看一个例子:

1
2
3
4
public static void main(String[] args) {
Compiler compiler=ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
System.out.println(compiler.getClass().getName());
}

可以看到这句代码没有指定扩展名,它会返回一个AdaptiveCompiler,这个就叫做自适应。

怎么实现的呢?

根进AdaptiveCompiler,可以看到这个类上有一个@Adaptive注解

1
2
@Adaptive
public class AdaptiveCompiler implements Compiler {...}

@Adaptive这是自适应扩展类的标志,既可以修饰在类上,也可以修饰到方法上

2.1 注解@Adaptive

在对自适应拓展生成过程进行深入分析之前,我们先来看一下@Adaptive 注解。该注解的定义如下:

1
2
3
4
5
6
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
String[] value() default {};
}

Adaptive 可注解在类或方法上。

当 Adaptive 注解在类上时,Dubbo 不会为该类生成Adaptive代理类,因为注解了之后的类就是代理类(静态代理类)。

当 Adaptive 注解在方法(接口方法)上时,Dubbo 则会为该方法生成代理逻辑,生成代码默认使用javasisit进行编译。

Adaptive 注解在类上的情况很少,在 Dubbo 中,仅有两个类被 Adaptive 注解了,分别是 AdaptiveCompiler 和 AdaptiveExtensionFactory。此种情况,表示拓展的加载逻辑由人工编码完成。

更多时候,Adaptive 是注解在接口方法上的,表示拓展的加载逻辑需由框架自动生成。Adaptive 注解的地方不同,相应的处理逻辑也是不同的。注解在类上时,处理逻辑比较简单。注解在接口方法上时,处理逻辑较为复杂,本文将会重点分析此块逻辑。

再回到获取自适应扩展实例的逻辑中,从getAdaptiveExtension()方法进去:

1
2
3
4
5
public T getAdaptiveExtension() {
...
instance = createAdaptiveExtension();
...
}
1
2
3
4
5
6
7
private T createAdaptiveExtension() {
try {
// 先获取自适应扩展类,实例化;在对其进行依赖注入
return injectExtension((T) getAdaptiveExtensionClass().newInstance());
}
...
}
1
2
3
4
5
6
7
private Class<?> getAdaptiveExtensionClass() {
getExtensionClasses(); // 加载所有的扩展类
if (cachedAdaptiveClass != null) {
return cachedAdaptiveClass; // 如果有某个扩展类注解了@Adptive,返回这个类
}
return cachedAdaptiveClass = createAdaptiveExtensionClass(); // 没有自适应类,那就创造一个
}

此处有个判断:

  • 如果cachedAdaptiveClass!=null 返回cachedAdaptiveClass

  • 如果为空,则创建一个AdaptiveExtensionClass

这里先说一下结论:前者是处理类上的@Adaptive注解,后者是处理方法上的,下面逐个分析

2.2 类上注解@Adaptive

这里只要搞明白cachedAdaptiveClass什么时候初始化的就可以了,在代码中搜索cachedAdaptiveClass,发现了初始化的地方:

1
2
3
4
5
6
private void cacheAdaptiveClass(Class<?> clazz, boolean overridden) {
if (cachedAdaptiveClass == null || overridden) {
cachedAdaptiveClass = clazz;
}
...
}

在找cacheAdaptiveClass()方法的调用处,找到了

1
2
3
4
5
6
7
8
private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
boolean overridden) throws NoSuchMethodException {
...
if (clazz.isAnnotationPresent(Adaptive.class)) {
cacheAdaptiveClass(clazz, overridden);
}
...
}

loadClass()方法的调用处:

1
2
3
4
5
6
7
8
private void loadResource(Map<String, Class<?>> extensionClasses, ClassLoader classLoader,
java.net.URL resourceURL, boolean overridden, String... excludedPackages) {
...
if (line.length() > 0 && !isExcluded(line, excludedPackages)) {
loadClass(extensionClasses, resourceURL, Class.forName(line, true, classLoader), name, overridden);
}
...
}

继续往上找:

1
2
3
4
5
6
private void loadDirectory(Map<String, Class<?>> extensionClasses, String dir, String type,
boolean extensionLoaderClassLoaderFirst, boolean overridden, String... excludedPackages) {
...
loadResource(extensionClasses, classLoader, resourceURL, overridden, excludedPackages);
...
}

继续:

1
2
3
4
5
6
7
8
9
private Map<String, Class<?>> loadExtensionClasses() {
cacheDefaultExtensionName();
Map<String, Class<?>> extensionClasses = new HashMap<>();
for (LoadingStrategy strategy : strategies) {
loadDirectory(extensionClasses, strategy.directory(), type.getName(), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
loadDirectory(extensionClasses, strategy.directory(), type.getName().replace("org.apache", "com.alibaba"), strategy.preferExtensionClassLoader(), strategy.overridden(), strategy.excludedPackages());
}
return extensionClasses;
}

这个方法我们很熟悉了,是用来加载配置文件中的扩展条目的(key->value),

总结:

对每个扩展点接口,调用loadExtensionClasses加载所有扩展类的时候,会检查扩展类上是否注解了@Adaptive,如果有,则将这个类型对象赋值给cachedAdaptiveClass变量。

2.3 方法注解@Adaptive

再贴一下上面的一段代码

1
2
3
4
private Class<?> getAdaptiveExtensionClass() {
...
return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

通过createAdaptiveExtensionClass()方法创建一个AdaptiveExtensionClass,跟进方法:

1
2
3
4
5
6
private Class<?> createAdaptiveExtensionClass() {
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();
ClassLoader classLoader = findClassLoader();
org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
return compiler.compile(code, classLoader);
}

代码逻辑比较清晰,将生成一段java代码,然后获取自适应的Compiler去编译这段代码。

可以看到这里用到了自适应的Compiler,这个接口有 jdk 和 javassist 两种实现方式,@Spi注解中指定的默认实现是javassist

贴一下AdaptiveCompiler的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Adaptive
public class AdaptiveCompiler implements Compiler {

private static volatile String DEFAULT_COMPILER;

public static void setDefaultCompiler(String compiler) {
DEFAULT_COMPILER = compiler;
}

@Override
public Class<?> compile(String code, ClassLoader classLoader) {
Compiler compiler;
ExtensionLoader<Compiler> loader = ExtensionLoader.getExtensionLoader(Compiler.class);
String name = DEFAULT_COMPILER; // copy reference
if (name != null && name.length() > 0) {
compiler = loader.getExtension(name);
} else {
compiler = loader.getDefaultExtension();
}
return compiler.compile(code, classLoader);
}

}

如果没有指定compiler,使用默认的javasissit进行编译,否则使用指定的compiler,这就是自适应Compiler的逻辑。

所以这里的重点是在java代码的生成那一块,

1
String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();

首先创建了一个自适应类代码生成器,传进两个参数,接口类型type 和 默认的实现名(@SPI注解指定的名称),然后调用该生成器的generate()方法生成代码。代码自动生成的逻辑比较多,且复杂。

运行时加载,到底怎么理解呢,以Protocol接口为例,贴一段自动生成的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package com.viewscenes.netsupervisor.adaptive;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;

public class Protocol$Adaptive implements Protocol {
public void destroy() {
throw new UnsupportedOperationException(
"method public abstract void Protocol.destroy() of "
+ "interface Protocol is not adaptive method!");
}
public int getDefaultPort() {
throw new UnsupportedOperationException(
"method public abstract int Protocol.getDefaultPort()"
+ "of interface Protocol is not adaptive method!");
}
/**
* export 方法什么时候调用?服务发布的时候调用,服务发布肯定会指定发布服务的协议名称、
* 注册中心地址等信息
*/
public Exporter export(Invoker invoker)throws RpcException {
if (invoker == null) {
throw new IllegalArgumentException("Invoker argument == null");
}
if (invoker.getUrl() == null) {
throw new IllegalArgumentException("Invoker argument getUrl() == null");
}

URL url = invoker.getUrl(); // 从调用此方法的Invoker获取url
// 如果url中没有指定协议,那么使用默认“dubbo”(之前构造代码生成器的时候传进来的)
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException("Fail to get extension(Protocol) name from url("
+ url.toString() + ") use keys([protocol])");
}
// 使用指定的协议名或者默认协议名获取Protocol扩展实例
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class)
.getExtension(extName);
// 调用具体扩展实现的export方法。
return extension.export(invoker);
}
/**
* refer 方法什么时候调用?注入远程服务时候调用,调用服务也要指定发布服务的协议名称、
* 注册中心地址等信息
*/
public Invoker refer(Class clazz,URL ur)throws RpcException {
if (ur == null) {
throw new IllegalArgumentException("url == null");
}
URL url = ur;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null) {
throw new IllegalStateException(
"Fail to get extension(Protocol) name from url(" +
url.toString() + ") use keys([protocol])");
}
// 使用指定的 或者 默认的 Protocol扩展名获取扩展实例
Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class)
.getExtension(extName);
// 调用具体实现类的refer方法
return extension.refer(clazz, url);
}
}

分析完对以上的代码,对自适应扩展机制应该有了一个基本的理解,自适应的Protocol$Adaptive,本身并没有具体的实现,只是根据运行时的配置,将自适应方法转发给配置的或者默认的实现类,可以算是一种代理模式的实现,这种设计非常灵活,可以借鉴。

激活扩展点

自动激活扩展点,有点类似我们讲 springboot 的时候用到的 conditional,根据条件进行自动激活。但是这里设计的初衷是,对

于一个类会加载多个扩展点的实现,这个时候可以通过自动激活扩展点进行动态加载, 从而简化我们的配置工作

@Activate 提供了一些配置来允许我们配置加载条件,比如 group 过滤,比如 key 过滤。

举个例子,我们可以看看 org.apache.dubbo.Filter 这个类,它有非常多的实现,比如说 CacheFilter,这个缓存过滤器,配置信息

如下

group 表示客户端和和服务端都会加载,value 表示 url 中有 cache_key 的时候

1
2
@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY) 
public class CacheFilter implements Filter {...}

通过下面这段代码,演示关于 Filter 的自动激活扩展点的效果。没有添加“红色部分的代码”时,list 的结果是 10,添加之后 list 的结果是 11. 会自动把 cacheFilter 加载进来

1
2
3
4
5
6
7
public static void main(String[] args) {
ExtensionLoader<Filter> loader=ExtensionLoader.getExtensionLoader(Filter.class);
URL url = new URL("","",0);
//url=url.addParameter("cache","cache"); 有和没有的区别
List<Filter> filters=loader.getActivateExtension(url,"cache");
System.out.println(filters.size());
}