dubbo-服务发布和注册过程

1 设计思想

dubbo是基于url驱动的rpc框架

所以在服务启动之后,需要针对指定的协议生成url,若指定了多协议则生成多个url。

生成url并启动本地的服务,我们称为服务发布,随后会将服务url注册到注册中心,这个过程称为服务注册

服务发布注册需要在服务启动过程中进行,启动->发布->注册

那么就需要去设计服务发布注册的时机,dubbo的做法是向springboot注册了一个事件监听器,监听ContextRefreshedEvent事件,当context刷新完成之后,该监听器会执行dubbo服务的发布注册流程。

2 执行流程

@DubboComponentScan注解可以指定扫描dubbo服务的包,

1
2
@Import(DubboComponentScanRegistrar.class)
public @interface DubboComponentScan { ...}

DubboComponentScanRegistrars是用于动态注册bean的一种方法,注册了什么呢?

1
2
3
4
5
@Override //DubboComponentScanRegistrars
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
...
registerServiceAnnotationBeanPostProcessor(packagesToScan, registry);
}
1
2
3
4
5
6
7
8
9
//DubboComponentScanRegistrars
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
builder.addConstructorArgValue(packagesToScan);
builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);

}

注册了一个ServiceAnnotationBeanPostProcessor,看该类的继承关系可知这是一个BeanDefinitionRegistryPostProcessor

在ApplicationContextInitializer的标准初始化之后修改它的内部bean定义注册表。所有的beanDefinition都已加载,但还没有实例化任何bean。允许在下一个后处理阶段开始之前添加更多的bean定义

以上是BeanDefinitionRegistryPostProcessor的官方注释,

1
2
3
4
5
6
@Override // ServiceClassPostProcessor
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
// @since 2.7.5
registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME, DubboBootstrapApplicationListener.class);
...
}

可以看到该后置处理器向ioc注册了DubboBootstrapApplicationListener

1
2
3
4
5
6
7
8
@Override //DubboBootstrapApplicationListener
public void onApplicationContextEvent(ApplicationContextEvent event) {
if (event instanceof ContextRefreshedEvent) {
onContextRefreshedEvent((ContextRefreshedEvent) event);
} else if (event instanceof ContextClosedEvent) {
onContextClosedEvent((ContextClosedEvent) event);
}
}
1
2
3
4
5
6
7
8
// DubboBootstrapApplicationListener
private final DubboBootstrap dubboBootstrap;
public DubboBootstrapApplicationListener() {
this.dubboBootstrap = DubboBootstrap.getInstance();
}
private void onContextRefreshedEvent(ContextRefreshedEvent event) {
dubboBootstrap.start();
}

跟到这里,来到了dubbo服务启动类DubboBootstrap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void doExportUrls() {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);

List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);

for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
// TODO, uncomment this line once service key is unified
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs); // >>
}
}

doExportUrlsFor1Protocol从方法名能看出这个方法的功能是使用一个协议发布服务,具体是什么协议是由用户配置的。 下图为方法的两个参数,参数protocolConfig是用户配置的协议,参数registryURLs是从用户的注册中心配置中构建的两个url,这里配置了两个

doExportUrlsFor1Protocol方法代码很多,前面主要是根据运行环境以及用户配置,初始化服务的属性,使用一个map存储这些配置属性:

然后根据这些属性生成服务url:

1
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

生成的url如下:

1
dubbo://172.30.66.2:20880/com.pd.ISayHello?anyhost=true&application=spring-boot-dubbo-provider&bind.ip=172.30.66.2&bind.port=20880&cluster=failover&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.pd.ISayHello&loadbalance=roundrobin&metadata-type=remote&methods=sayHello&pid=1013&qos.enable=false&release=2.7.8&side=provider&timestamp=1598924399518

dubbo默认的服务路径是接口的全类名,例如dubbo://172.30.66.2:20880/com.pd.ISayHello

