SpringCloud-Ribbon实现源码分析

客户端的负载均衡解决的问题:从配置文件或注册中心获取需要调用的服务列表,使用指定的算法选择一个服务实例,发送调用请求。

流程总结:

  • RestTemplate.getObject方法开始流程,执行到doExcute()方法中,该方法创建一个Http请求发送请求
  • 创建http请求是一个InterceptingClientHttpRequest对象,表示这是一个正在被拦截的客户端http请求,ribbon负载均衡就是基于拦截器实现的
  • InterceptingClientHttpRequest有方法excute(),此方法对请求执行拦截处理,最后发送出去。
  • excute()方法中,获取到LoadBalancerInterceptor即负载均衡拦截器,执行该拦截器的intercept()方法进行拦截处理。该拦截器对象有两个成员
    • LoadBalancerRequestFactory,用于构建负载均衡请求的工厂对象
    • LoadBalancerClient,主要用于执行负载均衡请求,Ribbon中的实现类是RibbonLoadBalanceClient
  • RibbonLoadBalanceClient的核心逻辑就是执行负载均衡算法,先获取负载均衡器,再使用负载均衡器获取真正的服务器地址,再使用服务器信息创建一个RibbonServer,执行负载均衡请求的apply方法
  • 负载均衡请求LoadBalancerRequest接口只定义了一个apply()方法,该方法主要负责递归执行InterceptingClientHttpRequest.execute()方法,执行所有拦截器拦截。ribbon只在一个匿名类中定义了apply方法,主要逻辑就是创建了一个请求包装类对象ServiceRequestWrapper,执行拦截。
  • 最后使用SimpleClientHttpRequestFactory创建一个标准的http请求发送出去

采用ribbon做客户端负载,一般会搭配RestTemplate一起使用,在RestTemplate上注解@LoadBalanced,则调用

RestTemplate方法时,ribbon会自动生效实现客户端的负载均衡。

配置负载均衡的RestTemplate

1
2
3
4
5
@Bean
@LoadBalanced
public RestTemplate getRestTemplate(){
return new RestTemplate();
}

使用RestTemplate进行远程服务访问:

1
2
3
4
5
6
7
8
public class UserServiceImpl implements UserService {
@Autowired
private RestTemplate restTemplate;
public User getUser(Long id) {
String url = "http://USER-PROVIDER"+"/user/"+id;
return restTemplate.getForObject(url,User.class);
}
}

以上代码使用服务名称访问远程主机,因此肯定有一个服务名称解析的过程,可以大致猜测执行流程:

  1. 通过服务名称USER-PROVIDER从注册中心拿到该服务的主机列表;
  2. 根据配置的负载均衡策略选择一个主机;
  3. 重构一个http请求,发送给指定的主机,接收响应数据。

到底是不是这样,下面我将从源代码中找到答案。

1. 注解@LoadBalanced

要使用ribbon实现负载均衡,就需要使用到该注解,那么这个注解的是在什么地方进行处理的呢?

定位到自动配置类:LoadBalancerAutoConfiguration,在该类中也有一个成员变量注解了@LoadBalanced

1
2
3
@LoadBalanced
@Autowired(required = false)
private List<RestTemplate> restTemplates = Collections.emptyList();

看@LoadBalanced注解的声明类:

1
2
3
4
5
6
7
8
@Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Qualifier
public @interface LoadBalanced {

}

注解类中没有定义属性,且类上有一个@Qualifier注解,因此该注解实质上就是一个标志,并不需要注解处理类。在spring中,注解@Qualifier用于在自动注入时指定一个bean(在同一个类存在不止一个bean的情况下)。@Qualifier还有一种用法就是,在一个列表上注解@Qualifier,那么凡是注解了@Qualifier的此类bean都会被注入到该列表中。

所以,@LoadBalanced实质是是一个@Qualifier,在负载均衡场景中换了个名字而已。所以LoadBalancerAutoConfiguration的成员restTemplates将自动注入所有注解了@LoadBalancedRestTemplate类型的bean。

