SpringCloud-Eureka自我保护机制

常用的注册中解决方案:

  • Eureka 非持久化存储、ap模型、去中心化集群模式
  • Zookeeper zab协议
  • Consul
  • Nacos
  • Redis
  • Etcd

服务信息发生变化,消费者如何感知

  • 等待注册中心push
    • 优点:实时性高
    • 缺点:注册中心需要维护所有活动消费者的信息才能完成推送
  • 主动pull
    • 优点:没有以上的缺点
    • 缺点:服务信息更新的延迟时间可能会导致请求发送到实效节点
  • long polling

Eureka的自我保护机制

Eureka Server在运行期间会去统计心跳成功的比例在15分钟之内是否低于85% , 如果低于85%, Eureka Server会认为当前实例的客户端与自己的心跳连接出现了网络故障,那么Eureka Server会把这 些实例保护起来,让这些实例不会过期导致实例剔除。

这样做的目的是为了减少网络不稳定或者网络分区的情况下,Eureka Server将健康服务剔除下线的问题。 使用自我保护机制可以使得Eureka 集群更加健壮和稳定的运行。

进入自我保护状态后,会出现以下几种情况:

  • Eureka Server不再从注册列表中移除因为长时间没有收到心跳而应该剔除的过期服务
  • Eureka Server仍然能够接受新服务的注册和查询请求,但是不会被同步到其他节点上,保证当前节点依然可用。
开启自我保护

为了演示效果,我们通过如下配置,将判定时间改为10s,接着启动Eureka Server,等待10s之 后,就会出现以上提示信息,表示自我保护被激活了。

1
2
3
#设置 eureka server同步失败的等待时间默认5分
#在这期间,它不向客户端提供服务注册信息
eureka.server.wait-time-in-ms-when-sync-empty=10000
两个变量

在Eureka的自我保护机制中,有两个很重要的变量,Eureka的自我保护机制,都是围绕这两个变量来 实现的,在AbstractInstanceRegistry这个类中定义的

1
2
3
4
//每分钟最小续约数量
protected volatile int numberOfRenewsPerMinThreshold;
//预期每分钟收到续约的 客户端数量,取决于注册到eureka server上的服务数量
protected volatile int expectedNumberOfClientsSendingRenews;

numberOfRenewsPerMinThreshold 表示每分钟的最小续约数量,它表示什么意思呢?就是Eureka Server期望每分钟收到客户端实例续约的总数的阈值。如果小于这个阈值,就会触发自我保护机制。

它是在以下代码中赋值的:

1
2
3
4
5
6
protected void updateRenewsPerMinThreshold() {
this.numberOfRenewsPerMinThreshold = (int)(this.expectedNumberOfClientsSendingRenews
* (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
* serverConfig.getRenewalPercentThreshold());
}
//自我保护阀值 = 服务总数 * 每分钟续约数(60S/客户端续约间隔) * 自我保护续约百分比阀值因子
  • getExpectedClientRenewalIntervalSeconds,客户端的续约间隔,默认为30s
  • getRenewalPercentThreshold,自我保护续约百分比阈值因子,默认0.85。 也就是说每分钟的续 约数量要大于85%
这两个变量的更新

需要注意的是,这两个变量是动态更新的,有四个地方来更新这两个值

  1. Eureka-Server的初始化

在EurekaBootstrap这个类中,有一个 initEurekaServerContext 方法

1
2
3
4
5
 protected void initEurekaServerContext() throws Exception {
EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
//...
registry.openForTraffic(applicationInfoManager, registryCount);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// PeerAwareInstanceRegistryImpl.openForTraffic
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int
count) {
this.expectedNumberOfClientsSendingRenews = count; //初始化
updateRenewsPerMinThreshold(); //更新numberOfRenewsPerMinThreshold
logger.info("Got {} instances from neighboring DS node", count);
logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis();
if (count > 0) {
this.peerInstancesTransferEmptyOnStartup = false;
}
DataCenterInfo.Name selfName =applicationInfoManager.getInfo().getDataCenterInfo().getName();
boolean isAws = Name.Amazon == selfName;
if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
logger.info("Priming AWS connections for all replicas..");
primeAwsReplicas(applicationInfoManager);
}
logger.info("Changing status to UP");
applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
super.postInit();
}
  1. PeerAwareInstanceRegistryImpl.cancel

当服务提供者主动下线时,表示这个时候Eureka-Server要剔除这个服务提供者的地址,同时也代表这 这个心跳续约的阈值要发生变化。所以在 PeerAwareInstanceRegistryImpl.cancel 中可以看到数据 的更新

