SpringCloud-Eureka底层源码分析

*设计思想:

*执行流程:

Registry 名词 登记处、挂号处

Registration 名词 登记、注册、挂号

EurekaServiceRegistry 这个类顾名思义就是服务注册的地方,登记处,这个类实现了ServiceRegistry<EurekaRegistration>接口,其中的泛型EurekaRegistration顾名思义就是一次eureka注册,也是一个名词对象。EurekaServiceRegistry负责将EurekaRegistration注册到注册中心

1. 客户端服务注册流程

1.1 启动时注册

容器在实例化CloudEurekaClient时,父类的构造函数会执行第一次注册,不做配置的话默认是不会进行注册的。

1
2
3
4
5
6
7
8
9
10
11
12
// DiscoveryClient 
// registration.enabled=true(默认是true) && shouldEnforceRegistrationAtInit=true(默认是false)时,会进行一次注册
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}

1.2 启动后注册

以下的流程是应用在启动完成之后,触发定时注册任务执行的流程:

  • 服务启动后会调用SmartLifeCycle.start()方法,EurekaAutoServiceRegistration这个类实现了SmartLifecycle接口,所以它是服务注册流程的发起者。

  • start()方法中最终会调用到EurekaServiceRegistryregister方法,这个方法并没有立即执行服务注册,仅仅向ApplicationInfoManager中设置了一个状态以及健康检查处理器。ApplicationInfoManager是用于管理即将发布的服务具体信息对象。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // EurekaServiceRegistry
    public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);
    ...
    reg.getApplicationInfoManager() //getInitialStatus 结果是UP
    .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
    .getEurekaClient().registerHealthCheck(healthCheckHandler));
    }
  • 关键在设置状态这块,服务启动时的状态是STARTING,发布之后的状态是UP,因此这里要做一个状态切换,切换之后,会向状态监听器发送一个StatusChangeEvent事件。状态监听器 listener 的实例化在DiscoveryClient的构造函数中执行,见后文。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
    return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
    for (StatusChangeListener listener : listeners.values()) {
    try { //发布状态变更通知,listener的初始化需要关注
    listener.notify(new StatusChangeEvent(prev, next));
    } catch (Exception e) {
    ...
    }
    }
    }
    }
  • 客户端自动装配会装载一个CloudEurekaClient实例,它是DiscoveryClient的子类,父类DiscoveryClient的构造函数最终会执行initScheduledTasks()方法,开启几个定时服务:

    • 如果配置了服务列表刷新,则会开启 cacheRefresh 定时任务;
    • 如果开启了服务注册开关,则会建立心跳机制,开启定时任务发送心跳;
    • 通过内部类实例化一个服务状态监听器StatusChangeListener添加到ApplicationInfoManager的listener变量中,重点关注该监听器的notify()方法:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    // initScheduledTasks方法中的匿名类对象
    statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
    return "statusChangeListener";
    }
    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
    ... // log
    instanceInfoReplicator.onDemandUpdate(); //>>
    }
    });
  • 该监听器接收到StatusChangeEvent后,会执行InstanceInfoReplicatoronDemandUpdate方法。InstanceInfoReplicator是一个Runnable实例,它是一个用于更新和复制本地的服务实例信息到Eureka服务器的任务。

    1
    InstanceInfoReplicator is A task for updating and replicating the local instanceinfo to the remote server.

Demand是强烈要求的意思,所以该方法执行强制刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean onDemandUpdate() {
if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
if (!scheduler.isShutdown()) {
scheduler.submit(new Runnable() {
@Override
public void run() {
Future latestPeriodic = scheduledPeriodicRef.get();
if (latestPeriodic != null && !latestPeriodic.isDone()) {
... // log
latestPeriodic.cancel(false);
}
InstanceInfoReplicator.this.run();
}
});
return true;
} else {
return false;
}
} else {
return false;
}
}
  • onDemandUpdate方法向线程池提交一个任务,执行run函数,run函数中调用了DiscoveryClientregisty方法,这里才是真正的注册逻辑,构造一个http请求发送给Eureka服务器。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    public void run() {
    try {
    discoveryClient.refreshInstanceInfo();

    Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
    if (dirtyTimestamp != null) {
    discoveryClient.register(); // 这里执行注册 >>
    instanceInfo.unsetIsDirty(dirtyTimestamp);
    }
    } catch (Throwable t) {
    logger.warn("There was a problem with the instance info replicator", t);
    } finally {
    Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
    scheduledPeriodicRef.set(next);
    }
    }
    1
    2
    3
    4
    5
    6
    7
    boolean register() throws Throwable {
    EurekaHttpResponse<Void> httpResponse;
    try {
    httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } ...
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }
  • 总结–Eureka Client发起服务注册时,有两个地方会执行服务注册的任务:

    • 在Spring Boot启动时,由于自动装配机制将CloudEurekaClient注入到了容器,并且执行了构造方法,而在构造DiscoveryClient时会执行第一次注册。
    • DiscoveryClient启动的定时任务,会实例化一个状态监听器statusChangeListener,每当服务状态发生变化的时候会执行StatusChangeListener.notify()进行服务状态变更,更新服务状态会执行服务注册,默认是40秒检查一次。

