Nacos(二)注册中心核心源码分析

注册中心需要的核心功能:

  1. 服务提供者将服务信息发布到注册中心,此处涉及到服务信息的数据结构设计及存储
  2. 服务消费者能够从注册中心获取所需服务的实例信息
  3. 注册中心需要感知服务的健康状况,心跳机制
  4. 服务消费者需要动态更新依赖的服务列表信息

nacos服务信息数据结构设计与存储

1
2
3
4
/**
* Map(namespace, Map(group::serviceName, Service)).
*/
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

一个服务Service中,可以包含多个集群

1
2
3
4
/**
* map(clustrtName,Cluster)
*/
private Map<String, Cluster> clusterMap = new HashMap<>();

一个集群Cluster中,可以包含多个实例

1
2
3
4
5
@JsonIgnore // 持久化节点
private Set<Instance> persistentInstances = new HashSet<>();

@JsonIgnore //临时节点
private Set<Instance> ephemeralInstances = new HashSet<>();

一个实例Instance 中存储了该实例的所有信息。

1. 服务提供者注册服务

1.1 时机

  • 启动时注册,springboot启动时,会执行dubbo服务发布以及注册流程
  • 启动后注册,此时使用的是事件监听机制,在 DubboServiceRegistrationNonWebApplicationAutoConfiguration 这个类中,会监听ApplicationStartedEvent 事件,这个事件是spring boot在2.0新增的,就是当spring boot应用启动完成之后会发布这个事件。而此时监听到这个事件之后,会触发注册的动作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//DubboServiceRegistrationNonWebApplicationAutoConfiguration 
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
setServerPort();
register();
}

private void register() {
if (registered) {
return;
}
serviceRegistry.register(registration);
registered = true;
}

1.2 流程

NacosServiceRegistry.register

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
//NacosServiceRegistry
@Override
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId(); //对应当前应用的application.name
String group = nacosDiscoveryProperties.getGroup();//表示nacos上的分组配置
Instance instance = getNacosInstanceFromRegistration(registration);//表示服务实例信息
try {
namingService.registerInstance(serviceId, group, instance);
log.info("nacos registry, {} {} {}:{} register finished", group, serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
// rethrow a RuntimeException if the registration is failed.
// issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e);
}
}

NamingService.registerInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) { // 若是临时节点,需要构建心跳信息
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
// 添加心跳信息进行处理
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}

serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

心跳是nacos客户端发送给nacos服务器的

serverProxy是一个NamingProxy类实例,封装了调用nacos server的api

BeatReactor 心跳发送反应堆。

NamingProxy.registerService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
final Map<String, String> params = new HashMap<String, String>(9);
params.put(CommonParams.NAMESPACE_ID, namespaceId);
params.put(CommonParams.SERVICE_NAME, serviceName);
params.put(CommonParams.GROUP_NAME, groupName);
params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
// UtilAndComs.NACOS_URL_SERVICE = /nacos/v1/ns/instance
reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}

注册时请求的open api 是/nacos/v1/ns/instance

构建的请求参数有:

reqAPI

reqAPI构建POST请求,发送给nacos-server

api: nacos server open api

params: 请求参数

body:

method: 请求方法类型

servers: nacos server地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// NamingProxy
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
throw new NacosException(NacosException.INVALID_PARAM, "no server available");
}
NacosException exception = new NacosException();
// 如果nacos-server服务地址不为null,可能为列表
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());//随机获取一个nacos-server节点
for (int i = 0; i < servers.size(); i++) {
// 确定了server还要用循环处理,为了防止选定的server不靠谱
String server = servers.get(index);
try {
// 调用指定server的接口,一旦调用成功表示注册成功,返回
return callServer(api, params, body, server, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", server, e);
}
}
index = (index + 1) % servers.size(); //调用失败则轮询调用其他server
}
}

if (StringUtils.isNotBlank(nacosDomain)) {
for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
try {
return callServer(api, params, body, nacosDomain, method);
} catch (NacosException e) {
exception = e;
if (NAMING_LOGGER.isDebugEnabled()) {
NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
}
}
}
}
// 失败时 写日志。抛出异常
NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}",
api, servers, exception.getErrCode(), exception.getErrMsg());
throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: "
+ exception.getMessage());
}

