前言

  • 当我们为Spring Cloud Zuul构建的API网关服务引入Spring Cloud Consul之后,它会为Consul中的每个服务都自动创建一个默认路由规则,这些默认规则的path会使用ServiceId配置的服务名作为请求前缀
  • 比如上图我们在consul注册了cloudZuulcloudSsocloudUser三个服务,那么就可以通过服务名作为请求前缀将请求转发到不同服务上,这里我们cloudZuul服务地址为http://127.0.0.1:2000

    • 访问cloudUser服务接口,举例:http://127.0.0.1:2000/cloudUser/user/getById/1
    • 访问cloudSso服务接口,举例:http://127.0.0.1:2000/cloudSso/sso/createToken
  • 上面这个服务路由转发是怎么实现的呢,通过上面几章我们可以知道zuul内部工作是通过一系列的ZuulFilter来实现的,通过调试断点可以定位到这个RibbonRoutingFilter,这个ZuulFilterfilterTyperoute

解析

  • 查看RibbonRoutingFilter.java,查看注释可以看到是使用了RibbonHystrix
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
/**
* Route {@link ZuulFilter} that uses Ribbon, Hystrix and pluggable http clients to send requests.
* ServiceIds are found in the {@link RequestContext} attribute {@link org.springframework.cloud.netflix.zuul.filters.support.FilterConstants#SERVICE_ID_KEY}.
*
* @author Spencer Gibb
* @author Dave Syer
* @author Ryan Baxter
*/
public class RibbonRoutingFilter extends ZuulFilter {

private static final Log log = LogFactory.getLog(RibbonRoutingFilter.class);

protected ProxyRequestHelper helper;
protected RibbonCommandFactory<?> ribbonCommandFactory;
protected List<RibbonRequestCustomizer> requestCustomizers;
private boolean useServlet31 = true;

public RibbonRoutingFilter(ProxyRequestHelper helper,
RibbonCommandFactory<?> ribbonCommandFactory,
List<RibbonRequestCustomizer> requestCustomizers) {
this.helper = helper;
this.ribbonCommandFactory = ribbonCommandFactory;
this.requestCustomizers = requestCustomizers;
// To support Servlet API 3.1 we need to check if getContentLengthLong exists
try {
//TODO: remove in 2.0
HttpServletRequest.class.getMethod("getContentLengthLong");
} catch(NoSuchMethodException e) {
useServlet31 = false;
}
}

public RibbonRoutingFilter(RibbonCommandFactory<?> ribbonCommandFactory) {
this(new ProxyRequestHelper(), ribbonCommandFactory, null);
}

/* for testing */ boolean isUseServlet31() {
return useServlet31;
}

@Override
public String filterType() {
return ROUTE_TYPE;
}

@Override
public int filterOrder() {
return RIBBON_ROUTING_FILTER_ORDER;
}

@Override
public boolean shouldFilter() {
RequestContext ctx = RequestContext.getCurrentContext();
return (ctx.getRouteHost() == null && ctx.get(SERVICE_ID_KEY) != null
&& ctx.sendZuulResponse());
}

@Override
public Object run() {
RequestContext context = RequestContext.getCurrentContext();
this.helper.addIgnoredHeaders();
try {
RibbonCommandContext commandContext = buildCommandContext(context);
ClientHttpResponse response = forward(commandContext);
setResponse(response);
return response;
}
catch (ZuulException ex) {
throw new ZuulRuntimeException(ex);
}
catch (Exception ex) {
throw new ZuulRuntimeException(ex);
}
}

protected RibbonCommandContext buildCommandContext(RequestContext context) {
HttpServletRequest request = context.getRequest();

MultiValueMap<String, String> headers = this.helper
.buildZuulRequestHeaders(request);
MultiValueMap<String, String> params = this.helper
.buildZuulRequestQueryParams(request);
String verb = getVerb(request);
InputStream requestEntity = getRequestBody(request);
if (request.getContentLength() < 0 && !verb.equalsIgnoreCase("GET")) {
context.setChunkedRequestBody();
}

String serviceId = (String) context.get(SERVICE_ID_KEY);
Boolean retryable = (Boolean) context.get(RETRYABLE_KEY);
Object loadBalancerKey = context.get(LOAD_BALANCER_KEY);

String uri = this.helper.buildZuulRequestURI(request);

// remove double slashes
uri = uri.replace("//", "/");

long contentLength = useServlet31 ? request.getContentLengthLong(): request.getContentLength();

return new RibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
}

protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());

RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
ClientHttpResponse response = command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}

}

protected ClientHttpResponse handleException(Map<String, Object> info,
HystrixRuntimeException ex) throws ZuulException {
int statusCode = HttpStatus.INTERNAL_SERVER_ERROR.value();
Throwable cause = ex;
String message = ex.getFailureType().toString();

ClientException clientException = findClientException(ex);
if (clientException == null) {
clientException = findClientException(ex.getFallbackException());
}

if (clientException != null) {
if (clientException
.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) {
statusCode = HttpStatus.SERVICE_UNAVAILABLE.value();
}
cause = clientException;
message = clientException.getErrorType().toString();
}
info.put("status", String.valueOf(statusCode));
throw new ZuulException(cause, "Forwarding error", statusCode, message);
}

protected ClientException findClientException(Throwable t) {
if (t == null) {
return null;
}
if (t instanceof ClientException) {
return (ClientException) t;
}
return findClientException(t.getCause());
}

protected InputStream getRequestBody(HttpServletRequest request) {
InputStream requestEntity = null;
try {
requestEntity = (InputStream) RequestContext.getCurrentContext()
.get(REQUEST_ENTITY_KEY);
if (requestEntity == null) {
requestEntity = request.getInputStream();
}
}
catch (IOException ex) {
log.error("Error during getRequestBody", ex);
}
return requestEntity;
}

protected String getVerb(HttpServletRequest request) {
String method = request.getMethod();
if (method == null) {
return "GET";
}
return method;
}

protected void setResponse(ClientHttpResponse resp)
throws ClientException, IOException {
RequestContext.getCurrentContext().set("zuulResponse", resp);
this.helper.setResponse(resp.getRawStatusCode(),
resp.getBody() == null ? null : resp.getBody(), resp.getHeaders());
}

}
  • 我们关注run()方法,可以发现可以划分为三个步骤
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Override
public Object run() {
// 获取RequestContext
RequestContext context = RequestContext.getCurrentContext();
this.helper.addIgnoredHeaders();
try {
// 构建执行Ribbon命令的参数
RibbonCommandContext commandContext = buildCommandContext(context);
// 执行请求(核心)
ClientHttpResponse response = forward(commandContext);
// 设置返回体
setResponse(response);
return response;
}
catch (ZuulException ex) {
throw new ZuulRuntimeException(ex);
}
catch (Exception ex) {
throw new ZuulRuntimeException(ex);
}
}
1、构建执行Ribbon命令的参数
  • 下图是http://127.0.0.1:2000/cloudSso/sso/createToken链接构造的RibbonCommandContext,可以看到获取到了serviceId、请求链接及方式、请求头的一些信息
  • buildCommandContext()方法就是通过RequestContext来构造RibbonCommandContextRequestContext通过之前的ZuulFilter已经设置了一些属性,那么buildCommandContext()方法就可以很方便的获取一些参数信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
protected RibbonCommandContext buildCommandContext(RequestContext context) {
HttpServletRequest request = context.getRequest();

MultiValueMap<String, String> headers = this.helper
.buildZuulRequestHeaders(request);
MultiValueMap<String, String> params = this.helper
.buildZuulRequestQueryParams(request);
String verb = getVerb(request);
InputStream requestEntity = getRequestBody(request);
if (request.getContentLength() < 0 && !verb.equalsIgnoreCase("GET")) {
context.setChunkedRequestBody();
}
// 获取serviceId
String serviceId = (String) context.get(SERVICE_ID_KEY);
// 是否重试
Boolean retryable = (Boolean) context.get(RETRYABLE_KEY);
// 负载均衡key
Object loadBalancerKey = context.get(LOAD_BALANCER_KEY);

String uri = this.helper.buildZuulRequestURI(request);

// remove double slashes
uri = uri.replace("//", "/");

long contentLength = useServlet31 ? request.getContentLengthLong(): request.getContentLength();

return new RibbonCommandContext(serviceId, verb, uri, retryable, headers, params,
requestEntity, this.requestCustomizers, contentLength, loadBalancerKey);
}
2、执行请求(核心)
  • forward()方法是执行的主体逻辑,可以看到主要逻辑是构建RibbonCommand然后执行command.execute()方法,由RibbonCommand完成HTTP请求的发送并的得到响应结果ClientHttpResponse
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected ClientHttpResponse forward(RibbonCommandContext context) throws Exception {
Map<String, Object> info = this.helper.debug(context.getMethod(),
context.getUri(), context.getHeaders(), context.getParams(),
context.getRequestEntity());
RibbonCommand command = this.ribbonCommandFactory.create(context);
try {
ClientHttpResponse response = command.execute();
this.helper.appendDebug(info, response.getRawStatusCode(), response.getHeaders());
return response;
}
catch (HystrixRuntimeException ex) {
return handleException(info, ex);
}

}
  • 进入command.execute(); 方法,可以看到注释这个方法是同步执行方法,但里面又调用了queue().get()方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* Used for synchronous execution of command.