2. 客户端服务发现流程

继续来研究服务的发现过程,就是客户端需要能够满足两个功能

  • 在启动的时候获取指定服务提供者的地址列表
  • Eureka server端服务提供者地址发生变化时,消费者需要动态感知
DiscoveryClient构造时进行查询

DiscoveryClient构造方法中,如果当前的客户端默认开启了fetchRegistry,则会从eureka-server中拉取数据。

1
2
3
4
//DiscoveryClient
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}

DiscoveryClient构造的时候,会初始化一些任务,这个在前面咱们分析过了。其中有一个任务动态更新本地服务地址列表,叫 cacheRefreshTask 。 这个任务最终执行的是CacheRefreshThread这个线程。它是一个周期性执行的任务,具体我们来看一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}

从整体上看,TimedSupervisorTask是固定间隔的周期性任务,一旦遇到超时就会将下一个周期的间隔 时间调大,如果连续超时,那么每次间隔时间都会增大一倍,一直到达外部参数设定的上限为止,一旦 新任务不再超时,间隔时间又会自动恢复为初始值。这种设计还是值得学习的。

CacheRefreshThread.refreshRegistry

1
2
3
4
5
class CacheRefreshThread implements Runnable {
public void run() {
refreshRegistry();
}
}
1
2
3
4
5
6
@VisibleForTesting
void refreshRegistry()
...
boolean success = fetchRegistry(remoteRegionsModified);
...
}

DisccoveryClient.fetchRegistry

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

try {
// 取出本地缓存的服务列表信息
Applications applications = getApplications();
//判断多个条件,确定是否触发全量更新,如下任一个满足都会全量更新:
//1. 是否禁用增量更新;
//2. 是否对某个region特别关注;
//3. 外部调用时是否通过入参指定全量更新;
//4. 本地还未缓存有效的服务列表信息;
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//调用全量更新
getAndStoreFullRegistry();
} else {
//调用增量更新
getAndUpdateDelta(applications);
}
//重新计算和设置一致性hash码
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();//日志打印所有应用的所有实例数之和
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
//将本地缓存更新的事件广播给所有已注册的监听器,注意该方法已被CloudEurekaClient类重写
onCacheRefreshed();
//检查刚刚更新的缓存中,有来自Eureka server的服务列表,其中包含了当前应用的状态,
//当前实例的成员变量lastRemoteInstanceStatus,记录的是最后一次更新的当前应用状态,
//上述两种状态在updateInstanceRemoteStatus方法中作比较 ,如果不一致,就更新lastRemoteInstanceStatus,并且广播对应的事件
updateInstanceRemoteStatus();
return true;
}

DiscoveryClient.getAndStoreFullRegistry

从eureka server端获取服务注册中心的地址信息,然后更新并设置到本地缓存 localRegionApps 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private void getAndStoreFullRegistry() throws Throwable {
long currentUpdateGeneration = fetchRegistryGeneration.get();

logger.info("Getting all instance registry info from the eureka server");

Applications apps = null;
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
apps = httpResponse.getEntity();
}
logger.info("The response status is {}", httpResponse.getStatusCode());

if (apps == null) {
logger.error("The application is null for some reason. Not storing this information");
} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
localRegionApps.set(this.filterAndShuffle(apps));
logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
} else {
logger.warn("Not updating applications as another thread is updating it already");
}
}

3. Eureka Server收到请求之后的处理

3.1 执行流程

我们就把服务注册在客户端和服务端的处理过程做了一个详细的分析,实际上在Eureka Server 端,会把客户端的地址信息保存到ConcurrentHashMap中存储。并且服务提供者和注册中心之间,会 建立一个心跳检测机制,用于监控服务提供者的健康状态。

在没分析源码实现之前,我们一定知道它肯定对请求过来的服务实例数据进行了存储。那么我们去Eureka Server端看一下处理流程。

请求入口在: com.netflix.eureka.resources.ApplicationResource.addInstance()