实际应用中可以在配置文件中指定dubbo.provider.contextPath属性,这个是一级路径,此外在服务注解中也可以指定具体服务的二级路径,注解中的path属性指定,例如进行以下配置:

1
2
3
dubbo:
provider:
contextpath: /panda
1
2
3
4
5
6
7
@DubboService(registry = {"nacos","zookeeper"},
path = "sayHelloImpl", // 指定服务二级路径
protocol = {"dubbo"},
loadbalance = "roundrobin",
cluster = "failover",
retries = 2)
public class SayHelloImpl implements ISayHello {

得到的url就是

1
dubbo://172.30.66.2:20880/panda/sayHelloImpl?anyhost=true&application=spring-boot-dubbo-provider&bind.ip=172.30.66.2&bind.port=20880&cluster=failover&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.pd.ISayHello&loadbalance=roundrobin&metadata-type=remote&methods=sayHello&pid=3333&qos.enable=false&release=2.7.8&side=provider&timestamp=1600236724722

到此为止服务配置的解析就完成了,接下来就是要执行服务的发布。

2.1 服务发布

还是doExportUrlsFor1Protocol方法,后半段落去非关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
String scope = url.getParameter(SCOPE_KEY);  // 一般不指定的scope的时候 此处为null
if (!SCOPE_NONE.equalsIgnoreCase(scope)) { // null != none
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) { // null != remote
exportLocal(url); // 执行本地发布
}
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) { //null != local
if (CollectionUtils.isNotEmpty(registryURLs)) {
for (URL registryURL : registryURLs) {
// if protocol is only injvm ,not register
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
...
// 以下为发布逻辑的重点内容
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
// 生成调用器
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass,
registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else { // 如果注册中心配置registryURLs为空,执行以下代码,只发布dubbo服务,不进行注册
...
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}

PROXY_FACTORY是使用自适应扩展获得的动态代理工厂,这里获取的是javassist:

1
private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
2.2.1 Invoker

看这个获取invoker对象的方法getInvoker

1
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));

参数ref: 具体的service实例对象

参数interfaceClass :service接口

参数url : 在registryURL中添加了一个export参数,存放dubbo协议的服务url

1
registry://10.0.12.76:8848/org.apache.dubbo.registry.RegistryService?application=spring-boot-dubbo-provider&dubbo=2.0.2&export=dubbo%3A%2F%2F172.30.66.2%3A20880%2Fcom.pd.ISayHello%3Fanyhost%3Dtrue%26application%3Dspring-boot-dubbo-provider%26bind.ip%3D172.30.66.2%26bind.port%3D20880%26cluster%3Dfailover%26deprecated%3Dfalse%26dubbo%3D2.0.2%26dynamic%3Dtrue%26generic%3Dfalse%26interface%3Dcom.pd.ISayHello%26loadbalance%3Droundrobin%26metadata-type%3Dremote%26methods%3DsayHello%26pid%3D1013%26qos.enable%3Dfalse%26release%3D2.7.8%26side%3Dprovider%26timestamp%3D1598924399518&pid=1013&qos.enable=false&registry=nacos&release=2.7.8&timestamp=1598923996985

再看javassist中的getInvoker方法

1
2
3
4
5
6
7
8
9
10
11
12
public <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) { //proxy 指的是被代理的对象,也就是具体的服务类对象
// TODO Wrapper cannot handle this scenario correctly: the classname contains '$'
final Wrapper wrapper = Wrapper.getWrapper(proxy.getClass().getName().indexOf('$') < 0 ? proxy.getClass() : type);
return new AbstractProxyInvoker<T>(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class<?>[] parameterTypes,
Object[] arguments) throws Throwable {
return wrapper.invokeMethod(proxy, methodName, parameterTypes, arguments);
}
};
}

在手写rpc的文章中,消费者使用动态代理将需要调用的服务名、方法名、参数等信息发送个服务提供者,服务提供者获取这些信息之后使用发射调用,获得返回值,再通过网络发送个消费者,这里存在一个性能问题,就是服务端的反射调用