2. 自动装配

ribbon实现负载均衡是基于一种拦截器的设计,ribbon对需要实现负载均衡的请求进行拦截,并做负载均衡处理

在上面的例子中,RestTemplate.getForObject(...)发起rest请求,所以这里是本文源码分析的入口,具体的执行流程在下文分析,此处要分析的是ribbon的自动装配流程,在rest请求过程中,会调用到RibbonLoadBalancerClient.execute()方法,那么我们要找到RibbonLoadBalancerClient在哪里实例化的

Ribbon的自动配置类中,对其进行了实例化:

LoadeBalancerIntercepter

RibbonLoadBalanceClient

3. 执行流程

我们直接进入到RestTemplate这个类的doExecute方法,因为前面部分的代码都比较简单没有太多逻 辑。

这段代码中有一个很重要的逻辑,就是createRequest,这个是构建客户端请求的一个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//RestTemplate
protected <T> T doExecute(URI url, @Nullable HttpMethod method,
@Nullable RequestCallback requestCallback,
@Nullable ResponseExtractor<T> responseExtractor)
throws RestClientException {
...
ClientHttpResponse response = null;
try {
ClientHttpRequest request = createRequest(url, method); //>>
if (requestCallback != null) {
requestCallback.doWithRequest(request);
}
response = request.execute(); //>>
handleResponse(url, method, response);
return (responseExtractor != null ? responseExtractor.extractData(response) : null);
}
...
}
createRequest()

这个方法是用来创建一个请求对象,其中getRequestFactory(),调用的是InterceptingHttpAccessor 中的getRequestFactory方法,因为InterceptingHttpAccessor继承了HttpAccessor这个类,重写了 getRequestFactory方法,而RestTemplateInterceptingHttpAccessor的子类。

1
2
3
4
5
6
7
//HttpAccessor
protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
ClientHttpRequest request = getRequestFactory().createRequest(url, method);
initialize(request);
...
return request;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//InterceptingHttpAccessor
public ClientHttpRequestFactory getRequestFactory() {
// 获取该客户端请求所有的拦截器,虽然是列表,此处只有LoadBalancerInterceptor
List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
if (!CollectionUtils.isEmpty(interceptors)) {
ClientHttpRequestFactory factory = this.interceptingRequestFactory;
if (factory == null) {
// 构建一个拦截的http请求工厂,将拦截器传入
factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
this.interceptingRequestFactory = factory;
}
return factory;
}
else {
return super.getRequestFactory();
}
}

讲一下getInterceptors()方法

这个方法中返回的拦截器列表,是从InterceptingHttpAccessor.setInterceptors()方法来设置的,而这 个setInterceptors()调用的地方正好是在 LoadBalancerAutoConfiguration

1
2
3
4
5
6
7
8
9
10
11
12
// LoadBalancerAutoConfiguration.LoadBalancerInterceptorConfig
@Bean
@ConditionalOnMissingBean
public RestTemplateCustomizer restTemplateCustomizer(
final LoadBalancerInterceptor loadBalancerInterceptor) {
return restTemplate -> {
List<ClientHttpRequestInterceptor> list = new ArrayList<>(
restTemplate.getInterceptors());
list.add(loadBalancerInterceptor);
restTemplate.setInterceptors(list);
};
}

此处调用了restTemplate.setInterceptors这个方法设置拦截器,其中RestTemplate又继承了 InterceptingHttpAccessor

所以InterceptingHttpAccessor可以直接获取到负载均衡拦截器。

再回到createRequest方法中,getRequestFactory()方法返回的是 InterceptingClientHttpRequestFactory,而createRequest()方法,最终返回的是 InterceptingClientHttpRequest这个类。

request.execute()

继续跳回到RestTemplate.doExecute()方法,最终会调用request.execute()。那么这个时候,request.execute调用谁呢?于是我们看一下InterceptingClientHttpRequest的类关系图,我们发现它有两个父类。这是一种模版方法的设计。