可以发现,这里所提供的REST服务,采用的是jersey来实现的。Jersey是基于JAX-RS标准,提供 REST的实现的支持,这里就不展开分析了。

当EurekaClient调用register方法发起注册时,会调用ApplicationResource.addInstance方法。

服务注册就是发送一个 POST 请求带上当前实例信息到类 ApplicationResource 的 addInstance 方法进行服务注册

ApplicationResource.addInstance()

1
2
3
4
5
6
7
8
9
10
 @POST  //ApplicationResource
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
...
// 处理客户端可能注册了错误的DataCenterInfo并丢失数据的情况
...
registry.register(info, "true".equals(isReplication)); // 注册
return Response.status(204).build(); // 204 to be backwards compatible
}

addInstance 方法中,最终调用的是 PeerAwareInstanceRegistryImpl.register 方法。

PeerAwareInstanceRegistryImpl.register

1
2
3
4
5
6
7
8
9
10
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // 租约过期时间,默认90s
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
super.register(info, leaseDuration, isReplication); // 调用父类
// 将信息复制到Eureka Server集群的其他机器上,
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

PeerAwareInstanceRegistry的最顶层接口为LeaseManager与LookupService,其中LookupService定义了最基本的发现示例的行为,LeaseManager定义了处理客户端注册,续约,注销等操作.

  • leaseDuration 表示租约过期时间,默认90s,当服务器超过90s没有收到客户端的心跳,主动剔除该节点;
  • 调用父类方法发起服务注册
  • 同步实现很简单,获取及群众的所有节点,逐个发起注册。

AbstractInstanceRegistry.register

简单来说,Eureka-Server的服务注册,实际上是将客户端传递过来的实例数据保存到Eureka-Server中的ConcurrentHashMap中。

首先看一下注册表的结构:Map<String,Map<String,Lease<InstanceInfo>>>,第一层映射是appName到服务列表的映射,第二层映射是在一个AppName中,服务InstanceId到租约的映射。

Lease是租约,持有一个InstanceInfo引用,以及该实例信息的注册时间、上次更新时间、剔除时间、服务上线时间、租约有效期限等等信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// AbstractInstanceRegistry
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
try {
read.lock();
//从registry中获得当前实例信息,根据appName
Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
REGISTER.increment(isReplication);//增加注册次数到监控信息中
if (gMap == null) {//如果当前appName是第一次注册,则初始化一个ConcurrentHashMap
final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
if (gMap == null) {
gMap = gNewMap;
}
}
//从gMap中查询已经存在的Lease信息,Lease中文翻译为租约
//实际上它把服务提供者的实例信息包装成了一个lease,里面提供了对于服务实例的租约管理
Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
//当instance已经存在,和客户端的instance的信息做比较,时间最新的那个,为有效 instance信息
if (existingLease != null && (existingLease.getHolder() != null)) {
Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
...
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
...
registrant = existingLease.getHolder();
}
} else {
// 当租约不存在
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
//客户端续约次数+1
this.expectedNumberOfClientsSendingRenews =
this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
logger.debug("No previous lease information found; it is new registration");
}
// 构建一个租约
Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
if (existingLease != null) {
// 当原来存在Lease的信息时,设置serviceUpTimestamp,
//保证服务启动的时间一直是第一次注册的那个
lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
}
gMap.put(registrant.getId(), lease);
recentRegisteredQueue.add(new Pair<Long, String>( //添加到最近注册的队列中
System.currentTimeMillis(),
registrant.getAppName() + "(" + registrant.getId() + ")"));
// 检查实例状态是否发生变化,如果是并且存在,则覆盖原来的状态
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
...
if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
logger.info("Not found overridden id {} and hence adding it", registrant.getId());
overriddenInstanceStatusMap.put(registrant.getId(),
registrant.getOverriddenStatus());
}
}
InstanceStatus overriddenStatusFromMap =
overriddenInstanceStatusMap.get(registrant.getId());
if (overriddenStatusFromMap != null) {
logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
registrant.setOverriddenStatus(overriddenStatusFromMap);
}

// Set the status based on the overridden status rules
InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
registrant.setStatusWithoutDirty(overriddenInstanceStatus);

// 得到instanceStatus,判断是否是UP状态,
if (InstanceStatus.UP.equals(registrant.getStatus())) {
lease.serviceUp();
}
// 设置注册类型为添加
registrant.setActionType(ActionType.ADDED);
// 租约变更记录队列,记录了实例的每次变化, 用于注册信息的增量获取
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
registrant.setLastUpdatedTimestamp();
// 让缓存失效
invalidateCache(registrant.getAppName(), registrant.getVIPAddress(),
registrant.getSecureVipAddress());
...
} finally {
read.unlock();
}
}

