dubbo-服务消费者启动过程

1、设计思想

要实现服务消费,需要考虑以下问题:

  • 生成远程服务的代理
  • 获得目标服务的url地址
  • 实现远程通信
  • 实现负载均衡
  • 实现集群容错

2、注解解析

Dubbo的服务消费者注入也有两种方式:

  • 通过xml形式

  • 基于注解的方式

我来分析一下基于注解的方式的解析过程:

@DubboReference 注解的解析逻辑在ReferenceAnnotationBeanPostProcessor类中,在2.7.6版本中这个类在在DubboAutoConfiguration中有显示的配置:

1
2
3
4
@ConditionalOnMissingBean
@Bean(name = ReferenceAnnotationBeanPostProcessor.BEAN_NAME) public ReferenceAnnotationBeanPostProcessor referenceAnnotationBeanPostProcessor() {
return new ReferenceAnnotationBeanPostProcessor();
}

但其实这里出现了重复注入的情况,因为在这段代码执行之前,ReferenceAnnotationBeanPostProcessor已经在容器中了,所以2.7.8版本删除了这段代码。

那么这个类到底在哪里注入的?

配置类DubboAutoConfiguration上注解了@EnableDubboConfig

1
2
3
4
5
6
7
8
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Import(DubboConfigConfigurationRegistrar.class)
public @interface EnableDubboConfig {
boolean multiple() default true;
}

该注解使用了@Import动态注入,进入DubboConfigConfigurationRegistrar

1
2
3
4
5
6
7
8
9
public class DubboConfigConfigurationRegistrar implements ImportBeanDefinitionRegistrar {

@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
...
// Since 2.7.6
registerCommonBeans(registry);
}
}

registerCommonBeans()方法是定义在DubboBeanUtils中的一个静态方法,注释可以看到这个方法是从2.7.6版本添加的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static void registerCommonBeans(BeanDefinitionRegistry registry) {

// Since 2.5.7 Register @Reference Annotation Bean Processor as an infrastructure Bean
registerInfrastructureBean(registry, ReferenceAnnotationBeanPostProcessor.BEAN_NAME,
ReferenceAnnotationBeanPostProcessor.class);

// Since 2.7.4 [Feature] https://github.com/apache/dubbo/issues/5093
registerInfrastructureBean(registry, DubboConfigAliasPostProcessor.BEAN_NAME,
DubboConfigAliasPostProcessor.class);

// Since 2.7.5 Register DubboLifecycleComponentApplicationListener as an infrastructure Bean
registerInfrastructureBean(registry, DubboLifecycleComponentApplicationListener.BEAN_NAME,
DubboLifecycleComponentApplicationListener.class);

// Since 2.7.4 Register DubboBootstrapApplicationListener as an infrastructure Bean
registerInfrastructureBean(registry, DubboBootstrapApplicationListener.BEAN_NAME,
DubboBootstrapApplicationListener.class);

// Since 2.7.6 Register DubboConfigDefaultPropertyValueBeanPostProcessor as an infrastructure Bean
registerInfrastructureBean(registry, DubboConfigDefaultPropertyValueBeanPostProcessor.BEAN_NAME,
DubboConfigDefaultPropertyValueBeanPostProcessor.class);
}

可以看到这个方法注入了几个dubbo的基础类,其中就有ReferenceAnnotationBeanPostProcessor。当使用xml方式解析dubbo配置的时候,也会使用到这个静态方法来注入这些基础类。

3 . 消费端启动流程

下面我们进入正题

springboot在实例化bean时会进入自动注入流程,之前分析过自动注入流程是在populateBean() 方法中,在此方法中有这段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for (BeanPostProcessor bp : getBeanPostProcessors()) {
if (bp instanceof InstantiationAwareBeanPostProcessor) {
InstantiationAwareBeanPostProcessor ibp = (InstantiationAwareBeanPostProcessor) bp;
PropertyValues pvsToUse = ibp.postProcessProperties(pvs, bw.getWrappedInstance(), beanName);
if (pvsToUse == null) {
if (filteredPds == null) {
filteredPds = filterPropertyDescriptorsForDependencyCheck(bw, mbd.allowCaching);
}
// 主要是这一句
pvsToUse = ibp.postProcessPropertyValues(pvs, filteredPds, bw.getWrappedInstance(), beanName);
if (pvsToUse == null) {
return;
}
}
pvs = pvsToUse;
}
}