*
* @return R
* Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a failure occurs and a fallback cannot be retrieved
* @throws HystrixBadRequestException
* if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public R execute() {
try {
return queue().get();
} catch (Exception e) {
throw Exceptions.sneakyThrow(decomposeException(e));
}
}

/**
* Used for asynchronous execution of command.
* <p>
* This will queue up the command on the thread pool and return an {@link Future} to get the result once it completes.
* <p>
* NOTE: If configured to not run in a separate thread, this will have the same effect as {@link #execute()} and will block.
* <p>
* We don't throw an exception but just flip to synchronous execution so code doesn't need to change in order to switch a command from running on a separate thread to the calling thread.
*
* @return {@code Future<R>} Result of {@link #run()} execution or a fallback from {@link #getFallback()} if the command fails for any reason.
* @throws HystrixRuntimeException
* if a fallback does not exist
* <p>
* <ul>
* <li>via {@code Future.get()} in {@link ExecutionException#getCause()} if a failure occurs</li>
* <li>or immediately if the command can not be queued (such as short-circuited, thread-pool/semaphore rejected)</li>
* </ul>
* @throws HystrixBadRequestException
* via {@code Future.get()} in {@link ExecutionException#getCause()} if invalid arguments or state were used representing a user failure, not a system failure
* @throws IllegalStateException
* if invoked more than once
*/
public Future<R> queue() {
/*
* The Future returned by Observable.toBlocking().toFuture() does not implement the
* interruption of the execution thread when the "mayInterrupt" flag of Future.cancel(boolean) is set to true;
* thus, to comply with the contract of Future, we must wrap around it.
*/
final Future<R> delegate = toObservable().toBlocking().toFuture();

final Future<R> f = new Future<R>() {

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (delegate.isCancelled()) {
return false;
}

if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
/*
* The only valid transition here is false -> true. If there are two futures, say f1 and f2, created by this command
* (which is super-weird, but has never been prohibited), and calls to f1.cancel(true) and to f2.cancel(false) are
* issued by different threads, it's unclear about what value would be used by the time mayInterruptOnCancel is checked.
* The most consistent way to deal with this scenario is to say that if *any* cancellation is invoked with interruption,
* than that interruption request cannot be taken back.
*/
interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
}

final boolean res = delegate.cancel(interruptOnFutureCancel.get());

if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
final Thread t = executionThread.get();
if (t != null && !t.equals(Thread.currentThread())) {
t.interrupt();
}
}

return res;
}

@Override
public boolean isCancelled() {
return delegate.isCancelled();
}

@Override
public boolean isDone() {
return delegate.isDone();
}

@Override
public R get() throws InterruptedException, ExecutionException {
return delegate.get();
}

@Override
public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}

};

// 特殊处理立即抛出的错误状态
/* special handling of error states that throw immediately */
if (f.isDone()) {
try {
f.get();
return f;
} catch (Exception e) {
Throwable t = decomposeException(e);
if (t instanceof HystrixBadRequestException) {
return f;
} else if (t instanceof HystrixRuntimeException) {
HystrixRuntimeException hre = (HystrixRuntimeException) t;
switch (hre.getFailureType()) {
case COMMAND_EXCEPTION:
case TIMEOUT:
// we don't throw these types from queue() only from queue().get() as they are execution errors
return f;
default:
// these are errors we throw from queue() as they as rejection type errors
throw hre;
}
} else {
throw Exceptions.sneakyThrow(t);
}
}
}

return f;
}
  • 这个queue()方法

    • 查看注释可以知道这里采用了异步执行命令,这将在线程池上排队命令并返回{@link Future}以在结果完成后获得结果。
    • 可以看到final Future<R> delegate = toObservable().toBlocking().toFuture();,这里使用了Future来执行处理,这个在多线程中可以经常看见,使用这个类可以得到线程的执行结果、什么Observable 观察者模式、toBlocking 队列形式
    • toObservable()方法,这里使用了ReactiveX,这块感觉比较复杂,加个TODO
  • ReactiveX这里简单百度了下

    • RxJavaReactiveXJVM上的一个实现,ReactiveX使用Observable序列组合异步和基于事件的程序。
    • 相关链接:官网 中文文档
  • 经过上面异步和基于事件的处理,我们可以来到下面这个方法

  • 进入com.netflix.loadbalancer.reactive.LoadBalancerCommand#selectServer()方法,这个方法是根据负载均衡器返回一个Server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* Return an Observable that either emits only the single requested server