callServer

该方法用于发起nacos-server的api调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public String callServer(String api, Map<String, String> params, String body, String curServer, String method)
throws NacosException {
long start = System.currentTimeMillis();
long end = 0;
injectSecurityInfo(params); // 添加签名,防止恶意篡改或者网络丢包
List<String> headers = builderHeaders(); // 添加头信息
String url; // 拼接url
if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
url = curServer + api;
} else {
if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
}
url = HttpClient.getPrefix() + curServer + api;
}
//使用HttpClient发起请求
HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
end = System.currentTimeMillis();
MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
.observe(end - start); // 埋点监控
if (HttpURLConnection.HTTP_OK == result.code) { // 成功 返回服务端结果
return result.content;
}
if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
return StringUtils.EMPTY;
}
throw new NacosException(result.code, result.content);
}

2. nacos 处理注册请求

处理注册请求的controller在naming子模块中,InstanceController类中的register方法:

1
2
3
4
5
6
7
8
9
10
11
12
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
final String namespaceId = WebUtils
.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request); // 从请求中解析出Instance
serviceManager.registerInstance(namespaceId, serviceName, instance); // 写内存
return "ok";
}

ServiceManager.registerInstance

serviceManager中有一map,就是文章开头介绍的用于存储服务实例信息的地方,这个map的名字是serviceMap

1
2
3
4
5
6
7
8
9
10
11
12
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//serviceMap中如果没有该服务,则创建一个空的服务放进去
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
//从serviceMap中,根据namespaceId和serviceName得到一个服务对象
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}

addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

createServiceIfAbsent

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
throws NacosException {
//从serviceMap中获取namespaceId, serviceName指定的服务
Service service = getService(namespaceId, serviceName);
if (service == null) { // 没有的话就创建一个
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
service.validate();
// >>
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}

putServiceAndInit

1
2
3
4
5
6
7
8
9
10
private void putServiceAndInit(Service service) throws NacosException {
putService(service); // 把服务信息保存到serviceMap集合
service.init(); // 建立心跳检测机制
// 实现数据一致性监听,ephemeral=true表示采用raft协议,false表示采用Distro
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService
.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

这里比较感兴趣的点就是心跳检测,将在进阶篇与客户端的心跳发送一起分析

addInstance

1
2
3
4
5
6
7
8
9
10
11
12
13
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips)
throws NacosException {

String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
Service service = getService(namespaceId, serviceName);
synchronized (service) {
List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
Instances instances = new Instances();
instances.setInstanceList(instanceList);
// nacos数据一致性put
consistencyService.put(key, instances);
}
}

如果nacos-server是集群部署的话,服务注册的实例信息要同步到所有的nacos-server上,所以这里使用consistencyService来完成一致性需求,这里将在进阶篇分析

3. 服务消费者拉取服务列表

服务注册成功之后,消费者就可以从nacos-server中获取到服务提供者的地址,然后进行服务的调用。

在服务消费中,有一个核心的类 NacosDiscoveryClient 来负责和nacos交互,去获得服务提供者的地 址信息。之前在分析dubbo源码的时候已经分析过服务的订阅过程。

NacosDiscoveryClient 中提供了一个 getInstances 方法用来根据服务提供者名称获取服务提供者的 url地址的方法。

3.1 时机

应用启动之时,会执行根据配置的依赖服务名称,进行远程服务代理创建并注入Ioc的流程。在创建代理对象时,会执行服务列表拉取的逻辑。详见《dubbo-服务消费过程分析》

3.2 流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Override
public List<ServiceInstance> getInstances(String serviceId) {
try {
return serviceDiscovery.getInstances(serviceId);
}
catch (Exception e) {
throw new RuntimeException(
"Can not get hosts from nacos server. serviceId: " + serviceId, e);
}
}
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
String group = discoveryProperties.getGroup();
List<Instance> instances = discoveryProperties.namingServiceInstance()
.selectInstances(serviceId, group, true);
return hostToServiceInstanceList(instances, serviceId);
}

调用NamingService,根据serviceId、group获得服务实例列表。 然后把instance转化为ServiceInstance对象

NacosNamingService.selectInstances

