回顾

  • 由前面章节我们知道Ribbon功能主要是将负载(工作任务)进行平衡、分摊到多个操作单元上进行运行,那么如果要实现负载均衡,具体是要做哪些事呢?

    • 1、第一步是要维护哪些服务实例可用,需要处理临时新增了服务或者某个服务不可用了情况
    • 2、有了服务实例之后就是根据请求以及某种负载均衡规则选择服务实例
    • 3、然后就是将执行请求,响应处理
    • 4、如果调用失败是不是要重试
  • 下面将从Ribbon的源码来按照上面的步骤来分析其具体实现, 源码版本:

1
2
3
4
5
6
7
<!-- https://mvnrepository.com/artifact/com.netflix.ribbon/ribbon-core -->
<dependency>
<groupId>com.netflix.ribbon</groupId>
<artifactId>ribbon-core</artifactId>
<version>2.2.4</version>
<scope>runtime</scope>
</dependency>
  • 下面将继续接上一章节的内容

解析

1、第一步是要维护哪些服务实例可用,需要处理临时新增了服务或者某个服务不可用了情况

1、1 定义服务实例Server
1、2 定义服务器列表ServerList

2、有了服务实例之后就是根据请求以及某种负载均衡规则选择服务实例

2、1 负责选取Server的接口ILoadBalancer
2、2 负债均衡策略IRule

3、然后就是将执行请求,响应处理

3.1 执行请求客户端AbstractLoadBalancerAwareClient
3.2 执行命令 LoadBalancerCommand

4、如果调用失败是不是要重试

  • 这个构造的LoadBalancerCommand是一个RxJava风格的,从下面可以看到具体执行流程就是通过选取Server的接口ILoadBalancer来获取Server,然后就是执行请求了,它包含了重试和异常处理机制、同时还记录了执行状态
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

public Observable<T> submit(final ServerOperation<T> operation) {
final ExecutionInfoContext context = new ExecutionInfoContext();

if (listenerInvoker != null) {
try {
listenerInvoker.onExecutionStart();
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

// 获取在每个服务实例重试的的次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// 最多尝试几个服务实例
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

// 对于每个服务实例的调用逻辑
// 默认field server是null,通过selectServer()方法获取一个Server
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// 对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
public Observable<T> call(Server server) {
// 设置上下文
context.setServer(server);
final ServerStats stats = loadBalancerContext.getServerStats(server);

// 每个Server包含重试逻辑的请求调用
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
// 增加Server正在处理的请求计数
loadBalancerContext.noteOpenConnection(stats);

// 监听器
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

// 计时器
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
// operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
// doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
// 记录请求完成
recordStats(tracer, stats, entity, null);
}

@Override
public void onError(Throwable e) {
// 记录请求结束
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
// 发生了错误,通知listener
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}

@Override
public void onNext(T entity) {
// 因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}

private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});

if (maxRetrysSame > 0)
// 是否retry
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});

if (maxRetrysNext > 0 && server == null)
// 是否retry,如果retry回调用selectServer()返回下一个Server
o = o.retry(retryPolicy(maxRetrysNext, false));

// 异常处理
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
@Override
public Observable<T> call(Throwable e) {
if (context.getAttemptCount() > 0) {
// 如果超过重试次数,则抛异常
if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
"Number of retries on next server exceeded max " + maxRetrysNext
+ " retries, while making a call for: " + context.getServer(), e);
}
else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
"Number of retries exceeded max " + maxRetrysSame
+ " retries, while making a call for: " + context.getServer(), e);
}
}
if (listenerInvoker != null) {
listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
}
return Observable.error(e);
}
});
}
  • 下面来拆解
4.1 获取重试设置参数 RetryHandler
1
2
3
4
// 获取在每个服务实例重试的的次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
// 最多尝试几个服务实例
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
  • 这个参数可以在配置文件中配置
1
2
3
4
5
ribbon:
ReadTimeout: 60000
ConnectTimeout: 60000
MaxAutoRetries: 1
MaxAutoRetriesNextServer: 1
  • 上面配置的参数获取到的结果如下