* or queries the load balancer for the next server on each subscription
*/
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
@Override
public void call(Subscriber<? super Server> next) {
try {
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
  • 进入com.netflix.loadbalancer.LoadBalancerContext#getServerFromLoadBalancer()方法,可以看到主要逻辑是从请求中的部分URI计算最终URI,下面有好几种情况
    • 如果主机丢失并且有负载均衡器,那么是从负载均衡器中选择的服务器获取主机/端口
    • 如果主机丢失且没有负载均衡器,请尝试从客户端设置的虚拟地址派生主机/端口
    • 如果主机存在并且URI的权限部分是为客户端设置的虚拟地址,并且存在负载均衡器,则从负载均衡器中选择的服务器获取主机/端口
    • 如果主机存在但上述情况均不适用,则将主机解释为实际物理地址
    • 如果主机丢失但以上都不适用,则抛出ClientException
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
/**
* Compute the final URI from a partial URI in the request. The following steps are performed:
* <ul>
* <li> if host is missing and there is a load balancer, get the host/port from server chosen from load balancer
* <li> if host is missing and there is no load balancer, try to derive host/port from virtual address set with the client
* <li> if host is present and the authority part of the URI is a virtual address set for the client,
* and there is a load balancer, get the host/port from server chosen from load balancer
* <li> if host is present but none of the above applies, interpret the host as the actual physical address
* <li> if host is missing but none of the above applies, throws ClientException
* </ul>
*
* @param original Original URI passed from caller
*/
public Server getServerFromLoadBalancer(@Nullable URI original, @Nullable Object loadBalancerKey) throws ClientException {
String host = null;
int port = -1;
if (original != null) {
host = original.getHost();
}
if (original != null) {
Pair<String, Integer> schemeAndPort = deriveSchemeAndPortFromPartialUri(original);
port = schemeAndPort.second();
}

// Various Supported Cases
// The loadbalancer to use and the instances it has is based on how it was registered
// In each of these cases, the client might come in using Full Url or Partial URL
ILoadBalancer lb = getLoadBalancer();
if (host == null) {
// Partial URI or no URI Case
// well we have to just get the right instances from lb - or we fall back
if (lb != null){
// 选择服务实例
Server svc = lb.chooseServer(loadBalancerKey);
if (svc == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Load balancer does not have available server for client: "
+ clientName);
}
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("{} using LB returned Server: {} for request {}", new Object[]{clientName, svc, original});
return svc;
} else {
// No Full URL - and we dont have a LoadBalancer registered to
// obtain a server
// if we have a vipAddress that came with the registration, we
// can use that else we
// bail out
if (vipAddresses != null && vipAddresses.contains(",")) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured."
+ " Also, there are multiple vipAddresses and hence no vip address can be chosen"
+ " to complete this partial uri");
} else if (vipAddresses != null) {
try {
Pair<String,Integer> hostAndPort = deriveHostAndPortFromVipAddress(vipAddresses);
host = hostAndPort.first();
port = hostAndPort.second();
} catch (URISyntaxException e) {
throw new ClientException(
ClientException.ErrorType.GENERAL,
"Method is invoked for client " + clientName + " with partial URI of ("
+ original
+ ") with no load balancer configured. "
+ " Also, the configured/registered vipAddress is unparseable (to determine host and port)");
}
} else {
throw new ClientException(
ClientException.ErrorType.GENERAL,
this.clientName
+ " has no LoadBalancer registered and passed in a partial URL request (with no host:port)."
+ " Also has no vipAddress registered");
}
}
} else {
// Full URL Case
// This could either be a vipAddress or a hostAndPort or a real DNS
// if vipAddress or hostAndPort, we just have to consult the loadbalancer
// but if it does not return a server, we should just proceed anyways
// and assume its a DNS
// For restClients registered using a vipAddress AND executing a request
// by passing in the full URL (including host and port), we should only
// consult lb IFF the URL passed is registered as vipAddress in Discovery
boolean shouldInterpretAsVip = false;

if (lb != null) {
shouldInterpretAsVip = isVipRecognized(original.getAuthority());
}
if (shouldInterpretAsVip) {
Server svc = lb.chooseServer(loadBalancerKey);
if (svc != null){
host = svc.getHost();
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,
"Invalid Server for :" + svc);
}
logger.debug("using LB returned Server: {} for request: {}", svc, original);
return svc;
} else {
// just fall back as real DNS
logger.debug("{}:{} assumed to be a valid VIP address or exists in the DNS", host, port);
}
} else {
// consult LB to obtain vipAddress backed instance given full URL
//Full URL execute request - where url!=vipAddress
logger.debug("Using full URL passed in by caller (not using load balancer): {}", original);
}
}
// end of creating final URL
if (host == null){
throw new ClientException(ClientException.ErrorType.GENERAL,"Request contains no HOST to talk to");
}
// just verify that at this point we have a full URL