1
2
3
4
5
6
7
8
9
10
11
@Override
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {

ServiceInfo serviceInfo;
if (subscribe) {
serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
} else {
serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
}
return selectInstances(serviceInfo, healthy);
}

selectInstances首先从hostReactor获取serviceInfo,

然后再从serviceInfo.getHosts()剔除非healty、 非enabled、weight小于等于0的instance再返回;

如果subscribe为true,则执行 hostReactor.getServiceInfo获取serviceInfo

否则执行 hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo

HostReactor.getServiceInfo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
String key = ServiceInfo.getKey(serviceName, clusters);
if (failoverReactor.isFailoverSwitch()) {
return failoverReactor.getService(key);
}
//从本地缓存中取
ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
if (null == serviceObj) {// 本地没有,两步走
// 1、创建一个空的ServiceInfo放入缓存
serviceObj = new ServiceInfo(serviceName, clusters);
serviceInfoMap.put(serviceObj.getKey(), serviceObj);
// 2、将远程服务信息更新到ServiceInfo中,updatingMap看来只是其到标记作用的
updatingMap.put(serviceName, new Object());
// >> 关键点
updateServiceNow(serviceName, clusters);
updatingMap.remove(serviceName); //更新完毕删除标记
} else if (updatingMap.containsKey(serviceName)) {
... // 若有其他线程已经创建了标记说明其他线程正在更新,同步等待
}
// 开启定时更新任务
scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey());
}

updateServiceNow(serviceName, clusters)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void updateServiceNow(String serviceName, String clusters) {
// 获取旧的服务信息,应用启动阶段为null
ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
try {
// 拉取nacos-server上的实例信息,该方法传入的参数 pushReceiver.getUDPPort() 有讲究
String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
if (StringUtils.isNotEmpty(result)) {
processServiceJSON(result); //>> 处理返回的结果
}
} catch (Exception e) {
...
} finally {
...
}
}

processServiceJSON

很长的方法,大致逻辑就是,获取新信息,如果存在老信息,则进行比较,分别记录修改的,新增的,删除的实例信息,记录日志,并发出服务修改事件;若旧信息不存在,将新信息 添加到本地缓存并发出服务修改事件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
public ServiceInfo processServiceJSON(String json) {
ServiceInfo serviceInfo = JSON.parseObject(json, ServiceInfo.class);//解析json
ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());//本地缓存获取旧服务
if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
//若nacos-server返回的信息为null或不合法,返回老信息
return oldService;
}
boolean changed = false;
// 旧信息不为null,更新阶段
if (oldService != null) {
if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {//旧信息时间不能晚于新信息
NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime()
+ ", new-t: " + serviceInfo.getLastRefTime());
}
// 新信息入缓存
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
// 获取旧信息中所有的实例对象
Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
for (Instance host : oldService.getHosts()) {
oldHostMap.put(host.toInetAddr(), host);
}
// 获取信息中所有的实例对象
Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
for (Instance host : serviceInfo.getHosts()) {
newHostMap.put(host.toInetAddr(), host);
}

Set<Instance> modHosts = new HashSet<Instance>();
Set<Instance> newHosts = new HashSet<Instance>();
Set<Instance> remvHosts = new HashSet<Instance>();

List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
newHostMap.entrySet());
for (Map.Entry<String, Instance> entry : newServiceHosts) {
Instance host = entry.getValue();
String key = entry.getKey();
if (oldHostMap.containsKey(key) && !StringUtils.equals(host.toString(),
oldHostMap.get(key).toString())) {
modHosts.add(host); // 记录前后不一致的实例
continue;
}

if (!oldHostMap.containsKey(key)) {
newHosts.add(host); // 记录新增实例
}
}

for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
Instance host = entry.getValue();
String key = entry.getKey();
if (newHostMap.containsKey(key)) {
continue;
}

if (!newHostMap.containsKey(key)) {
remvHosts.add(host); // 记录删除实例
}
}

