Nacos (3): Heartbeats and Service Updates

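Nacos keeps service state up to date by combining pull and push. The overall flow is roughly as follows:

  • When a consumer application starts, it schedules an UpdateTask that sends a pull request to nacos-server every 10s to fetch the latest service instance information.

  • When a service provider registers, it sets up a heartbeat with nacos-server and by default sends one heartbeat every 5s. nacos-server starts a heartbeat check task that keeps inspecting those heartbeats: if none has arrived for more than 15s, the instance is marked unhealthy; if none has arrived for more than 30s, the instance is taken offline and deleted.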
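The two thresholds in the second bullet can be sketched as a small classification function. This is only an illustration: `BeatStatusSketch`, `Status` and `classify` are hypothetical names, not Nacos classes, and the 15s and 30s values are simply the defaults quoted above.

```java
import java.time.Duration;

/** Hypothetical helper, not part of Nacos. Thresholds mirror the defaults described above. */
final class BeatStatusSketch {
    enum Status { HEALTHY, UNHEALTHY, EXPIRED }

    static final Duration HEARTBEAT_TIMEOUT = Duration.ofSeconds(15); // mark unhealthy after this
    static final Duration DELETE_TIMEOUT = Duration.ofSeconds(30);    // delete the instance after this

    /** Classifies an instance by how long it has been silent. */
    static Status classify(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > DELETE_TIMEOUT.toMillis()) {
            return Status.EXPIRED;
        }
        if (silence > HEARTBEAT_TIMEOUT.toMillis()) {
            return Status.UNHEALTHY;
        }
        return Status.HEALTHY;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(classify(now - 5_000, now));  // last beat 5s ago
        System.out.println(classify(now - 20_000, now)); // last beat 20s ago
        System.out.println(classify(now - 31_000, now)); // last beat 31s ago
    }
}
```

Both comparisons are strict, so an instance that has been silent for exactly 15s still counts as healthy in this sketch.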

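The sections below walk through the source code for each of these two flows.

1. Pull-based updates

The previous note, "Nacos (2): Core Source Analysis of the Registry", covered HostReactor.getServiceInfo(): after it obtains the service instance information from nacos-server, it injects that information into Spring Boot through an event publish/handle mechanism. Because the instance information on the server may change after that first fetch, a task is also set up to pull from the server periodically.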

HostReactor.getServiceInfo

public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // Look in the local cache first
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) { // Not cached locally: two steps
        // 1. Create an empty ServiceInfo and put it in the cache
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        // 2. Fill the ServiceInfo from the remote server; updatingMap appears to serve only as a marker
        updatingMap.put(serviceName, new Object());
        // >> key point
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName); // Update finished, remove the marker
    } else if (updatingMap.containsKey(serviceName)) {
        ... // Another thread has set the marker, so it is updating; wait synchronously
    }
    // Start the periodic update task
    scheduleUpdateIfAbsent(serviceName, clusters);
    return serviceInfoMap.get(serviceObj.getKey());
}

The code above was analyzed in the previous note, so go straight to the second-to-last statement:

scheduleUpdateIfAbsent(serviceName, clusters)

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }

    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }

        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

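futureMap maps each service key (built from serviceName and clusters) to the Future of its periodic update task. If futureMap already holds a Future for the key, the task has already been started and the method returns immediately. The check is done twice: once without the lock and once inside the synchronized block.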
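The double check can be reproduced outside Nacos with nothing but the JDK. The sketch below is an assumption-laden illustration: `ScheduleOnceSketch`, `scheduleIfAbsent` and `scheduledCount` are hypothetical names, and the boolean return value is added only to make the behavior observable.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of "schedule at most one task per key"; not Nacos code. */
final class ScheduleOnceSketch {
    private final Map<String, ScheduledFuture<?>> futureMap = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor;

    ScheduleOnceSketch(ScheduledExecutorService executor) {
        this.executor = executor;
    }

    /** Returns true only for the call that actually scheduled the task. */
    boolean scheduleIfAbsent(String key, Runnable task, long delayMillis) {
        if (futureMap.get(key) != null) {
            return false; // fast path: already scheduled, no lock taken
        }
        synchronized (futureMap) {
            if (futureMap.get(key) != null) {
                return false; // another thread scheduled it while we waited for the lock
            }
            futureMap.put(key, executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS));
            return true;
        }
    }

    int scheduledCount() {
        return futureMap.size();
    }

    public static void main(String[] args) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        ScheduleOnceSketch sketch = new ScheduleOnceSketch(executor);
        System.out.println(sketch.scheduleIfAbsent("demo-key", () -> { }, 60_000)); // first call
        System.out.println(sketch.scheduleIfAbsent("demo-key", () -> { }, 60_000)); // repeated call
        executor.shutdownNow();
    }
}
```

The unlocked first check keeps the common case (task already scheduled) cheap, while the second check inside the lock prevents two threads from both scheduling a task for the same key.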

addTask hands an UpdateTask to the scheduler. The main thing to look at is what UpdateTask's run method does:

@Override
public void run() {
    try {
        // Read the ServiceInfo from the local cache
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        // Not cached locally: update immediately
        if (serviceObj == null) {
            updateServiceNow(serviceName, clusters);
            // DEFAULT_DELAY = 1000L, so this task runs again in 1s
            executor.schedule(this, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
            return;
        }
        // Cached but stale: update immediately. lastRefTime defaults to Long.MAX_VALUE
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateServiceNow(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // if serviceName already updated by push, we should not override it
            // since the push data may be different from pull through force push
            // (so this only calls the query API to refresh, without touching the local cache)
            refreshOnly(serviceName, clusters);
        }
        // Remember the last refresh time
        lastRefTime = serviceObj.getLastRefTime();
        // If the service is not subscribed and futureMap has no entry for it, stop updating
        if (!eventDispatcher.isSubscribed(serviceName, clusters) &&
                !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            // abort the update task:
            NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
            return;
        }
        // Run this task again after cacheMillis (10s)
        executor.schedule(this, serviceObj.getCacheMillis(), TimeUnit.MILLISECONDS);
    } catch (Throwable e) {
        NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
    }
}
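Note that UpdateTask is not a fixed-rate job: each run decides for itself whether to call executor.schedule(this, ...) again. A minimal sketch of that self-rescheduling pattern follows. Everything in it is hypothetical (`SelfReschedulingTaskSketch`, `runFor`, and the `maxRuns` counter that stands in for the "no longer subscribed" check); it only shows how such a chain keeps going and how it stops.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch of a task that reschedules itself; not Nacos code. */
final class SelfReschedulingTaskSketch implements Runnable {
    private final ScheduledExecutorService executor;
    private final long delayMillis;
    private final int maxRuns; // stand-in for the "no longer subscribed" stop condition
    private final AtomicInteger runs = new AtomicInteger();
    private final CountDownLatch stopped = new CountDownLatch(1);

    SelfReschedulingTaskSketch(ScheduledExecutorService executor, long delayMillis, int maxRuns) {
        this.executor = executor;
        this.delayMillis = delayMillis;
        this.maxRuns = maxRuns;
    }

    @Override
    public void run() {
        try {
            int n = runs.incrementAndGet(); // a real task would pull or refresh here
            if (n >= maxRuns) {
                stopped.countDown(); // abort: return without rescheduling
                return;
            }
            executor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
        } catch (Throwable t) {
            stopped.countDown(); // nothing reschedules after an exception, so the chain ends
        }
    }

    /** Runs the chain until it stops and returns how many times the task executed. */
    static int runFor(int maxRuns, long delayMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        try {
            SelfReschedulingTaskSketch task =
                    new SelfReschedulingTaskSketch(executor, delayMillis, maxRuns);
            executor.schedule(task, 0, TimeUnit.MILLISECONDS);
            task.stopped.await(5, TimeUnit.SECONDS);
            return task.runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One consequence of this design is that the delay can change between runs, which is how the task above can use DEFAULT_DELAY in one branch and serviceObj.getCacheMillis() in another.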

Now look at the two update methods.

updateServiceNow

public void updateServiceNow(String serviceName, String clusters) {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        // Send an HTTP request to fetch the instance list
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
        if (StringUtils.isNotEmpty(result)) {
            processServiceJSON(result); // Process the result and update the local cache
        }
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

refreshOnly

public void refreshOnly(String serviceName, String clusters) {
    try {
        serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
    }
}

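refreshOnly does not process the result, so its only apparent effect is to pass pushReceiver.getUDPPort() to the server.

This matters because nacos-server does not keep a consumer's UDP information forever; the analysis below covers this in detail.

2. Push-based updates

As analyzed in the previous note, "Nacos (2): Core Source Analysis of the Registry", when a service provider registers, nacos-server handles the registration request by calling createEmptyService to create an empty service.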

public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    // >>
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

createEmptyService eventually calls service.init():

public void init() {
    // Start the heartbeat check task
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

Next, let's see how this health check task runs.

ClientBeatCheckTask.run

@Override
public void run() {
    try {
        ...
        List<Instance> instances = service.allIPs(true);
        // first set health status of instances:
        for (Instance instance : instances) { // check the heartbeat of every instance
            // getInstanceHeartBeatTimeOut defaults to 15 seconds
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                if (!instance.isMarked()) {
                    if (instance.isHealthy()) {
                        instance.setHealthy(false);
                        ... // log
                        getPushService().serviceChanged(service); // push the service change event
                        ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                    }
                }
            }
        }
        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }
        // then remove obsolete instances:
        for (Instance instance : instances) {
            if (instance.isMarked()) {
                continue;
            }
            // getIpDeleteTimeout defaults to 30 seconds
            if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                // delete instance
                ... // log
                deleteIp(instance);
            }
        }
    }
    ...
}

getPushService().serviceChanged(service)

PushService listens for service change events, then iterates over all the clients of the service and notifies each of them over UDP.

public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap
            .containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }

    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

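This publishes a ServiceChangeEvent, which is a Spring event.

PushService is both the publisher and the listener of this event, so we go straight to its handler.

First, a note on clientMap: when nacos-client calls queryList, it passes a UDP port to nacos-server. nacos-server creates a PushClient entry for that nacos-client (UDP is connectionless, so this is a record of where to send pushes rather than a live connection) and stores it in clientMap.

clientMap is a two-level map.

The first-level key is namespaceId##serviceName, which keeps different namespaces apart.

The second-level key is serviceName:xxx,clusters:xxx,address:xxx,agent:xxx; the address part identifies the IP address of a specific service consumer.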

@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();

    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            // Get the PushClients of this service; the key is namespaceId##serviceName
            ConcurrentMap<String, PushClient> clients = clientMap
                    .get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }

            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) { // not refreshed for more than 10 seconds
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString()); // remove the client
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }

                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = null;
                Map<String, Object> data = null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();

                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                // Build the packet to push: reuse the cached payload if there is one,
                // otherwise prepare the data and cache it
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }

                Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}",
                        client.getServiceName(), client.getAddrStr(), client.getAgent(),
                        (ackEntry == null ? null : ackEntry.key));

                udpPush(ackEntry);
            }
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);

        } finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }

    }, 1000, TimeUnit.MILLISECONDS);

    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);

}

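From the code we can see the following.

The consumer UDP entries held by nacos-server are not stored forever. An entry is created or refreshed in clientMap whenever the client calls queryList, and if it goes without a refresh for a certain time (10s) it is removed.

That is why the pull flow described earlier has a refreshOnly method: it exists specifically to refresh the client's UDP entry on the server.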
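The interplay between refreshing and eviction can be sketched with a two-level map of timestamps. This is a simplified model under stated assumptions: `PushClientRegistrySketch`, `refresh` and `evictZombies` are hypothetical names, the inner map stores only a last-refresh time instead of a PushClient object, and the 10s threshold is the figure quoted above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical model of a two-level client registry with zombie eviction; not Nacos code. */
final class PushClientRegistrySketch {
    static final long ZOMBIE_MILLIS = 10_000L; // mirrors the 10s figure in the text

    // serviceKey (e.g. namespaceId##serviceName) -> clientKey -> last refresh time in millis
    private final ConcurrentMap<String, ConcurrentMap<String, Long>> clientMap =
            new ConcurrentHashMap<>();

    /** Called whenever a client queries the service: creates or refreshes its entry. */
    void refresh(String serviceKey, String clientKey, long nowMillis) {
        clientMap.computeIfAbsent(serviceKey, k -> new ConcurrentHashMap<>())
                 .put(clientKey, nowMillis);
    }

    /** Drops clients that have not refreshed recently; returns how many push targets remain. */
    int evictZombies(String serviceKey, long nowMillis) {
        ConcurrentMap<String, Long> clients = clientMap.get(serviceKey);
        if (clients == null) {
            return 0;
        }
        clients.values().removeIf(lastRefresh -> nowMillis - lastRefresh > ZOMBIE_MILLIS);
        return clients.size();
    }

    public static void main(String[] args) {
        PushClientRegistrySketch registry = new PushClientRegistrySketch();
        registry.refresh("ns##svc", "consumer-a", 0L);     // never refreshes again
        registry.refresh("ns##svc", "consumer-b", 8_000L); // refreshed recently
        System.out.println(registry.evictZombies("ns##svc", 12_000L)); // only consumer-b is left
    }
}
```

In this model a consumer that stops calling refresh simply ages out, which is why a periodic pull that does nothing but re-send the UDP port is enough to stay on the push list.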

So after the server pushes data, how does the client handle it?

Look at what happens when the client constructs HostReactor:

public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, String cacheDir,
        boolean loadCacheAtStart, int pollingThreadCount) {

    executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.client.naming.updater");
            return thread;
        }
    });

    this.eventDispatcher = eventDispatcher;
    this.serverProxy = serverProxy;
    this.cacheDir = cacheDir;
    if (loadCacheAtStart) {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
    } else {
        this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
    }

    this.updatingMap = new ConcurrentHashMap<String, Object>();
    this.failoverReactor = new FailoverReactor(this, cacheDir);
    this.pushReceiver = new PushReceiver(this); // receives the UDP pushes from the server
}

Look at the last line: it creates a PushReceiver.

public PushReceiver(HostReactor hostReactor) {
    try {
        this.hostReactor = hostReactor;
        udpSocket = new DatagramSocket();
        // Create a single-threaded executor to run the receive loop
        executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.push.receiver");
                return thread;
            }
        });

        executorService.execute(this);
    } catch (Exception e) {
        NAMING_LOGGER.error("[NA] init udp socket failed", e);
    }
}

PushReceiver starts a thread pool whose only task is PushReceiver's own run method:

@Override
public void run() {
    while (true) {
        try {
            // byte[] is initialized with 0 full filled by default
            byte[] buffer = new byte[UDP_MSS];
            DatagramPacket packet = new DatagramPacket(buffer, buffer.length);

            udpSocket.receive(packet);

            String json = new String(IoUtils.tryDecompress(packet.getData()), "UTF-8").trim();
            NAMING_LOGGER.info("received push data: " + json + " from " + packet.getAddress().toString());

            PushPacket pushPacket = JSON.parseObject(json, PushPacket.class);
            String ack;
            if ("dom".equals(pushPacket.type) || "service".equals(pushPacket.type)) {
                hostReactor.processServiceJSON(pushPacket.data);

                // send ack to server
                ack = "{\"type\": \"push-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            } else if ("dump".equals(pushPacket.type)) {
                // dump data to server
                ack = "{\"type\": \"dump-ack\""
                        + ", \"lastRefTime\": \"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\""
                        + StringUtils.escapeJavaScript(JSON.toJSONString(hostReactor.getServiceInfoMap()))
                        + "\"}";
            } else {
                // do nothing send ack only
                ack = "{\"type\": \"unknown-ack\""
                        + ", \"lastRefTime\":\"" + pushPacket.lastRefTime
                        + "\", \"data\":" + "\"\"}";
            }

            udpSocket.send(new DatagramPacket(ack.getBytes(Charset.forName("UTF-8")),
                    ack.getBytes(Charset.forName("UTF-8")).length, packet.getSocketAddress()));
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] error while receiving push data", e);
        }
    }
}

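The loop listens on a UDP socket. When a DatagramPacket arrives, it is decompressed and deserialized, the payload is handled by processServiceJSON(), and finally an ack is sent back to the server.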
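The ack selection in that loop boils down to a mapping from push type to ack type plus some string assembly. The sketch below isolates just that part. `PushAckSketch`, `ackType` and `buildAck` are hypothetical names, the caller is assumed to pass already-escaped data, and the spacing inside the JSON is normalized to one form, so the output is not guaranteed to match Nacos byte for byte.

```java
/** Hypothetical sketch of the ack-building step; not Nacos code. */
final class PushAckSketch {

    /** Maps the type of a received push packet to the type of the ack sent back. */
    static String ackType(String pushType) {
        if ("dom".equals(pushType) || "service".equals(pushType)) {
            return "push-ack";    // instance data was applied locally
        }
        if ("dump".equals(pushType)) {
            return "dump-ack";    // the server asked for a dump of the local cache
        }
        return "unknown-ack";     // nothing to do, acknowledge only
    }

    /** Builds the ack JSON by concatenation; escapedData must already be escaped by the caller. */
    static String buildAck(String pushType, long lastRefTime, String escapedData) {
        return "{\"type\": \"" + ackType(pushType) + "\""
                + ", \"lastRefTime\":\"" + lastRefTime
                + "\", \"data\":\"" + escapedData + "\"}";
    }

    public static void main(String[] args) {
        // prints {"type": "push-ack", "lastRefTime":"42", "data":""}
        System.out.println(buildAck("service", 42L, ""));
    }
}
```

Echoing lastRefTime back in the ack is what lets the sender match an acknowledgement to the push it belongs to.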

That covers the overall flow. Some details were skipped here; I will keep digging into them when time permits.