如果每次处理请求,都需要去查找类查找方法,这肯定会增加服务的响应时间,dubbo提供的解决方案是除了使用静态代理的设计,在服务发布的时候就生成一个服务对象的代理类,这样在调用服务方法时直接调用代理对象响应的方法就可以。

dubbo 使用Wrapper类对服务对象进行代理,服务发布时就会生成一个Invoker对象,其成员变量wrapper就调用代理类对象,看上面的代码,消费端的请求发送到服务端之后,会进入doInvoke方法,而实际执行的是wrapper的invokeMethod方法。

wrapper对象的生成代码比较多,可以单独分析。

2.2.2 RegisterProtocol
1
2
private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);

这里的PROTOCOL也是一个自适应的扩展,wrapperInvoker.getUrl()返回的是registry,因此此处将进入RegisterProtocol的export方法:

1
2
3
4
5
6
7
8
9
10
11
12
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
URL registryUrl = getRegistryUrl(originInvoker);
// url to export locally
URL providerUrl = getProviderUrl(originInvoker);
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
//export invoker 启动dubbo服务,具体就是启动nettyserver
final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
...
}
1
2
3
4
5
6
7
8
private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);

return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
});
}

重构了一个Invoker,url为dubbo协议的服务url,因此此处的Protocol.export将进入DubboProtocol

1
2
3
4
5
6
7
8
9
10
11
12
13
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();

// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
...
openServer(url); // >> 开启服务监听
optimizeSerialization(url);

return exporter;
}

再往下跟就是传输层的逻辑了,dubbo使用netty4作为传输层协议,最后开启了一个netty server。

dubbo服务启动完成之后,将执行服务的注册流程

2.3 服务注册

回到registryProtocol的export方法后半段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
...
// decide if we need to delay publish
boolean register = providerUrl.getParameter(REGISTER_KEY, true);
if (register) {
register(registryUrl, registeredProviderUrl); //>>
}

// register stated url on provider model
registerStatedUrl(registryUrl, registeredProviderUrl, register);

exporter.setRegisterUrl(registeredProviderUrl);
exporter.setSubscribeUrl(overrideSubscribeUrl);

// Deprecated! Subscribe to override rules in 2.6.x or before.
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

notifyExport(exporter);
//Ensure that a new exporter instance is returned every time export
return new DestroyableExporter<>(exporter);
}

进入注册流程register(registryUrl, registeredProviderUrl); //>>

1
2
3
4
private void register(URL registryUrl, URL registeredProviderUrl) {
Registry registry = registryFactory.getRegistry(registryUrl);
registry.register(registeredProviderUrl);
}

看第一句先获取一个RegistryregistryFactory这里也是一个自适应扩展点,扩展如下

META-INF/dubbo/internal/org.apache.dubbo.registry.RegistryFactory

1
2
3
4
5
6
7
8
9
10
11
12
service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryFactory
wrapper=org.apache.dubbo.registry.RegistryFactoryWrapper
dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory

etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory
nacos=org.apache.dubbo.registry.nacos.NacosRegistryFactory
sofa=org.apache.dubbo.registry.sofa.SofaRegistryFactory
multiple=org.apache.dubbo.registry.multiple.MultipleRegistryFactory

debug到这里,registryUrl的值如下:

注意RegistryFactory的扩展有wapper包装器,所以创建的RegistryFactory是被RegistryFactoryWrapper包装之后的:

1
2
3
4
5
6
// RegistryFactoryWrapper  对饮wrapper扩展点
public Registry getRegistry(URL url) {
return new ListenerRegistryWrapper(registryFactory.getRegistry(url), // 到这里才是创建nacos的registryFactory
Collections.unmodifiableList(ExtensionLoader.getExtensionLoader(RegistryServiceListener.class)
.getActivateExtension(url, "registry.listeners")));
}

因此最终获得的Registry是一个被包装的:

ListenerRegistryWrapper(NacosRegistry),外面的包装用于监听注册事件,与注册流程无关这里不进行分析。

这里是nacos协议 因此将会进入到NacosRegistryFactory来创建Registry对象,且此处使用了模板方法,NacosRegistryFactory中并没有getRegistry()方法,该方法定义在其父类AbstractRegistryFactory中,抽象父类定义了抽象方法createRegistry()getRegistry()最终调用子类对该抽象方法的的具体实现来生成Registry对象。

下面看一下NacosRegistry的类关系图,这里也是模板模式的设计,register()方法是定义在父类中的,需要注意的是FailbackRegistry也重载了register()方法,所以register()方法的调用顺序是:ListenerRegistryWrapper–>FailbackRegistry–>AbstractRegistry,

1
2
3
4
5
@Override //AbstractRegistry
public void register(URL url) {
...
registered.add(url); // 这里只是将url存放到一个set中
}

远程注册的逻辑在

FailbackRegistry中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void register(URL url) {
if (!acceptable(url)) {
logger.info("URL " + url + " will not be registered to Registry. Registry " + url + " does not accept service of this protocol type.");
return;
}
super.register(url);
removeFailedRegistered(url);
removeFailedUnregistered(url);
try {
// Sending a registration request to the server side >> 此处是执行真正的远程注册
doRegister(url);
} catch (Exception e) {...}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Override // NacosRegistry
public void doRegister(URL url) {
final String serviceName = getServiceName(url);
final Instance instance = createInstance(url);
/**
* namingService.registerInstance with {@link org.apache.dubbo.registry.support.AbstractRegistry#registryUrl}
* default {@link DEFAULT_GROUP}
*
* in https://github.com/apache/dubbo/issues/5978
*/
execute(namingService -> namingService.registerInstance(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP), instance));
}
1
2
3
4
5
6
7
8
9
private void execute(NamingServiceCallback callback) {
try {
callback.callback(namingService);
} catch (NacosException e) {
if (logger.isErrorEnabled()) {
logger.error(e.getErrMsg(), e);
}
}
}

此处使用lamda表达式,execute方法接收一个NamingServiceCallback类型的参数,该参数只有一个函数式接口callback(namingService),所以在callback函数中调用了NacosNamingService来进行注册,针对不同的注册中心有不同的NamingService实现。

对服务的发布和注册就先分析到这里。

3 延伸:dubbo xml配置的解析

spring中定义了一个NameSpaceHandler接口,接口注释如下:

1
2
3
4
/**
* Base interface used by the {@link DefaultBeanDefinitionDocumentReader}
* for handling custom namespaces in a Spring XML configuration file.
*/

这个接口是用来处理spring xml配置文件中的定制的namespace,例如dubbo:servicedubbo:provider等等

Spring默认会加载jar包中的META-INF/spring.handlers文件寻找对应的NameSpaceHandler,例如dubbo实现了该接口,在dubbo-config-spring模块中,就能找到这个META-INF/spring.handlers文件:

1
2
3
# dubbo-config/dubbo-config-spring/src/main/resources/META-INF/spring.handlers
http\://dubbo.apache.org/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler
http\://code.alibabatech.com/schema/dubbo=org.apache.dubbo.config.spring.schema.DubboNamespaceHandler

dubbo的实现类是DubboNamespaceHandler,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {

static {
Version.checkDuplicate(DubboNamespaceHandler.class);
}

@Override
public void init() {
registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
}
...
}

可以看到注册了一堆的element解析器来解析不同的配置标签,这些标签对应的配置最终都会被解析成BeanDefinition注册到IoC容器中。最后注册的一个是注解解析器,用于处理注解配置的情况。

我们仔细看,发现涉及到服务发布和服务调用的两个配置的解析,用的是ServiceBeanreferenceBean。并不是config结尾的,这两个类稍微特殊些,当然他们也继承了ServiceConfigReferenceConfig