这段代码会执行BeanPostProcessor中的逻辑postProcessPropertyValues()方法,

再看ReferenceAnnotationBeanPostProcessor的类继承关系,它实现了InstantiationAwareBeanPostProcessor接口,该接口定义了postProcessPropertyValues方法

进入postProcessPropertyValues方法:

1
2
3
4
5
6
7
8
9
10
11
12
@Override // AbstractAnnotationBeanPostProcessor
public PropertyValues postProcessPropertyValues(PropertyValues pvs, PropertyDescriptor[] pds,
Object bean, String beanName) throws
BeanCreationException {
// 从这里进入就是注解解析流程,这里不深究
InjectionMetadata metadata = findInjectionMetadata(beanName, bean.getClass(), pvs);
try {
// 从这里注入
metadata.inject(bean, beanName, pvs);
} ...
return pvs;
}

findInjectionMetadata()方法完成了对几个指定注解的解析,返回一个InjectionMetadata对象,执行该对象的inject()方法进行注入流程。

4. 获取远程服务实例的代理对象

继续跟进inject(),这是私有内部类 AnnotatedFiledElement中的方法

1
2
3
4
5
6
7
8
9
@Override //AbstractAnnotationBeanPostProcessor.AnnotatedFiledElement
protected void inject(Object bean, String beanName, PropertyValues pvs) throws Throwable {
Class<?> injectedType = field.getType();
//getInjectedObject方法返回需要注入的代理对象,此处开始进入正题
Object injectedObject = getInjectedObject(attributes, bean, beanName, injectedType, this);
ReflectionUtils.makeAccessible(field);
// 反射方式注入代理对象
field.set(bean, injectedObject);
}

跟进getInjectedObject方法:

1
2
3
4
5
6
7
8
9
10
11
//AbstractAnnotationBeanPostProcessor
protected Object getInjectedObject(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
...
if (injectedObject == null) {
// >> 干活
injectedObject = doGetInjectedBean(attributes, bean, beanName, injectedType, injectedElement);
injectedObjectsCache.putIfAbsent(cacheKey, injectedObject);
}
return injectedObject;
}

最终会执行到doGetInjectedBean()方法,这里使用模板模式,该方法定义在子类ReferenceAnnotationBeanPostProcessor中,

4.1 创建ReferenceBean并注册

doGetInjectedBean

该方法主要做两件事:

  • 创建了一个ReferenceBean注册到Spring IOC容器中

  • 调用 referenceBean.get() 获取一个动态代理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
protected Object doGetInjectedBean(AnnotationAttributes attributes, Object bean, String beanName, Class<?> injectedType,
InjectionMetadata.InjectedElement injectedElement) throws Exception {
// ServiceBean:com.pd.ISayHello
String referencedBeanName = buildReferencedBeanName(attributes, injectedType);
// @Reference(check=false,mock=com.pd.moke.ISayHelloMoke,protocol=dubbo) com.pd.ISayHello
String referenceBeanName = getReferenceBeanName(attributes, injectedType);
// 创建一个ReferenceBean
ReferenceBean referenceBean = buildReferenceBeanIfAbsent(referenceBeanName, attributes, injectedType);
boolean localServiceBean = isLocalServiceBean(referencedBeanName, referenceBean, attributes);
prepareReferenceBean(referencedBeanName, referenceBean, localServiceBean);
// 将referenceBean注册到ioc
registerReferenceBean(referencedBeanName, referenceBean, attributes, localServiceBean, injectedType);
cacheInjectedReferenceBean(referenceBean, injectedElement);
// 使用ReferenceBean获取远程服务的调用对象
return referenceBean.get();
}
1
2
3
4
5
6
7
8
9
public synchronized T get() {
if (destroyed) {
throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
}
if (ref == null) {
init(); //>>
}
return ref; // ref 是dubbo返回的用于远程调用的代理对象
}

init() 方法有点类发布服务时的doExport方法,主要执行步骤如下:

1、检查配置信息

2、根据dubbo配置构建map集合

3、调用createProxy方法创建动态代理对象

本文重点分析第三步:

1
ref = createProxy(map);

4.2 创建远程代理对象

先思考一下,创建动态代理对象这个过程中,它可能会有哪些操作步骤?这个方法要能猜出来, 那必然需要对dubbo的使用比较熟悉。

首先我们需要注意一个点,这里是创建一个代理对象,而这个代理对象应该也和协议有关系,也就是不同的协议,使用的代理对象也应该不一样。