if (newHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: "
+ serviceInfo.getKey() + " -> " + JSON.toJSONString(newHosts));
}
if (remvHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: "
+ serviceInfo.getKey() + " -> " + JSON.toJSONString(remvHosts));
}
if (modHosts.size() > 0) {
changed = true;
NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: "
+ serviceInfo.getKey() + " -> " + JSON.toJSONString(modHosts));
}
serviceInfo.setJsonFromServer(json);
if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
eventDispatcher.serviceChanged(serviceInfo);
DiskCache.write(serviceInfo, cacheDir);
}
} else { // 旧信息为null
changed = true;
...//log
serviceInfoMap.put(serviceInfo.getKey(), serviceInfo); // 新信息添加到缓存
eventDispatcher.serviceChanged(serviceInfo); // 发布服务changed事件
serviceInfo.setJsonFromServer(json);
DiskCache.write(serviceInfo, cacheDir); // 磁盘缓存
}

MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
if (changed) {
NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() +
" -> " + JSON.toJSONString(serviceInfo.getHosts()));
}
return serviceInfo;
}

总结:到此为止,服务信息已经拿到了,并存放在HostReactor的本地缓存中,但存在这里没用啊,应用要使用这些信息,还需要将其进行一些列处理后注入到spring容器中,所以这里需要发送一个事件通知spring,此处的数据已经进行了修改。

接下来我们看看事件是怎发出和处理的,非常巧妙

EventDispatcher.serviceChanged(serviceInfo)

1
2
3
4
5
6
7
public void serviceChanged(ServiceInfo serviceInfo) {
if (serviceInfo == null) {
return;
}
// 仅仅只是将服务信息添加到了一个List中
changedServices.add(serviceInfo);
}

changedServices是一个LinkedBlockingQueue阻塞对列,EventDispatcher在构造的时候啊,启动了一个线程来处理这个阻塞队列中的元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
try {
// 取出元素
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}

if (serviceInfo == null) {
continue;
}
try {
// 从observerMap中取出监听器,并执行
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}
}

observerMap中的监听器是什么时候加进去的呢?

DubboServiceDiscoveryAutoConfiguration.NacosConfiguration中注册了一个事件监听,监听SubscribedServicesChangedEvent事件:

1
2
3
4
5
6
@EventListener(SubscribedServicesChangedEvent.class)
public void onSubscribedServicesChangedEvent(SubscribedServicesChangedEvent event)
throws Exception {
// subscribe EventListener for each service
event.getNewSubscribedServices().forEach(this::subscribeEventListener);
}

当接受到事件时,会执行subscribeEventListener方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private void subscribeEventListener(String serviceName) {
if (listeningServices.add(serviceName)) {
try {
String group = nacosDiscoveryProperties.getGroup();
namingService.subscribe(serviceName, group, event -> {
if (event instanceof NamingEvent) {
NamingEvent namingEvent = (NamingEvent) event;
List<ServiceInstance> serviceInstances = hostToServiceInstanceList(
namingEvent.getInstances(), serviceName);
// 将Changed事件转发给spring-boot
dispatchServiceInstancesChangedEvent(serviceName,serviceInstances);
}
});
}
catch (NacosException e) {
ReflectionUtils.rethrowRuntimeException(e);
}
}
}

我们看subscribe方法:

1
2
3
4
5
@Override//NacosNamingService
public void subscribe(String serviceName, String groupName, List<String> clusters, EventListener listener) throws NacosException {
eventDispatcher.addListener(hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName),
StringUtils.join(clusters, ",")), StringUtils.join(clusters, ","), listener);
}

subscribe做了两件事:

  1. 主要的工作是将hostReactor中的服务实例信息取出来

  2. 兼职是向eventDispatcher添加了一个Listener,该Listener是通过lamda表达式定义的,见上上段代码。

    这个Listener监听NamingEvent类型的事件。

那我们再回到EventDispatcher的监听逻辑中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private class Notifier implements Runnable {
@Override
public void run() {
while (true) {
ServiceInfo serviceInfo = null;
try {
// 取出元素
serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
} catch (Exception ignore) {
}

if (serviceInfo == null) {
continue;
}
try {
// 从observerMap中取出监听器,并执行
List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
if (!CollectionUtils.isEmpty(listeners)) {
for (EventListener listener : listeners) {
List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
// 向监听器发出NamingEvent
listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
}
}
} catch (Exception e) {
NAMING_LOGGER.error("[NA] notify error for service: "
+ serviceInfo.getName() + ", clusters: " + serviceInfo.getClusters(), e);
}
}
}
}