return new Server(host, port);
}
  • 我们这里进入Server svc = lb.chooseServer(loadBalancerKey);方法

  • 进入com.netflix.loadbalancer.AbstractServerPredicate#chooseRoundRobinAfterFiltering()方法,我这里可以看到是有两个服务实例,那么就是需要从这两个中选择一个

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* Choose a server in a round robin fashion after the predicate filters a given list of servers and load balancer key.
*/
public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
// 从服务实例列表中筛选一下符合条件的服务
List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
if (eligible.size() == 0) {
return Optional.absent();
}
return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
}
/**
* Referenced from RoundRobinRule
* Inspired by the implementation of {@link AtomicInteger#incrementAndGet()}.
*
* @param modulo The modulo to bound the value of the counter.
* @return The next value.
*/
private int incrementAndGetModulo(int modulo) {
for (;;) {
int current = nextIndex.get();
int next = (current + 1) % modulo;
if (nextIndex.compareAndSet(current, next))
return current;
}
}
  • incrementAndGetModulo()这里默认是采用了简单轮询负载均衡(RoundRobin)策略,看注释这里是copyJDK里的{@link AtomicInteger#incrementAndGet()}

    • 我们可以测试下这个方法
    • 测试代码:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      public class IncrementAndGetModuloTest {

      private final AtomicInteger nextIndex = new AtomicInteger();

      public static void main(String[] args) {
      IncrementAndGetModuloTest incrementAndGetModuloTest = new IncrementAndGetModuloTest();
      List<String> eligible = new ArrayList<String>();
      eligible.add("服务1");
      eligible.add("服务2");
      eligible.add("服务3");
      eligible.add("服务4");
      eligible.add("服务5");
      for (int i = 0; i < 20; i++) {
      System.out.println(incrementAndGetModuloTest.getServer(eligible));
      }
      }

      private String getServer(List<String> eligible) {
      return eligible.get(incrementAndGetModulo(eligible.size()));
      }

      private int incrementAndGetModulo(int modulo) {
      for (; ; ) {
      int current = nextIndex.get();
      int next = (current + 1) % modulo;
      if (nextIndex.compareAndSet(current, next))
      return current;
      }
      }
      }
    • 测试结果:

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      服务1
      服务2
      服务3
      服务4
      服务5
      服务1
      服务2
      服务3
      服务4
      服务5
      服务1
      服务2
      服务3
      服务4
      服务5
      服务1
      服务2
      服务3
      服务4
      服务5
3、设置返回体
  • 执行完请求之后的得到结果, 可以看到是直接将ClientHttpResponse赋值到了RequestContext
1
2
3
4
5
6
protected void setResponse(ClientHttpResponse resp)
throws ClientException, IOException {
RequestContext.getCurrentContext().set("zuulResponse", resp);
this.helper.setResponse(resp.getRawStatusCode(),
resp.getBody() == null ? null : resp.getBody(), resp.getHeaders());
}

总结

  • RibbonRoutingFilter这个类会为Consul中的每个服务都自动创建一个默认路由规则,然后根据这个规则将请求转发到不同的服务实例上,这些默认规则的path会使用ServiceId配置的服务名作为请求前缀

  • RibbonRoutingFilter的类的主要功能

    • 1、先要挑选出具体调那个服务实例,如果是有多个服务具体选用的是哪种负载均衡策略,默认采用的是简单轮询负载均衡(RoundRobin)策略
    • 2、获取到实例之后就是就是拼接HTTP请求,涉及请求hostport,组装完成之后就是执行请求了
    • 3、执行请求之后就是把结果赋值到了RequestContext中,然后执行下一步ZuulFilter

参考