3.2 Eureka 的多级缓存设计

Eureka Server存在三个变量:(registry、readWriteCacheMap、readOnlyCacheMap)保存服务注册信息,默认情况下定时任务每30s将readWriteCacheMap同步至readOnlyCacheMap,每60s清理超过90s未续约的节点,Eureka Client每30s从readOnlyCacheMap更新服务注册信息,而客户端服务的注册则从registry更新服务注册信息

1、多级缓存的意义

这里为什么要设计多级缓存呢?原因很简单,就是当存在大规模的服务注册和更新时,如果只是修改一 个ConcurrentHashMap数据,那么势必因为锁的存在导致竞争,影响性能。

而Eureka又是AP模型,只需要满足最终可用就行。所以它在这里用到多级缓存来实现读写分离。注册方法写的时候直接写内存注册表,写完表之后主动失效读写缓存。

获取注册信息接口先从只读缓存取,只读缓存没有再去读写缓存取,读写缓存没有再去内存注册表里取(不只是取,此处较复杂)。并且,读写缓存会更新回写只读缓存

  • responseCacheUpdateIntervalMs : readOnlyCacheMap 缓存更新的定时器时间间隔,默认为 30秒
  • responseCacheAutoExpirationInSeconds : readWriteCacheMap 缓存过期时间,默认为 180 秒 。
2、服务注册的缓存失效

在AbstractInstanceRegistry.register方法的最后,会调用invalidateCache方法,使得读写缓存失效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void invalidate(Key... keys) {
for (Key key : keys) {
...
readWriteCacheMap.invalidate(key);
Collection<Key> keysWithRegions = regionSpecificKeys.get(key);
if (null != keysWithRegions && !keysWithRegions.isEmpty()) {
for (Key keysWithRegion : keysWithRegions) {
logger.debug("Invalidating the response cache key : {} {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());
readWriteCacheMap.invalidate(keysWithRegion);
}
}
}
}
3、定时同步缓存

ResponseCacheImpl的构造方法中,会启动一个定时任务,这个任务会定时检查读写缓存中的数据变化,进行更新和同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
...
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
...
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}

3.3 服务续约

所谓的服务续约,其实就是一种心跳检查机制。客户端会定期发送心跳来续约。那么简单看一下代码的实现

前文第一部分分析了客户端会在 initScheduledTasks 中,创建一个心跳检测的定时任务

1
2
3
4
5
6
7
8
9
heartbeatTask = new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
);

然后这个定时任务中,会执行一个 HearbeatThread 的线程,这个线程会定时调用renew()来做续约。