可以看到,EventDispatcher向监听器发出的正是NamingEvent

这个NamingEvent被接收后,响应逻辑会创建出一个SubscribedServicesChangedEvent发出:

1
2
3
4
5
6
7
8
9
10
11
private void dispatchServiceInstancesChangedEvent(String serviceName,
Collection<ServiceInstance> serviceInstances) {
if (!hasText(serviceName) || serviceInstances == null) {
return;
}
// 创建事件并发布
ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent(serviceName,
serviceInstances);
。。。
applicationEventPublisher.publishEvent(event);
}

然后被Spring-boot接收到处理:

1
2
3
4
5
6
@EventListener(SubscribedServicesChangedEvent.class)
public void onSubscribedServicesChangedEvent(SubscribedServicesChangedEvent event)
throws Exception {
// subscribe EventListener for each service
event.getNewSubscribedServices().forEach(this::subscribeEventListener);
}

这里的事件发送监听形成了一个闭环。

scheduleUpdateIfAbsent

添加一个定时任务,更新服务信息,在下一篇笔记《Nacos(三)心跳与服务更新》中分析

4. 核心对象

NacosNamingService

NacosNamingService是NamingService接口的实现类。实现了上面提到的那些方法。
此外,NacosNamingService还起到了初始化其他核心类的作用,因为对外提供的方法都是委托给其他核心类处理的。按顺序将依次初始化EventDispatcher、NamingProxy、BeatReactor、HostReactor。
从NacosNamingService的构造函数我们也可以了解到,可以进行一些参数的自定义,总结如下(部分概念的含义可参考官方文档):

EventDispatcher

EventDispatcher与其他事件分发的组件没什么不同,用于处理subscribe、unsubscribe等等与服务监听相关的方法,并分发NamingEvent到各Listener。
成员变量ConcurrentMap<String, List> observerMap保存了注册的Listener,key为{服务名}@@{集群名},value为各个EventListener的列表。
EventDispatcher会启动1个名为com.alibaba.nacos.naming.client.listener的线程用于处理事件的分发。

注意点:

  • 分发NamingEvent时,按照subscribe(…)方法的调用顺序串行依次调用EventListener的onEvent(…)方法。
  • 调用subscribe(…)方法会引起对应Service的事件分发。

NamingProxy

NamingProxy用于与Nacos服务端通信,注册服务、注销服务、发送心跳等都经由NamingProxy来请求服务端。
NamingProxy会启动1个名为com.alibaba.nacos.client.naming.serverlist.updater的线程,用于定期调用refreshSrvIfNeed()方法更新Nacos服务端地址,默认间隔为30秒
对服务端API的调用将在后文总结。

注意点:refreshSrvIfNeed()方法对Nacos服务端地址的更新仅在使用endpoint的时候才会进行实际更新,如果是通过serverAddr配置的Nacos服务端地址,refreshSrvIfNeed()方法将不会进行任何操作。

BeatReactor

BeatReactor用于向Nacos服务端发送已注册服务的心跳。
成员变量Map<String, BeatInfo> dom2Beat中保存了需要发送的BeatInfo,key为{serviceName}#{ip}#{port},value为对应的BeatInfo。
BeatReactor会启动名为com.alibaba.nacos.naming.beat.sender的线程来发送心跳,默认线程数为1~CPU核心数的一半,可由namingClientBeatThreadCount参数指定。
默认情况下每5秒发送一次心跳,可根据Nacos服务端返回的clientBeatInterval的值调整心跳间隔。

心跳请求的是nacos-server的注册接口,这也就是意味着,每隔5秒,服务都会重新注册一次。

HostReactor

HostReactor用于获取、保存、更新各Service实例信息。
成员变量Map<String, ServiceInfo> serviceInfoMap中保存了已获取到的服务的信息,key为{服务名}@@{集群名}。
HostReactor会启动名为com.alibaba.nacos.client.naming.updater的线程来更新服务信息,默认线程数为1~CPU核心数的一半,可由namingPollingThreadCount参数指定。定时任务UpdateTask会根据服务的cacheMillis值定时更新服务信息,默认值为10秒。该定时任务会在获取某一服务信息时创建,保存在成员变量Map<String, ScheduledFuture<?>> futureMap中。