调用路径 PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel- >internalCancel

服务下线之后,意味着需要发送续约的客户端数量递减了,所以在这里进行修改

1
2
3
4
5
6
7
8
9
10
11
protected boolean internalCancel(String appName, String id, boolean
isReplication) {
//....
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
// Since the client wants to cancel it, reduce the number of clientsto send renews.
this.expectedNumberOfClientsSendingRenews =this.expectedNumberOfClientsSendingRenews - 1;
updateRenewsPerMinThreshold();
}
}
}
  1. PeerAwareInstanceRegistryImpl.register

当有新的服务提供者注册到eureka-server上时,需要增加续约的客户端数量,所以在register方法中会 进行处理

register ->super.register(AbstractInstanceRegistry)

1
2
3
4
5
6
7
8
9
public void register(InstanceInfo registrant, int leaseDuration, booleanisReplication) {
// The lease does not exist and hence it is a new registration
synchronized (lock) {
if (this.expectedNumberOfClientsSendingRenews > 0) {
this.expectedNumberOfClientsSendingRenews =this.expectedNumberOfClientsSendingRenews + 1;
updateRenewsPerMinThreshold();
}
}
}
  1. PeerAwareInstanceRegistryImpl.scheduleRenewalThreshold UpdateTask

15分钟运行一次,判断在15分钟之内心跳失败比例是否低于85%。在

DefaultEurekaServerContext 》@PostConstruct修饰的initialize()方法》init()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}
private void updateRenewalThreshold() {
try {
Applications apps = eurekaClient.getApplications();
int count = 0;
for (Application app : apps.getRegisteredApplications()) {
for (InstanceInfo instance : app.getInstances()) {
if (this.isRegisterable(instance)) {
++count;
}
}
}
synchronized (lock) {
// Update threshold only if the threshold is greater than the
// current expected threshold or if self preservation is disabled.
if ((count) > (serverConfig.getRenewalPercentThreshold() *
expectedNumberOfClientsSendingRenews)
|| (!this.isSelfPreservationModeEnabled())) {
this.expectedNumberOfClientsSendingRenews = count;
updateRenewsPerMinThreshold();
}
}
logger.info("Current renewal threshold is : {}",numberOfRenewsPerMinThreshold);
} catch (Throwable e) {
logger.error("Cannot update renewal threshold", e);
}
}

自我保护机制触发任务

在AbstractInstanceRegistry的postInit方法中,会开启一个EvictionTask的任务,这个任务用来检测是 否需要开启自我保护机制。

1
2
3
4
5
6
7
8
9
10
protected void postInit() {
renewsLastMin.start();
if (evictionTaskRef.get() != null) {
evictionTaskRef.get().cancel();
}
evictionTaskRef.set(new EvictionTask());
evictionTimer.schedule(evictionTaskRef.get(),
serverConfig.getEvictionIntervalTimerInMs(),
serverConfig.getEvictionIntervalTimerInMs());
}

其中,EvictionTask表示最终执行的任务

1
2
3
4
5
6
7
8
9
10
11
 private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);
@Override
public void run() {
try {
long compensationTimeMs = getCompensationTimeMs();
logger.info("Running the evict task with compensationTime {}ms",compensationTimeMs);
evict(compensationTimeMs);
} catch (Throwable e) {
logger.error("Could not run the evict task", e);
}
}
1
2
3
4
5
6
7
8
9
 public void evict(long additionalLeaseMs) {
logger.debug("Running the evict task");
// 是否需要开启自我保护机制,如果需要,那么直接RETURE, 不需要继续往下执行了
if (!isLeaseExpirationEnabled()) {
logger.debug("DS: lease expiration is currently disabled.");
return;
}
//这下面主要是做服务自动下线的操作的
}

isLeaseExpirationEnabled

  • 是否开启了自我保护机制,如果没有,则跳过,默认是开启
  • 计算是否需要开启自我保护,判断最后一分钟收到的续约数量是否大于 numberOfRenewsPerMinThreshold
1
2
3
4
5
6
7
8
 public boolean isLeaseExpirationEnabled() {
if (!isSelfPreservationModeEnabled()) {
// The self preservation mode is disabled, hence allowing the instancesto expire.
return true;
}
return numberOfRenewsPerMinThreshold > 0 &&
getNumOfRenewsInLastMin() >numberOfRenewsPerMinThreshold;
}