1
2
3
4
5
6
7
private class HeartbeatThread implements Runnable {
public void run() {
if (renew()) {
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}

服务端收到心跳请求之后

ApplicationResource.getInstanceInfo这个接口中,会返回一个InstanceResource的实例,在该实例 下,定义了一个statusUpdate的接口来更新状态

1
2
3
4
@Path("{id}")
public InstanceResource getInstanceInfo(@PathParam("id") String id) {
return new InstanceResource(this, id, serverConfig, registry);
}

InstanceResource.statusUpdate()

在该方法中,我们重点关注 这个方法,它会调用 AbstractInstanceRegistry.statusUpdate来更新指定服务提供者在服务端存储的信息中的变化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@PUT
@Path("status")
public Response statusUpdate(
@QueryParam("value") String newStatus,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
try {
if (registry.getInstanceByAppAndId(app.getName(), id) == null) {
logger.warn("Instance not found: {}/{}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
boolean isSuccess = registry.statusUpdate(app.getName(), id,
InstanceStatus.valueOf(newStatus), lastDirtyTimestamp,
"true".equals(isReplication));

if (isSuccess) {
return Response.ok().build();
} else {
return Response.serverError().build();
}
} catch (Throwable e) {
return Response.serverError().build();
}
}

AbstractInstanceRegistry.statusUpdate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
@Override
public boolean statusUpdate(String appName, String id,
InstanceStatus newStatus, String lastDirtyTimestamp,
boolean isReplication) {
try {
read.lock();
// 更新状态的次数 状态统计
STATUS_UPDATE.increment(isReplication);
// 从本地数据里面获取实例信息,
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> lease = null;
if (gMap != null) {
lease = gMap.get(id);
}
// 实例不存在,则直接返回,表示失败
if (lease == null) {
return false;
} else {
// 执行一下lease的renew方法,里面主要是更新了这个instance的最后更新时间。
lease.renew();
InstanceInfo info = lease.getHolder(); // 获取instance实例信息
...
// 当instance信息不为空时,并且实例状态发生了变化
if ((info != null) && !(info.getStatus().equals(newStatus))) {
// 如果新状态是UP的状态,那么启动一下serviceUp() , 主要是更新服务的注册时间
if (InstanceStatus.UP.equals(newStatus)) {
lease.serviceUp();
}
// 将instance Id 和这个状态的映射信息放入覆盖缓存MAP里面去
overriddenInstanceStatusMap.put(id, newStatus);
// 设置覆盖状态到实例信息里面去
info.setOverriddenStatus(newStatus);
long replicaDirtyTimestamp = 0;
info.setStatusWithoutDirty(newStatus);
if (lastDirtyTimestamp != null) {
replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
}
//如果replicaDirtyTimestamp 的时间大于instance的 getLastDirtyTimestamp() ,则更新
if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
info.setLastDirtyTimestamp(replicaDirtyTimestamp);
}
info.setActionType(ActionType.MODIFIED);
recentlyChangedQueue.add(new RecentlyChangedItem(lease));
info.setLastUpdatedTimestamp();
//更新写缓存
invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
}
return true;
}
} finally {
read.unlock();
}
}

至此,心跳续约功能就分析完成了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
public void run() {
Future<?> future = null;
try {
//使用Future,可以设定子线程的超时时间,这样当前线程就不用无限等待了
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
//指定等待子线程的最长时间
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
//delay是个很有用的变量,后面会用到,这里记得每次执行任务成功都会将delay重置
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();

long currentDelay = delay.get();
//任务线程超时的时候,就把delay变量翻倍,但不会超过外部调用时设定的最大延时时间
long newDelay = Math.min(maxDelay, currentDelay * 2);
//设置为最新的值,考虑到多线程,所以用了CAS
delay.compareAndSet(currentDelay, newDelay);

} catch (RejectedExecutionException e) {
//一旦线程池的阻塞队列中放满了待处理任务,触发了拒绝策略,就会将调度器停掉
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}

rejectedCounter.increment();
} catch (Throwable e) {//一旦出现未知的异常,就停掉调度器
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}

throwableCounter.increment();
} finally {
//这里任务要么执行完毕,要么发生异常,都用cancel方法来清理任务;
if (future != null) {
future.cancel(true);
}
//只要调度器没有停止,就再指定等待时间之后在执行一次同样的任务
if (!scheduler.isShutdown()) {
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}

这里就是周期性任务的原因:只要没有停止调度器,就再创建一次性任务,执行时间时dealy的值,假设外部调用时传入的超时时间为30秒(构造方法的入参timeout),最大间隔时间为50 秒(构造方法的入参expBackOffBound)

如果最近一次任务没有超时,那么就在30秒后开始新任务,如果最近一次任务超时了,那么就在50秒后开始新任务(异常处理中有个乘以二的操作, 乘以二后的60秒超过了最大间隔50秒)

3.4 服务端查询服务地址流程

前面我们知道,客户端发起服务地址的查询有两种,一种是全量、另一种是增量。对于全量查询请求, 会调用Eureka-server的ApplicationsResource的getContainers方法。

而对于增量请求,会调用ApplicationsResource.getContainerDifferential。

ApplicationsResource.getContainers

接收客户端发送的获取全量注册信息请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {

boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// EurekaServer无法提供服务,返回403
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;// 设置返回数据格式,默认JSON
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
// 如果接收到的请求头部没有具体格式信息,则返回格式为XML
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
// 构建缓存键
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
// 返回不同的编码类型的数据,去缓存中取数据的方法基本一致
Response response;
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
response = Response.ok(responseCache.get(cacheKey))
.build();
}
CurrentRequestVersion.remove();
return response;
}

responseCache.getGZIP

从缓存中读取数据。

1
2
3
4
5
6
7
public byte[] getGZIP(Key key) {
Value payload = getValue(key, shouldUseReadOnlyResponseCache);
if (payload == null) {
return null;
}
return payload.getGzipped();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
Value payload = null;
try {
if (useReadOnlyCache) {
final Value currentPayload = readOnlyCacheMap.get(key);
if (currentPayload != null) {
payload = currentPayload;
} else {
payload = readWriteCacheMap.get(key);
readOnlyCacheMap.put(key, payload);
}
} else {
payload = readWriteCacheMap.get(key);
}
} catch (Throwable t) {
logger.error("Cannot get value for key : {}", key, t);
}
return payload;
}