1
2
3
4
5
6
7
8
9
//ReferenceConfig
private T createProxy(Map<String, String> map) {
if (shouldJvmRefer(map)) {
URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = REF_PROTOCOL.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
} else {...}

上面代码是方法的第一部分,根据map进行判断,需要注入的服务是不是injvm的服务,主要判断依据是scope属性(scope==injvm时返回true),其次是generic属性(是泛化服务则返回false),最后的依据是getExporter(exporterMap, url) != null时,返回true,因为只有当前应用发布了该服务,getExporter才不会为空。

我们继续分析createProxy第二段代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//ReferenceConfig
else {
urls.clear();
// 点对点调用时的处理逻辑
if (url != null && url.length() > 0) {
...
} else {
// 再次确认协议不是injvm
if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
//检查注册中心配置
checkRegistry();
//返回注册中心url列表
List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
if (CollectionUtils.isNotEmpty(us)) {
for (URL u : us) {
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
if (monitorUrl != null) {
map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
}
// 将url添加一个refer属性,添加到成员urls中
urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
}
}
if (urls.isEmpty()) {
... // 抛异常
}
}
}
...
}

代码解析:

  1. 成员变量url是没有使用注册中心,点对点调用时的硬编码url,不是本文重点不做分析

  2. ConfigValidationUtils.loadRegistries()返回注册中心url列表,url形状如下:

    1
    2
    registry://10.0.12.74:2181/org.apache.dubbo.registry.RegistryService?application=spring-boot-dubbo-consumer&dubbo=2.0.2&pid=1841&qos.enable=false&register=false&registry=zookeeper
    &release=2.7.8&timestamp=1600308477956

    是registry协议

  3. 每个注册中心url需要添加一个refer属性再放到urls中取,refer属性是之前的map生成的一个字符串,这样做只是为了随着url方便传参数

    refer属性的形状如下:

    1
    2
    3
    4
    refer=application=spring-boot-dubbo-consumer&check=false&dubbo=2.0.2&init=false
    &interface=com.pd.ISayHello&metadata-type=remote&methods=sayHello
    &mock=com.pd.moke.ISayHelloMoke&pid=1841&protocol=dubbo&qos.enable=false
    &register.ip=172.30.66.2&release=D2.7.8&side=Dconsumer&sticky=false &timestamp=D1600308463376&register=false&registry=zookeeper&release=2.7.8&timestamp=1600308477956

createProxy()第三段

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...// 接上段代码...
if (urls.size() == 1) {
invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
} else {
List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
URL registryURL = null;
for (URL url : urls) {
// 多注册中心配置,针对每个配置中心会返回一个调用器
// url是register协议,进入RegisterProtocol.refer()
invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
if (UrlUtils.isRegistry(url)) {
registryURL = url; // use last registry url
}
}
if (registryURL != null) { // registry url is available
// for multi-subscription scenario, use 'zone-aware' policy by default
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);

invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
} else { // not a registry url, must be direct invoke.
String cluster = CollectionUtils.isNotEmpty(invokers)
? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
: Cluster.DEFAULT;
invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
}
}

代码解析:

这段代码就是比较重要的,生成调用器(代理对象) ,可以看到此处对单注册中心 和 多注册中心 的情况做了区分:

  • 单注册中心配置下直接调用refer方法,返回一个invoker
  • 多注册中心配置下,针对每个注册中心,产生一个invoker,然后通过CLUSTER.join把invokers以静态的Directory形式构建一个invoker对象。 目的是实现注册中心的路由

理解此处逻辑的关键在于要清楚 Directory的概念 和 Cluster.join()的逻辑,后面我会对此分析。

此时我还是先将注意力集中在主流程上,看看invoker是怎么被创建的

1
REF_PROTOCOL.refer(interfaceClass, urls.get(0))

REF_PROTOCOL是Protocol接口的自适应扩展类,这里协议类型registry,因此会执行RegistryProtocol的refer方法

4.3 创建代理对象的调用器Invoker

RegistryProtocol.refer()

贴代码先

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 返回具体的注册中心协议url
url = getRegistryUrl(url);
// 自适应扩展点创建的是一个被ListenerRegisterWrapper包装的ZookeeperRegister
Registry registry = registryFactory.getRegistry(url);
if (RegistryService.class.equals(type)) {
return proxyFactory.getInvoker((T) registry, type, url);
}
// group="a,b" or group="*"
// 将refer属性解析成map
Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
// 指定了group的情况
String group = qs.get(GROUP_KEY);
if (group != null && group.length() > 0) {
if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
}
}
// 没指定group,创建Cluster对象,未指定的话默认是"failover"(重试)
Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
// type 是服务接口类型
return doRefer(cluster, registry, type, url);
}