4.1 通过选取Server的接口ILoadBalancer来获取Server
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 返回一个只包含一个Server的Observable,但是每次从负载均衡器中获取一个
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
4.2 重试机制看代码是采用了RxJava重试方法
  • 一开始默认进入到下面这行判断,这里 server == null
1
2
if (maxRetrysNext > 0 && server == null) 
o = o.retry(retryPolicy(maxRetrysNext, false));
  • 然后就是进入到下面这执行体方法,可以得到以下信息
    • 第一个可以看到主体逻辑就是执行operation.call(server)方法,这个就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
    • 第二个就是使用ServerStats来记录服务执行状态,这个是负载均衡选择策略时计算需要
    • 第三个就是使用if (maxRetrysSame > 0) {o = o.retry(retryPolicy(maxRetrysSame, true)); } 来控制是否重试
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 设置上下文
context.setServer(server);
// 记录服务请求执行状态
final ServerStats stats = loadBalancerContext.getServerStats(server);

// 每个Server包含重试逻辑的请求调用
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
context.incAttemptCount();
// 增加Server正在处理的请求计数
loadBalancerContext.noteOpenConnection(stats);

// 监听器
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}

// 计时器
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
// operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
// doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
// 记录请求完成
recordStats(tracer, stats, entity, null);
}

@Override
public void onError(Throwable e) {
// 记录请求结束
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
// 发生了错误,通知listener
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}

@Override
public void onNext(T entity) {
// 因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}

private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});

if (maxRetrysSame > 0)
// 是否retry
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
  • Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) 方法
1
2
3
public final Observable<T> retry(Func2<Integer, Throwable, Boolean> predicate) {
return nest().lift(new OperatorRetryWithPredicate<T>(predicate));
}
  • 我们看一下设置重试的回调的详细回调代码retryPolicy(maxRetrysNext, false)方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
return new Func2<Integer, Throwable, Boolean>() {
// 只有返回为true的时候才会retry
@Override
public Boolean call(Integer tryCount, Throwable e) {
// 抛出的异常是AbortExecutionException则不重试
if (e instanceof AbortExecutionException) {
return false;
}

// 超过最大重试次数则不重试
if (tryCount > maxRetrys) {
return false;
}

if (e.getCause() != null && e instanceof RuntimeException) {
e = e.getCause();
}
// 判断是否是可以重试的exception
return retryHandler.isRetriableException(e, same);
}
};
}
  • 这个判断是否是可以重试的exception里面的逻辑是:
    • 如果已经配置了ribbon.okToRetryOnAllErrorstrue,则不论什么异常都会重试,我们没有这么配置,一般也不会这么配置
    • 其他情况,就是连接失败的判断。首先需要配置ribbon.okToRetryOnConnectErrorstrue,这个默认就是true;然后通过isConnectionException判断
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public boolean isRetriableException(Throwable e, boolean sameServer) {
if (okToRetryOnAllErrors) {
return true;
}
else if (e instanceof ClientException) {
ClientException ce = (ClientException) e;
if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
return !sameServer;
} else {
return false;
}
}
else {
return okToRetryOnConnectErrors && isConnectionException(e);
}
}
  • isConnectionException(e)这个方法其实就看这个异常以及Cause中是否有SocketException,如果有则返回true
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// SocketException异常
protected List<Class<? extends Throwable>> connectionRelated = Lists.<Class<? extends Throwable>>newArrayList(SocketException.class);

public boolean isConnectionException(Throwable e) {
return Utils.isPresentAsCause(e, connectionRelated);
}

public class Utils {
public static boolean isPresentAsCause(Throwable throwableToSearchIn, Collection<Class<? extends Throwable>> throwableToSearchFor) {
// 循环10次 ?
int infiniteLoopPreventionCounter = 10;
while (throwableToSearchIn != null && infiniteLoopPreventionCounter > 0) {
infiniteLoopPreventionCounter--;
for (Class<? extends Throwable> c: throwableToSearchFor) {
if (c.isAssignableFrom(throwableToSearchIn.getClass())) {
return true;
}
}
throwableToSearchIn = throwableToSearchIn.getCause();
}
return false;
}
}

总结

  • Ribbon的重试是在服务连接的时候报SocketException进行重试,具体逻辑是看这个异常以及Cause中是否有SocketException

参考