dubbo-消费者服务订阅流程分析

Dubbo是以URL驱动的框架,因此本片将跟踪URL的传递以及处理来分析整个服务订阅的流程

消费者在createProxy()方法中,创建了第一个url

URL-1

1
registry://localhost:9090/org.apache.dubbo.registry.RegistryService?application=consumer-test&dubbo=2.0.2&pid=10181&qos.enable=false&registry=spring-cloud&release=2.7.6&timestamp=1610009175272

接着,在上面url上接了一个refer属性:

URL-2

1
registry://localhost:9090/org.apache.dubbo.registry.RegistryService?application=consumer-test&dubbo=2.0.2&pid=10181&qos.enable=false&refer=application%3Dconsumer-test%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.pd.ISayHello%26methods%3DsayHello%26pid%3D10181%26qos.enable%3Dfalse%26register.ip%3D172.30.66.2%26release%3D2.7.6%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1610009171036&registry=spring-cloud&release=2.7.6&timestamp=1610009175272

%3D 是 = 号,%26 是 & 符号

拼接这个refer的目的是传参

接下来创建invoker对象

1
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));

REF_PROTOCOL是自适应扩展点,将根据url的协议 进入指定的Protocol进行处理,这里将进入

RegistyProtocol.refer

该方法中首先获取注册url

1
2
3
4
5
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {    
url = getRegistryUrl(url);
Registry registry = registryFactory.getRegistry(url);
...
}

该url为:

URL-3

1
spring-cloud://localhost:9090/org.apache.dubbo.registry.RegistryService?application=consumer-test&dubbo=2.0.2&pid=10181&qos.enable=false&refer=application%3Dconsumer-test%26dubbo%3D2.0.2%26init%3Dfalse%26interface%3Dcom.pd.ISayHello%26methods%3DsayHello%26pid%3D10181%26qos.enable%3Dfalse%26register.ip%3D172.30.66.2%26release%3D2.7.6%26side%3Dconsumer%26sticky%3Dfalse%26timestamp%3D1610009171036&release=2.7.6&timestamp=1610009175272

可以发现与上一阶段的区别就是协议换成了spring-cloud

registryFactory也是自适应扩展的,将根据协议决定使用哪个registryFactory,

这里将进入SpringCloudRegistryFactory获取registry

那么获取到的肯定就是SpringCloudRegistry

继续向下进入doRefer方法

此方法中会创建一个subscribeUrl

URL-4

1
consumer://172.30.66.2/com.pd.ISayHello?application=consumer-test&dubbo=2.0.2&init=false&interface=com.pd.ISayHello&methods=sayHello&pid=10181&qos.enable=false&release=2.7.6&side=consumer&sticky=false&timestamp=1610009171036

这个url最后被添加到了AbstractRegistry一个Set类型的成员变量registered中

进一步处理这个url,添加了一个参数category=providers,configurators,routers

URL-5

1
consumer://172.30.66.2/com.pd.ISayHello?application=consumer-test&category=providers,configurators,routers&dubbo=2.0.2&init=false&interface=com.pd.ISayHello&methods=sayHello&pid=10897&qos.enable=false&release=2.7.6&side=consumer&sticky=false&timestamp=1610011204472

然后开始订阅服务:

1
2
3
4
5
6
7
//RegistryDirectory
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}

registry是什么类型?

ListenerRegistryWrapper(SpringCloudRegistry)

因此会先调用ListenerRegistryWrapper.subscribe()

1
2
3
4
5
6
@Override
public void subscribe(URL url, NotifyListener listener) {
try {
registry.subscribe(url, listener); //这里的registry是SpringCloudRegistry
} finally {...}
}

再调用SpringCloudRegistry.subscribe(),而该方法继承的是父类的,所以会进入AbstractRegistry.subscribe()方法:

1
2
3
4
5
6
7
8
@Override // FailbackRegistry
public void subscribe(URL url, NotifyListener listener) {
super.subscribe(url, listener); // super是AbstractRegistry
removeFailedSubscribed(url, listener);
try {
doSubscribe(url, listener); //此方法在子类中定义,AbstractSpringCloudRegistry
} catch (Exception e) {...}
}

这里向类型为ConcurrentMap<URL, Set<NotifyListener>>subscribed 成员中中创建一条记录,并将RegistryDirectory放到Set中,RegistryDirectory是一个NotifyListener,能够监听注册中心实例列表的变化,从而更新本地实例目录,一个服务对应一个RegistryDirectory

1
2
3
4
5
6
@Override // AbstractRegistry
public void subscribe(URL url, NotifyListener listener) {
...
Set<NotifyListener> listeners = subscribed.computeIfAbsent(url, n -> new ConcurrentHashSet<>());
listeners.add(listener);
}

