Recap

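  • From the previous chapters we know that Ribbon's main job is to balance load (work tasks) by spreading it across multiple operating units. So what does implementing load balancing actually involve?

    • 1. First, maintain the set of available service instances, handling servers that are added on the fly or become unavailable
    • 2. With the instances in hand, choose one based on the request and some load-balancing rule
    • 3. Then execute the request and handle the response
    • 4. Decide whether to retry if the call fails
  • Below we follow these steps through the Ribbon source code to see how each one is implemented. Source version: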

```xml
<!-- https://mvnrepository.com/artifact/com.netflix.ribbon/ribbon-core -->
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon-core</artifactId>
    <version>2.2.4</version>
    <scope>runtime</scope>
</dependency>
```
  • The analysis below picks up where the previous chapter left off

Analysis

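1. First, maintain the set of available service instances, handling servers that are added on the fly or become unavailable

1.1 Defining the service instance: Server
1.2 Defining the server list: ServerList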

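2. With the instances in hand, choose one based on the request and some load-balancing rule

2.1 ILoadBalancer, the interface responsible for choosing a Server

2.2 Load-balancing strategy: IRule
  • Ribbon defines its load-balancing strategies through IRule. We have already met RoundRobinRule (linear round robin); below we go through the IRule implementations.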
```java
/**
 * Interface that defines a "Rule" for a LoadBalancer. A Rule can be thought of
 * as a Strategy for loadbalacing. Well known loadbalancing strategies include
 * Round Robin, Response Time based etc.
 *
 * @author stonse
 *
 */
public interface IRule{
    /*
     * choose one alive server from lb.allServers or
     * lb.upServers according to key
     *
     * @return choosen Server object. NULL is returned if none
     *  server is available
     */

    public Server choose(Object key);

    public void setLoadBalancer(ILoadBalancer lb);

    public ILoadBalancer getLoadBalancer();
}
```
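  • To make the contract concrete, here is a minimal, self-contained sketch. SimpleServer, SimpleLoadBalancer, SimpleRule, StaticLoadBalancer and the two rules are hypothetical stand-ins, not Ribbon classes. They mirror the shape of IRule.choose(), which returns null when nothing is available. The round-robin sketch reuses the compareAndSet/modulo counter from RoundRobinRule, whose source appears later in this section. The random sketch draws its index from the size of the list it actually indexes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for Ribbon's Server: an address plus an "alive" flag.
final class SimpleServer {
    final String hostPort;
    volatile boolean alive = true;

    SimpleServer(String hostPort) {
        this.hostPort = hostPort;
    }
}

// Hypothetical stand-in for the two ILoadBalancer methods the rules use.
interface SimpleLoadBalancer {
    List<SimpleServer> getReachableServers();
    List<SimpleServer> getAllServers();
}

// Same contract as IRule.choose(): return a server, or null if none is available.
interface SimpleRule {
    SimpleServer choose(SimpleLoadBalancer lb, Object key);
}

// Fixed server list; "reachable" means the alive flag is set.
final class StaticLoadBalancer implements SimpleLoadBalancer {
    private final List<SimpleServer> servers;

    StaticLoadBalancer(List<SimpleServer> servers) {
        this.servers = servers;
    }

    @Override
    public List<SimpleServer> getAllServers() {
        return servers;
    }

    @Override
    public List<SimpleServer> getReachableServers() {
        List<SimpleServer> up = new ArrayList<>();
        for (SimpleServer s : servers) {
            if (s.alive) {
                up.add(s);
            }
        }
        return up;
    }
}

// Round robin with a lock-free counter, as in RoundRobinRule.incrementAndGetModulo().
final class SimpleRoundRobinRule implements SimpleRule {
    private final AtomicInteger counter = new AtomicInteger(0);

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    @Override
    public SimpleServer choose(SimpleLoadBalancer lb, Object key) {
        List<SimpleServer> all = lb.getAllServers();
        // At most 10 tries, mirroring the limit in RoundRobinRule.
        for (int tries = 0; tries < 10 && !all.isEmpty(); tries++) {
            SimpleServer candidate = all.get(incrementAndGetModulo(all.size()));
            if (candidate.alive) {
                return candidate;
            }
        }
        return null;
    }
}

// Random choice; the index is bounded by the size of the list it is used on.
final class SimpleRandomRule implements SimpleRule {
    private final Random rand;

    SimpleRandomRule(long seed) {
        this.rand = new Random(seed);
    }

    @Override
    public SimpleServer choose(SimpleLoadBalancer lb, Object key) {
        List<SimpleServer> up = lb.getReachableServers();
        return up.isEmpty() ? null : up.get(rand.nextInt(up.size()));
    }
}
```

With servers a, b and c, the round-robin sketch returns b, c, a in that order. This is because the counter starts at 0 and is incremented before use. If a server's alive flag is false, the rule skips it and moves on to the next index.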
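  • Implementations

    • AbstractLoadBalancerRule

      • The abstract base class for load-balancing rules. It mainly holds the load balancer (ILoadBalancer) object, which was introduced in the previous chapter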
        ```java
        /**
         * Class that provides a default implementation for setting and getting load balancer
         * @author stonse
         *
         */
        public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware {

            private ILoadBalancer lb;

            @Override
            public void setLoadBalancer(ILoadBalancer lb){
                this.lb = lb;
            }

            @Override
            public ILoadBalancer getLoadBalancer(){
                return lb;
            }
        }
        ```
    • RandomRule

      • Random selection: uses rand.nextInt(serverCount) as the index into the instance list
        ```java
        @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE")
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                return null;
            }
            Server server = null;

            while (server == null) {
                if (Thread.interrupted()) {
                    return null;
                }
                List<Server> upList = lb.getReachableServers();
                List<Server> allList = lb.getAllServers();

                int serverCount = allList.size();
                if (serverCount == 0) {
                    /*
                     * No servers. End regardless of pass, because subsequent passes
                     * only get more restrictive.
                     */
                    return null;
                }

                // rand is a java.util.Random field of RandomRule.
                // Note: the index is bounded by allList.size() but used on upList; when some
                // servers are down (upList is shorter), upList.get(index) can throw
                // IndexOutOfBoundsException.
                int index = rand.nextInt(serverCount);
                server = upList.get(index);

                if (server == null) {
                    /*
                     * The only time this should happen is if the server list were
                     * somehow trimmed. This is a transient condition. Retry after
                     * yielding.
                     */
                    Thread.yield();
                    continue;
                }

                if (server.isAlive()) {
                    return (server);
                }

                // Shouldn't actually happen.. but must be transient or a bug.
                server = null;
                Thread.yield();
            }

            return server;

        }

        @Override
        public Server choose(Object key) {
            return choose(getLoadBalancer(), key);
        }
        ```
    • RoundRobinRule

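      • Linear round robin: an AtomicInteger, nextServerCyclicCounter, is incremented by 1 to get the index of the next instance, and the result is taken modulo the server count so it wraps around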
        ```java
        public Server choose(ILoadBalancer lb, Object key) {
            if (lb == null) {
                log.warn("no load balancer");
                return null;
            }

            Server server = null;
            int count = 0;
            while (server == null && count++ < 10) {
                List<Server> reachableServers = lb.getReachableServers();
                List<Server> allServers = lb.getAllServers();
                int upCount = reachableServers.size();
                int serverCount = allServers.size();

                if ((upCount == 0) || (serverCount == 0)) {
                    log.warn("No up servers available from load balancer: " + lb);
                    return null;
                }

                int nextServerIndex = incrementAndGetModulo(serverCount);
                server = allServers.get(nextServerIndex);

                if (server == null) {
                    /* Transient. */
                    Thread.yield();
                    continue;
                }

                if (server.isAlive() && (server.isReadyToServe())) {
                    return (server);
                }

                // Next.
                server = null;
            }

            if (count >= 10) {
                log.warn("No available alive servers after 10 tries from load balancer: "
                        + lb);
            }
            return server;
        }

        /**
         * Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
         *
         * @param modulo The modulo to bound the value of the counter.
         * @return The next value.
         */
        private int incrementAndGetModulo(int modulo) {
            for (;;) {
                int current = nextServerCyclicCounter.get();
                int next = (current + 1) % modulo;
                if (nextServerCyclicCounter.compareAndSet(current, next))
                    return next;
            }
        }
        ```
    • RetryRule

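      • Retry strategy: delegates to an inner subRule (RoundRobinRule by default) and keeps retrying until it gets a live server or the deadline of maxRetryMillis passes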
        ```java
        public Server choose(ILoadBalancer lb, Object key) {
            long requestTime = System.currentTimeMillis();
            long deadline = requestTime + maxRetryMillis;

            Server answer = null;

            answer = subRule.choose(key);

            if (((answer == null) || (!answer.isAlive()))
                    && (System.currentTimeMillis() < deadline)) {

                // InterruptTask interrupts the current thread when the remaining time runs out,
                // which makes the Thread.interrupted() check below end the loop
                InterruptTask task = new InterruptTask(deadline
                        - System.currentTimeMillis());

                while (!Thread.interrupted()) {
                    answer = subRule.choose(key);

                    if (((answer == null) || (!answer.isAlive()))
                            && (System.currentTimeMillis() < deadline)) {
                        /* pause and retry hoping it's transient */
                        Thread.yield();
                    } else {
                        break;
                    }
                }

                task.cancel();
            }

            if ((answer == null) || (!answer.isAlive())) {
                return null;
            } else {
                return answer;
            }
        }
        ```
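    • WeightedResponseTimeRule

      • An extension of RoundRobinRule that picks instances by weight, where the weights are derived from each server's average response time (faster servers get larger weights)
    • BestAvailableRule
      • Picks the least busy instance, i.e. the one with the fewest concurrent requests, skipping servers whose circuit breaker is tripped
    • ZoneAvoidanceRule
      • Chooses a server by jointly evaluating the performance of the zone it belongs to and the server's own availability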
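
  • The one-line description of WeightedResponseTimeRule can be illustrated with a sketch. This is a simplification of the idea, not Ribbon's code. It assumes each server's weight is the sum of all average response times minus that server's own average, so faster servers get larger weights. A random number drawn from [0, total weight) is then matched against the cumulative weights. WeightedPicker and its methods are hypothetical.

```java
import java.util.Random;

// Hypothetical sketch of response-time weighting (a simplification, not Ribbon's code).
final class WeightedPicker {

    // weight_i = (sum of all average response times) - avgResponseMillis[i];
    // cumulative[i] = weight_0 + ... + weight_i.
    static double[] cumulativeWeights(double[] avgResponseMillis) {
        double total = 0;
        for (double t : avgResponseMillis) {
            total += t;
        }
        double[] cumulative = new double[avgResponseMillis.length];
        double soFar = 0;
        for (int i = 0; i < avgResponseMillis.length; i++) {
            soFar += total - avgResponseMillis[i];
            cumulative[i] = soFar;
        }
        return cumulative;
    }

    // Returns the first index whose cumulative weight is >= r.
    static int pick(double[] cumulative, double r) {
        for (int i = 0; i < cumulative.length; i++) {
            if (cumulative[i] >= r) {
                return i;
            }
        }
        return cumulative.length - 1;
    }

    // Draws r uniformly from [0, total weight) and maps it to a server index.
    static int pick(double[] cumulative, Random rand) {
        double maxWeight = cumulative[cumulative.length - 1];
        return pick(cumulative, rand.nextDouble() * maxWeight);
    }
}
```

Take average response times of 10, 30 and 60 ms. The weights come out as 90, 70 and 40, so the fastest server is picked about 45% of the time and the slowest about 20%.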

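3. Then execute the request and handle the response

  • After the logic above we have a concrete Server, so the next step is to execute the request. In Ribbon this is defined by IClient, which has a single execute() method
    • The interface's two type parameters correspond to the request object (ClientRequest) and the response object (IResponse)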
```java
/**
 * A client that can execute a single request.
 *
 * @author awang
 *
 */
public interface IClient<S extends ClientRequest, T extends IResponse> {

    /**
     * Execute the request and return the response. It is expected that there is no retry and all exceptions
     * are thrown directly.
     */
    public T execute(S request, IClientConfig requestConfig) throws Exception;
}
```
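  • [Figure: class hierarchy]
3.1 The request-executing client: AbstractLoadBalancerAwareClient
  • This class is an abstract implementation of IClient. It executes requests and also handles retries and exceptions. The code is written in RxJava style, so it may look unfamiliar. Below, it builds a LoadBalancerCommand and passes in a ServerOperation object
    • ServerOperation is an interface, implemented here as an anonymous class; its call() method is what actually executes the request
    • LoadBalancerCommand's submit() method is the orchestrating entry point. Besides invoking the ServerOperation's call() method, it also handles retries and exceptions, a design worth learning from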
```java
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    // Get the retry handler; concrete client subclasses decide which one to return
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
    // Build the LoadBalancerCommand (RxJava style)
    LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
            .withLoadBalancerContext(this)
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri())
            .build();

    try {

        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    // Replace the original (logical) URI with the chosen server's actual URI
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        // Execute the request
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    }
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }

}
```
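  • reconstructURIWithServer(), inherited from LoadBalancerContext, turns the logical URI (whose host is the service name) into one that points at the chosen server. The following is a hypothetical simplified version that uses only java.net.URI. It copies the raw, already-encoded path, query and fragment unchanged and swaps in the server's host and port. UriRewriter is not a Ribbon class, and the real method handles more cases than this sketch.

```java
import java.net.URI;

// Hypothetical simplified reconstructURIWithServer: swap the logical host for host:port.
final class UriRewriter {

    static URI reconstruct(String host, int port, URI original) {
        // Default to http when the original URI has no scheme (an assumption of this sketch).
        String scheme = original.getScheme() != null ? original.getScheme() : "http";
        StringBuilder sb = new StringBuilder();
        sb.append(scheme).append("://");
        if (original.getRawUserInfo() != null) {
            sb.append(original.getRawUserInfo()).append('@');
        }
        sb.append(host);
        if (port >= 0) {
            sb.append(':').append(port);
        }
        // Raw components are already percent-encoded, so they are copied unchanged.
        if (original.getRawPath() != null) {
            sb.append(original.getRawPath());
        }
        if (original.getRawQuery() != null) {
            sb.append('?').append(original.getRawQuery());
        }
        if (original.getRawFragment() != null) {
            sb.append('#').append(original.getRawFragment());
        }
        return URI.create(sb.toString());
    }
}
```

Copying the raw components rather than the decoded getPath()/getQuery() values avoids double-encoding characters such as %20 when the string is parsed again.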
  • Look at the line Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));. Stepping into org.springframework.cloud.netflix.ribbon.apache.RibbonLoadBalancingHttpClient#execute() (Spring Cloud Netflix's implementation) shows that the request is actually sent by a CloseableHttpClient
```java
public RibbonApacheHttpResponse execute(RibbonApacheHttpRequest request, IClientConfig configOverride) throws Exception {
    Builder builder = RequestConfig.custom();
    IClientConfig config = configOverride != null ? configOverride : this.config;
    // Connect timeout
    builder.setConnectTimeout(((Integer)config.get(CommonClientConfigKey.ConnectTimeout, this.connectTimeout)).intValue());
    // Read (socket) timeout
    builder.setSocketTimeout(((Integer)config.get(CommonClientConfigKey.ReadTimeout, this.readTimeout)).intValue());
    // Whether to follow redirects
    builder.setRedirectsEnabled(((Boolean)config.get(CommonClientConfigKey.FollowRedirects, this.followRedirects)).booleanValue());
    RequestConfig requestConfig = builder.build();
    request = this.getSecureRequest(request, configOverride);
    HttpUriRequest httpUriRequest = request.toRequest(requestConfig);
    HttpResponse httpResponse = ((CloseableHttpClient)this.delegate).execute(httpUriRequest);
    return new RibbonApacheHttpResponse(httpResponse, httpUriRequest.getURI());
}
```
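  • Stepping through in a debugger shows that at this point the concrete request, including its parameters and the full URL, has already been assembled
3.2 The execution command: LoadBalancerCommand
  • The LoadBalancerCommand built above is RxJava-style. As the code below shows, it obtains a Server through ILoadBalancer (the interface responsible for choosing a Server) and then executes the request. Along the way it applies retry and exception handling and records execution statistics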

```java
// Returns an Observable that emits exactly one Server, fetched from the load balancer on each subscription
private Observable<Server> selectServer() {
    return Observable.create(new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> next) {
            try {
                Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                next.onNext(server);
                next.onCompleted();
            } catch (Exception e) {
                next.onError(e);
            }
        }
    });
}

public Observable<T> submit(final ServerOperation<T> operation) {
    final ExecutionInfoContext context = new ExecutionInfoContext();

    if (listenerInvoker != null) {
        try {
            listenerInvoker.onExecutionStart();
        } catch (AbortExecutionException e) {
            return Observable.error(e);
        }
    }

    // Maximum number of retries on the same server
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    // Maximum number of retries on other servers (how many further servers to try)
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

    // Call logic for each server instance.
    // The server field is null by default, so selectServer() is used to obtain a Server
    Observable<T> o =
            (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                // Map each Server, in order, to a request call that includes same-server retry logic
                public Observable<T> call(Server server) {
                    // Record the server in the execution context
                    context.setServer(server);
                    final ServerStats stats = loadBalancerContext.getServerStats(server);

                    // Request call for this Server, including retry logic
                    Observable<T> o = Observable
                            .just(server)
                            .concatMap(new Func1<Server, Observable<T>>() {
                                @Override
                                public Observable<T> call(final Server server) {
                                    context.incAttemptCount();
                                    // Increment the number of requests this Server is currently handling
                                    loadBalancerContext.noteOpenConnection(stats);

                                    // Listener
                                    if (listenerInvoker != null) {
                                        try {
                                            listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                        } catch (AbortExecutionException e) {
                                            return Observable.error(e);
                                        }
                                    }

                                    // Timer
                                    final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                    // operation.call(server) is the ServerOperation passed in by
                                    // AbstractLoadBalancerAwareClient (analyzed above); it sends the request to this Server.
                                    // doOnEach records data before and after the request for load-balancing statistics
                                    return operation.call(server).doOnEach(new Observer<T>() {
                                        private T entity;
                                        @Override
                                        public void onCompleted() {
                                            // Record that the request completed
                                            recordStats(tracer, stats, entity, null);
                                        }

                                        @Override
                                        public void onError(Throwable e) {
                                            // Record the end of the (failed) request
                                            recordStats(tracer, stats, null, e);
                                            logger.debug("Got error {} when executed on server {}", e, server);
                                            // An error occurred: notify the listener
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                            }
                                        }

                                        @Override
                                        public void onNext(T entity) {
                                            // A successful call produces exactly one result (one request), so entity
                                            // is the response; receiving it means the request succeeded
                                            this.entity = entity;
                                            if (listenerInvoker != null) {
                                                listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                            }
                                        }

                                        private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                            tracer.stop();
                                            loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                        }
                                    });
                                }
                            });

                    if (maxRetrysSame > 0)
                        // Retry on the same server
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });

    if (maxRetrysNext > 0 && server == null)
        // Retry on the next server: resubscribing calls selectServer() again, which returns the next Server
        o = o.retry(retryPolicy(maxRetrysNext, false));

    // Error handling
    return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
        @Override
        public Observable<T> call(Throwable e) {
            if (context.getAttemptCount() > 0) {
                // If a retry limit was exceeded, wrap the error in a ClientException
                if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                            "Number of retries on next server exceeded max " + maxRetrysNext
                            + " retries, while making a call for: " + context.getServer(), e);
                }
                else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                    e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                            "Number of retries exceeded max " + maxRetrysSame
                            + " retries, while making a call for: " + context.getServer(), e);
                }
            }
            if (listenerInvoker != null) {
                listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
            }
            return Observable.error(e);
        }
    });
}
```
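  • Without RxJava, the retry structure of submit() amounts to two nested loops. The outer loop selects up to maxRetrysNext + 1 servers, and the inner loop tries each server up to maxRetrysSame + 1 times. The synchronous sketch below is hypothetical: RetryingCommand is not a Ribbon class. It also reduces the retry handler to a single predicate, whereas Ribbon asks the handler separately whether an exception is retriable on the same server and on the next server.

```java
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical synchronous sketch of the retry structure in LoadBalancerCommand.submit().
final class RetryingCommand<T> {
    private final int maxRetriesSame;                     // cf. maxRetrysSame
    private final int maxRetriesNext;                     // cf. maxRetrysNext
    private final Supplier<String> selectServer;          // cf. selectServer()
    private final Predicate<RuntimeException> retriable;  // simplified retry handler
    int totalAttempts;       // every invocation of the operation
    int serverAttemptCount;  // every server selection

    RetryingCommand(int maxRetriesSame, int maxRetriesNext,
                    Supplier<String> selectServer, Predicate<RuntimeException> retriable) {
        this.maxRetriesSame = maxRetriesSame;
        this.maxRetriesNext = maxRetriesNext;
        this.selectServer = selectServer;
        this.retriable = retriable;
    }

    T submit(Function<String, T> operation) {
        RuntimeException last = null;
        // Outer loop: the first server plus up to maxRetriesNext further servers.
        for (int s = 0; s <= maxRetriesNext; s++) {
            String server = selectServer.get();
            serverAttemptCount++;
            // Inner loop: the first call plus up to maxRetriesSame retries on this server.
            for (int a = 0; a <= maxRetriesSame; a++) {
                totalAttempts++;
                try {
                    return operation.apply(server);
                } catch (RuntimeException e) {
                    last = e;
                    if (!retriable.test(e)) {
                        throw e;  // not retriable: fail immediately
                    }
                }
            }
        }
        throw new IllegalStateException("Retries exhausted: " + totalAttempts
                + " attempts on " + serverAttemptCount + " servers", last);
    }
}
```

As an example, set maxRetriesSame = 1 and maxRetriesNext = 1, and suppose the first server is down. The sketch tries the request twice on that server and then succeeds on the second one, for three attempts in total.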

4. Should a failed call be retried?

  • Covered in the next chapter

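Summary

  • Over these chapters we have seen that Ribbon is mainly made up of the following components:
    • ServerList: the interface that maintains the list of service instances
    • ILoadBalancer: the interface responsible for choosing a Server
    • ServerListUpdater: the interface that keeps the instance list up to date
    • ServerListFilter: the mechanism that filters the instance list
    • IRule: the interface that implements the load-balancing selection rule
    • IClient: the interface implemented by Ribbon's clients (such as AbstractLoadBalancerAwareClient) to execute requests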
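
  • The sketch below shows how these pieces fit together. All names in it are hypothetical miniatures of the interfaces listed above, not Ribbon's real signatures. The load balancer pulls servers from a server list, runs them through a filter, publishes an immutable snapshot, and delegates the choice to a rule. An optional scheduled task plays the role of ServerListUpdater.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical miniatures of the components listed above (not Ribbon's real signatures).
interface MiniServerList {            // cf. ServerList
    List<String> getUpdatedListOfServers();
}

interface MiniServerListFilter {      // cf. ServerListFilter
    List<String> filter(List<String> servers);
}

interface MiniRule {                  // cf. IRule
    String choose(List<String> servers);
}

final class MiniLoadBalancer {        // cf. ILoadBalancer
    private final MiniServerList serverList;
    private final MiniServerListFilter filter;
    private final MiniRule rule;
    // Replaced wholesale on every update, so readers always see a complete list.
    private volatile List<String> servers = Collections.emptyList();

    MiniLoadBalancer(MiniServerList serverList, MiniServerListFilter filter, MiniRule rule) {
        this.serverList = serverList;
        this.filter = filter;
        this.rule = rule;
    }

    // Fetch, filter, then publish an immutable snapshot.
    void updateListOfServers() {
        List<String> filtered = filter.filter(serverList.getUpdatedListOfServers());
        servers = Collections.unmodifiableList(new ArrayList<>(filtered));
    }

    List<String> getServers() {
        return servers;
    }

    // Delegates the choice to the rule; returns null while no server is known.
    String chooseServer() {
        List<String> snapshot = servers;
        return snapshot.isEmpty() ? null : rule.choose(snapshot);
    }

    // Plays the role of ServerListUpdater: refresh periodically on a background thread.
    ScheduledExecutorService startUpdater(long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(this::updateListOfServers, 0, periodMillis, TimeUnit.MILLISECONDS);
        return executor;
    }
}
```

A refresh replaces the whole list through a volatile field instead of mutating it in place. As a result, chooseServer() always reads a consistent snapshot without locking, even while the updater thread swaps in a new list.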