最终,我们进入到InterceptingClientHttpRequest.executeInternal方法 InterceptingRequestExecution.execute()

InterceptingRequestExecution.execute方法中,有两个处理逻辑 如果有配置多个客户端拦截器,则调用拦截器方法,对请求进行拦截

否则,按照正常的处理逻辑进行远程调用。 而在当前的场景中,自然是调用LoadBalancerInterceptor.intercept方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//InterceptingClientHttpRequest
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
// 获取拦截器执行拦截操作
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this); // 进入LoadBalancerInterceptor >>
}
else {
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}
LoadBalancerInterceptor.intercept()

这里就是负载均衡行为开始发生的地方了

1
2
3
4
5
6
7
8
9
10
11
12
13
// LoadBalancerInterceptor
private LoadBalancerClient loadBalancer; // 该对象执行负载均衡算法
private LoadBalancerRequestFactory requestFactory; //

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) throws IOException {
final URI originalUri = request.getURI(); // 此处获取到还是未经处理的url
String serviceName = originalUri.getHost(); // 服务名称
Assert.state(serviceName != null,
"Request URI does not contain a valid hostname: " + originalUri);
return this.loadBalancer.execute(serviceName,
this.requestFactory.createRequest(request, body, execution));
}

this.loadBalancer就是RibbonLoadBalanceClient实例,此实例在自动装配阶段被实例化。

先看createRequest(request, body, execution))方法:

  • 参数request是InterceptingClientHttpRequest实例,顾名思义就是正在被拦截的客户端http请求。

  • 参数body是请求体,get方法请求此参数为空数组

  • 参数execution是InterceptingClientHttpRequest的内部类InterceptingRequestExecution的实例,该类封装了拦截器列表以及执行所有拦截器操作的执行逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// LoadBalancerRequestFactory
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
if (this.transformers != null) {
for (LoadBalancerRequestTransformer transformer : this.transformers) {
serviceRequest = transformer.transformRequest(serviceRequest,
instance);
}
}
return execution.execute(serviceRequest, body);
};
}

该方法返回了一个负载均衡请求对象,此处是匿名类实现。

注意返回的不是instance对象,这里用拉姆达表达式创建了一个匿名类的对象,instance是返回的对象中唯一方法的形参,在之后被调用的时候才会有值传递进来。

往下看;

创建了LoadBalance请求之后,接着执行this.loadBalancer.execute(..)方法,参数有服务名称serviceName和上面创建的LoadBalance请求对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//RibbonLoadBalancerClient
public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
throws IOException {
//获取负载均衡器
ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
Server server = getServer(loadBalancer, hint); // 使用负载均衡器获取服务器
。。。
// ribbonServer传递给上面形参instance的参数值
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));
// 执行
return execute(serviceId, ribbonServer, request);
}

getLoadBalancer(serviceId)可以深入分析,此处使用serviceId将不用的服务进行了上下文的隔离,设计方法值得借鉴。

getServer() 根据负载均衡算法选择一个服务器

1
2
3
4
5
6
7
//RibbonLoadBalancerClient
protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
if (loadBalancer == null) {
return null;
}
return loadBalancer.chooseServer(hint != null ? hint : "default");
}

loadBalancer是ZoneAwareLoadBalancer实例,进入方法,这里只有一个zone所以执行父类的逻辑

1
2
3
4
5
6
7
8
//`ZoneAwareLoadBalancer`
public Server chooseServer(Object key) {
if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
logger.debug("Zone aware logic disabled or there is only one zone");
return super.chooseServer(key);
}
...
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//BaseLoadBalancer
public Server chooseServer(Object key) {
if (counter == null) {
counter = createCounter();
}
counter.increment();
if (rule == null) {
return null;
} else {
try {
return rule.choose(key); // 执行到rule的选择方法
} catch (Exception e) {
logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
return null;
}
}
}

此处的rule默认是RoundRobinRule,即轮询策略。

此处是一个扩展点,rule可以在配置文件中进行配置,也可以配置自定义的rule

再回到execut方法,选定服务器之后,创建一个RibbonServer实例,