然后调用到了doSubscribe方法,该方法在子类AbstractSpringCloudRegistry中实现

1
2
3
4
5
6
7
8
9
10
11
12
@Override //AbstractSpringCloudRegistry
public final void doSubscribe(URL url, NotifyListener listener) {
... //处理一些特殊场景
else { // for general Dubbo Services
subscribeDubboServiceURLs(url, listener); // 此时的参数url还是URL-5
}
}
protected void subscribeDubboServiceURLs(URL url, NotifyListener listener) {
// 干活
doSubscribeDubboServiceURLs(url, listener);
registerServiceInstancesChangedEventListener(url, listener);
}
1
2
3
4
5
6
7
private void doSubscribeDubboServiceURLs(URL url, NotifyListener listener) {
// 获取应用需要订阅的服务名称,通过配置dubbo.cloud.subscribed-services来指定
Set<String> subscribedServices = repository.getSubscribedServices();
// Sync 根据服务名称订阅所有服务
subscribedServices.forEach(service -> subscribeDubboServiceURL(url, listener,
service, this::getServiceInstances));
}

subscribeDubboServiceURL方法的最后一个参数是一个函数引用,传入的函数参数getServiceInstances封装的是获取注册中心指定服务实例的逻辑

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
protected void subscribeDubboServiceURL(URL url, NotifyListener listener,
String serviceName,
Function<String, Collection<ServiceInstance>> serviceInstancesFunction) {

List<URL> allSubscribedURLs = new LinkedList<>(); // 用于存放所有已订阅的实例Url
//调用函数参数getServiceInstances 获取指定服务的实例
Collection<ServiceInstance> serviceInstances = serviceInstancesFunction
.apply(serviceName);
...
// 初始化服务的元信息,服务元信息里没有具体实例相关的信息
repository.initializeMetadata(serviceName);
DubboMetadataService dubboMetadataService = dubboMetadataConfigServiceProxy
.getProxy(serviceName);

// 根据服务元信息和URL-5获取已发布的服务实例URL-6,同一个服务可能会以多种协议发布,所以这里是一个列表,通常只用dubbo的情况下,列表中只有一条记录
List<URL> exportedURLs = getExportedURLs(dubboMetadataService, url);
for (URL exportedURL : exportedURLs) {
String protocol = exportedURL.getProtocol();
List<URL> subscribedURLs = new LinkedList<>();
// 获取所有的服务实例URl
serviceInstances.forEach(serviceInstance -> {
Integer port = repository.getDubboProtocolPort(serviceInstance, protocol);
String host = serviceInstance.getHost();
...
// 根据serviceInstance对象和exportedURL,拼接处具体的实例url
else {
URL subscribedURL = new URL(protocol, host, port,
exportedURL.getParameters());
subscribedURLs.add(subscribedURL);
}
});

allSubscribedURLs.addAll(subscribedURLs);
}
...
// 通知`RegistryDirectory`
listener.notify(allSubscribedURLs);
}

根据上面的URL-5,从DubboServiceMetadataRepository中拿到符合要求的exportedUrl列表:

URL-6

1
dubbo://172.30.66.2:20881/com.pd.ISayHello?anyhost=true&application=provider-test&deprecated=false&dubbo=2.0.2&dynamic=true&generic=false&interface=com.pd.ISayHello&methods=sayHello&pid=818&release=2.7.6&side=provider&timestamp=1610069264009

这个url已经可以定位到一个具体的服务实例了。

然后使用URL-6和之前获取的serviceInstances列表,生成所有的服务实例allSubscribedURLs

然后通知RegistryDirectory

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override//RegistryDirectory
public synchronized void notify(List<URL> urls) {
// 过滤参数传入的实例url列表,并创建categoryUrls
Map<String, List<URL>> categoryUrls = urls.stream()
.filter(Objects::nonNull)
.filter(this::isValidCategory)
.filter(this::isNotCompatibleFor26x)
.collect(Collectors.groupingBy(this::judgeCategory));

List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);

List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
toRouters(routerURLs).ifPresent(this::addRouters);

// providers
List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
/**
* 3.x added for extend URL address
*/
ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
if (supportedListeners != null && !supportedListeners.isEmpty()) {
for (AddressListener addressListener : supportedListeners) {
providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
}
}
refreshOverrideAndInvoker(providerURLs);
}

categoryUrls是一个Map,保存category和Urls之间的映射关系:

refreshOverrideAndInvoker

该方法将根据提供者的URLs,创建调用器,Invoker,每个provider创建一个Invoker,并存储到

RegistryDirectory的成员invokers这个List中。