主要工作有:

1、获取指定注册中心协议的url

2、根据注册中心协议创建Registry对象

3、将url参数refer解析成map

4、获取cluster对象

5、指定group和未指定,分两种情况执行dorefer操作

doRefer()

首先创建了RegistryDirectory对象,该对象作用很重要,它负责动态维护服务提供者列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// RegistryProtocol
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
//初始化RegistryDirectory ,对每个注册中心会创建一个Directory
RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
//创建consumer://协议的url
URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
if (directory.isShouldRegister()) {
//注册服务消费者的url地址
directory.setRegisteredConsumerUrl(subscribeUrl);
registry.register(directory.getRegisteredConsumerUrl());
}
directory.buildRouterChain(subscribeUrl);
// 订阅注册中心的
directory.subscribe(toSubscribeUrl(subscribeUrl));
//一个注册中心会存在多个服务提供者,所以在这里需要把多个服务提供者通过cluster.join合并成一个集群调用器 ClusterInvoker,
Invoker<T> invoker = cluster.join(directory);
// 获取监听器
List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
if (CollectionUtils.isEmpty(listeners)) {
return invoker; // 从这里返回
}

RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
for (RegistryProtocolListener listener : listeners) {
listener.onRefer(this, registryInvokerWrapper);
}
return registryInvokerWrapper;
}

directory.subscribe方法,分析见文章《dubbo-消费者服务订阅分析》,次方法执行之后,directory的成员invokers已经装入了具体服务实例的调用器。

我们继续往下看:

4.4 合并调用器

cluser.join()

Cluter自适应扩展时会被包装:

1
2
mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapper
failover=org.apache.dubbo.rpc.cluster.support.FailoverCluster

所以默认得到的Cluster是:

MockClusterWrapper(FailoverCluster)

cluster.join 会先进入MockClusterWrapper

1
2
3
4
5
@Override //MockClusterWrapper
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}

然后在执行FailoverCluster.join,然而这个类没有实现join方法而是继承了抽象父类AbstractCluster的join方法:

1
2
3
4
@Override //AbstractCluster
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}

doJoin()是模板方法,声明在抽象类,其实现在子类中:

1
2
3
4
@Override//FailoverCluster
public <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new FailoverClusterInvoker<>(directory);
}

这里创建一个FailoverClusterInvoker返回

紧接着通过方法buildClusterInterceptors给这个Invoker添加拦截器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private <T> Invoker<T> buildClusterInterceptors(AbstractClusterInvoker<T> clusterInvoker, String key) {
// clusterInvoker = FailoverClusterInvoker
// key = null
AbstractClusterInvoker<T> last = clusterInvoker;
List<ClusterInterceptor> interceptors = ExtensionLoader.getExtensionLoader(ClusterInterceptor.class).getActivateExtension(clusterInvoker.getUrl(), key); // 默认只有一个拦截器ConsumerContextClusterInterceptor

if (!interceptors.isEmpty()) {
for (int i = interceptors.size() - 1; i >= 0; i--) {
final ClusterInterceptor interceptor = interceptors.get(i);
final AbstractClusterInvoker<T> next = last;
last = new InterceptorInvokerNode<>(clusterInvoker, interceptor, next);
}
}
return last;
}

ClusterInterceptor使用激活扩展方式,没有指定的情况下默认激活了ConsumerContextClusterInterceptor

1
2
context=org.apache.dubbo.rpc.cluster.interceptor.ConsumerContextClusterInterceptor
zone-aware=org.apache.dubbo.rpc.cluster.interceptor.ZoneAwareClusterInterceptor

使用拦截器包装之后,得到一个拦截器链,单链表

InterceptorInvokerNode->FailoverClusterInvokerInterceptorInvokerNode中包含了拦截器以及FailoverClusterInvoker,和指向下一个拦截器链节点的指针

1
2
3
4
5
6
7
public InterceptorInvokerNode(AbstractClusterInvoker<T> clusterInvoker,
ClusterInterceptor interceptor,
AbstractClusterInvoker<T> next) {
this.clusterInvoker = clusterInvoker;
this.interceptor = interceptor; // 当前节点的拦截器
this.next = next; // 指向下一个节点指针
}