1
2
3
RibbonServer ribbonServer = new RibbonServer(serviceId, server,
isSecure(server, serviceId),
serverIntrospector(serviceId).getMetadata(server));

isSecure(server, serviceId)由于使用http协议,这里是false

serverIntrospector(serviceId).getMetadata(server)返回一个map,只有一对值:“manage.port->8001”

拿到创建好的RibbonServer,说明负载均衡操作已经完成了,应用已经确定了请求需要发送给哪一台服务器实例,接下来要做的事就是重新构造一个指向该服务器的http请求发送出去,往下看:

执行execute(serviceId, ribbonServer, request)方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public <T> T execute(String serviceId,  
ServiceInstance serviceInstance, // RibbonServer实例
LoadBalancerRequest<T> request) // 负载均衡请求,执行负载均衡
throws IOException {
Server server = null;
if (serviceInstance instanceof RibbonServer) {
server = ((RibbonServer) serviceInstance).getServer();
}
if (server == null) {
throw new IllegalStateException("No instances available for " + serviceId);
}

RibbonLoadBalancerContext context = this.clientFactory
.getLoadBalancerContext(serviceId); // 根据服务名称获取上下文
RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);
try { // 主要逻辑
T returnVal = request.apply(serviceInstance); // 此处执行apply方法
statsRecorder.recordStats(returnVal);
return returnVal;
}
...
return null;
}

执行负载均衡请求的apply方法,负载均衡请求是之前创建的,这里再贴一遍代码:

1
2
3
4
5
6
7
8
9
10
11
// LoadBalancerRequestFactory
public LoadBalancerRequest<ClientHttpResponse> createRequest(
final HttpRequest request, final byte[] body,
final ClientHttpRequestExecution execution) {
return instance -> {
HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance,
this.loadBalancer);
...
return execution.execute(serviceRequest, body);
};
}

apply方法中new 了一个ServiceRequestWrapper对象,该对象提供了重构请求url的方法,然后再进入InterceptingRequestExecution类的execute()方法,再贴一遍代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//InterceptingClientHttpRequest
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
if (this.iterator.hasNext()) {
ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
return nextInterceptor.intercept(request, body, this); // 进入LoadBalancerInterceptor >>
}
else { // 此次进入else分支
HttpMethod method = request.getMethod();
Assert.state(method != null, "No standard HTTP method");
ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
if (body.length > 0) {
if (delegate instanceof StreamingHttpOutputMessage) {
StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
}
else {
StreamUtils.copy(body, delegate.getBody());
}
}
return delegate.execute();
}
}
}

由于拦截器链表中没有未执行了拦截器了,所以这次进入else分支,注意看else第三行代码,创建delegate请求,新的请求url的就是在这里重构的,request.getURI(),request是上一步new出来的ServiceRequestWrapper

1
2
3
4
5
public URI getURI() {
// 这里重构了url
URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
return uri;
}

else分支执行完之后,就完成了请求。

4. 服务列表刷新

服务注册中心里注册的服务会发生变化,宕机或主动下线或添加服务器,这时候ribbon客户端需要更新本地的服务器列表,默认是30秒被动刷新一次。刷新过之后30秒请求服务不会调用刷新线程,两次调用之间间隔超过30秒则第二次请求之前会刷新服务器列表

5. ping

服务注册到注册中心之后,注册中心会通过发送心跳来check服务的存活状态,ribbon从服务中心获取服务器列表,默认是30s刷新一次,当某台服务器宕机,注册中心需要在一段时间之后才能发现其不在线,ribbon最多需要30秒才能刷新掉不在线的服务器,这个时间延迟有点大了

那么ribbon为了解决这个问题,加入了ping功能,即ribbon客户端自己发送心跳给服务器列表中的服务器,默认是10秒发送一次,一旦发现ping不通,直接剔除该服务器

这里的心跳机制可以借鉴。

此处又是另外的一个扩展点,可以自定义Ping的实现(使用指定的协议),配置到ribbon中