因为默认只有一个激活的拦截器,所以得到的链表如下:

buildClusterInterceptors方法返回的是链表头指针InterceptorInvokerNode引用,InterceptorInvokerNode继承自AbstractClusterInvoker

最后不要忘了最开始是从Cluster的包装类MockClusterWrapper中开始调用的join方法,最后还要加上一层包装

1
2
3
4
5
@Override //MockClusterWrapper
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return new MockClusterInvoker<T>(directory,
this.cluster.join(directory));
}

最终得到的Invoker是这样的:

MockClusterInvoker(InterceptorInvokerNode(FailoverClusterInvoker))

到此是针对于一个注册中心得到的Invoker对象

多注册中心的情况

配置多个注册中心的时候,还需要调用一次cluster.join方法,将多个注册中心产生的Invoker合并成一个,以实现注册中心的负载均衡,回顾一下代码:

1
2
3
// ReferenceConfig
String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));//invokers就是多注册中心得到的Invoker集合

可以看到,会先创建一个StaticDirectory静态的目录,为什么是静态的?因为注册中心的配置是写死的,不会变,针对注册中心创建动态的目录是因为注册中心的服务提供者目录可能随时会发生变化,目录相应的也要跟着变

上面的代码 cluster = zone-aware

最终得到的Cluster是 ZoneAwareCluster类型,且不需要进行包装:

join直接进入抽象类:

1
2
3
4
@Override
public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
return buildClusterInterceptors(doJoin(directory), directory.getUrl().getParameter(REFERENCE_INTERCEPTOR_KEY));
}

doJoin进入ZoneAwareCluster

1
2
3
4
@Override
protected <T> AbstractClusterInvoker<T> doJoin(Directory<T> directory) throws RpcException {
return new ZoneAwareClusterInvoker<T>(directory);
}

再执行buildClusterInterceptors进行拦截器封装,最终得到的Invoker是这样的:

InterceptorInvokerNode(FailoverClusterInvoker)

结合注册中心生成的Invoker,最终产生的Invoker是这样的

回到createProxy()

拿到最终的Invoker之后,要根据这个对象创建动态代理类,以便于消费端调用,看到createProxy方法的最后一句:

1
return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

使用JavassistProxyFactory创建一个代理对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override // AbstractProxyFactory
public <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException {
Set<Class<?>> interfaces = new HashSet<>();
String config = invoker.getUrl().getParameter(INTERFACES);
if (config != null && config.length() > 0) {
...
}
if (generic) {
...
}
interfaces.add(invoker.getInterface());
interfaces.addAll(Arrays.asList(INTERNAL_INTERFACES));
return getProxy(invoker, interfaces.toArray(new Class<?>[0]));//>> 进入
}

interfaces 最后包含三个接口:

  • interface org.apache.dubbo.rpc.service.Destroyable

  • interface com.alibaba.dubbo.rpc.service.EchoService

  • interface com.pd.ISayHello

1
2
3
4
5
@Override
@SuppressWarnings("unchecked")
public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}

新建实例的时候,传入了一个InvokerInvocationHandler对象,动态代理见多了肯定知道,当消费者调用远程服务时,会进入该对象的invoke方法,这里在消费流程中再分析。

到这里貌似消费端注入远程服务的流程就结束了,但是中间缺少了一个最重要的环节分析,那就是:

  1. 服务目录的订阅和监听
  2. netty client 创建以及连接的建立

这两项内容都是在RegistryDirectory中实现的

5. RegistryDirectory

在哪里创建?

RegistryProtocol.doRefer的时候创建,每个注册中心会创建一个RegistryDirectory对象,RegistryDirectory中保存了所有服务提供者的调用Invoker,这些invoker在RegistryDirectory.suscribe方法被调用的时候进行创建。

如何实现目录实时有效?

并且这个对象中保存的服务提供者目录是动态可变的,可变的原因是该目录实现了NotifyListener接口,该接口就一个重要的接口方法notify

每当注册中心的服务提供者列表发生变化的时候,例如使用的是zookeeper作为注册中心,dubbo会针对几个目录创建一些监听(subscribe时创建监听),当目录中的内容发生变化的时候会向监听的创建者(服务消费者)发送通知。

服务消费者收到通知会调用RegistryDirectory中实现的notify方法,修改消费者内存RegistryDirectory对象中保存的的服务提供者目录,以保证服务提供者目录实时有效