Preface

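  • From the previous chapters we know that Ribbon's main job is to balance the load (the work) and spread it across multiple processing units. So what does implementing load balancing actually involve?

    • 1. First, maintain the set of available server instances, handling instances that are added on the fly or that become unavailable.
    • 2. With the instances in hand, choose one for each request according to some load-balancing rule.
    • 3. Then execute the request and handle the response.
    • 4. If the call fails, decide whether to retry.
  • Below, we follow these steps through the Ribbon source code to see how each one is implemented. Source version: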

```xml
<!-- https://mvnrepository.com/artifact/com.netflix.ribbon/ribbon-core -->
<dependency>
    <groupId>com.netflix.ribbon</groupId>
    <artifactId>ribbon-core</artifactId>
    <version>2.2.4</version>
    <scope>runtime</scope>
</dependency>
```
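Before reading the real classes, it helps to see the four steps side by side in one place. The following is a toy sketch, not Ribbon code. `ToyLoadBalancer` and all of its members are hypothetical names, a `Supplier` stands in for service discovery, and a plain round robin stands in for the load-balancing rule.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical toy client-side load balancer; none of these names come from Ribbon.
class ToyLoadBalancer {
    private final Supplier<List<String>> discovery;          // step 1: source of the server list
    private final AtomicInteger next = new AtomicInteger();  // step 2: round-robin cursor
    private final int maxAttempts;                           // step 4: total attempts, including the first
    private volatile List<String> servers = List.of();

    ToyLoadBalancer(Supplier<List<String>> discovery, int maxAttempts) {
        this.discovery = discovery;
        this.maxAttempts = maxAttempts;
        refresh();
    }

    // Step 1: keep the list of available servers up to date.
    void refresh() {
        servers = List.copyOf(discovery.get());
    }

    // Step 2: choose a server using a rule (plain round robin here).
    String choose() {
        List<String> snapshot = servers;
        if (snapshot.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows
        return snapshot.get(Math.floorMod(next.getAndIncrement(), snapshot.size()));
    }

    // Steps 3 and 4: execute the call and, on failure, retry on the next chosen server.
    <R> R execute(Function<String, R> call) {
        RuntimeException last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            String server = choose();
            if (server == null) {
                break;
            }
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and move on to another server
            }
        }
        throw last != null ? last : new IllegalStateException("no server available");
    }
}
```

With the servers `a:80` and `b:80` and `maxAttempts = 2`, a call that fails on `a:80` is retried on `b:80`. The rest of this article looks at how Ribbon splits these responsibilities across `ServerList`, `ILoadBalancer`, `IRule` and the updater classes.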

Analysis

1. Step one: maintain the set of available server instances, handling instances that are added on the fly or that become unavailable

1.1 Defining a server instance: Server
  • First we need a definition of a server instance. Ribbon uses com.netflix.loadbalancer.Server for this, and its fields include the host and the port.

  • Server

```java
/**
 * Class that represents a typical Server (or an addressable Node) i.e. a
 * Host:port identifier
 *
 * @author stonse
 *
 */
public class Server {

    public static final String UNKNOWN_ZONE = "UNKNOWN";
    // host
    private String host;
    // port
    private int port = 80;
    private String scheme;
    // server id
    private volatile String id;
    // whether the server is alive
    private volatile boolean isAliveFlag;
    private String zone = UNKNOWN_ZONE;
    private volatile boolean readyToServe = true;

    // additional metadata about the server
    private MetaInfo simpleMetaInfo = new MetaInfo() {
        @Override
        public String getAppName() {
            return null;
        }

        @Override
        public String getServerGroup() {
            return null;
        }

        @Override
        public String getServiceIdForDiscovery() {
            return null;
        }

        @Override
        public String getInstanceId() {
            return id;
        }
    };
    ...
```
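  • Since we used Consul as the registry earlier, let's first look at how a server instance is defined with Consul. ConsulServer extends Server and adds a HealthService, plus the metaInfo and metadata fields that hold the instance's metadata: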
```java
/**
 * @author Spencer Gibb
 */
public class ConsulServer extends Server {

    private final MetaInfo metaInfo;
    private final HealthService service;
    private final Map<String, String> metadata;

    public ConsulServer(final HealthService healthService) {
        super(findHost(healthService), healthService.getService().getPort());
        this.service = healthService;
        this.metadata = ConsulServerUtils.getMetadata(this.service);
        metaInfo = new MetaInfo() {
            @Override
            public String getAppName() {
                return service.getService().getService();
            }

            @Override
            public String getServerGroup() {
                return getMetadata().get("group");
            }

            @Override
            public String getServiceIdForDiscovery() {
                return null;
            }

            @Override
            public String getInstanceId() {
                return service.getService().getId();
            }
        };

        setAlive(isPassingChecks());
    }

    @Override
    public MetaInfo getMetaInfo() {
        return metaInfo;
    }

    public HealthService getHealthService() {
        return this.service;
    }

    public Map<String, String> getMetadata() {
        return metadata;
    }

    public boolean isPassingChecks() {
        for (Check check : this.service.getChecks()) {
            if (check.getStatus() != Check.CheckStatus.PASSING) {
                return false;
            }
        }
        return true;
    }
}
```
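The constructor calls `setAlive(isPassingChecks())`, so a ConsulServer starts out alive only if every one of its Consul health checks is `PASSING`. A single `WARNING` is enough to mark it not alive. The sketch below isolates that rule. `CheckStatusSketch` and `HealthChecks` are hypothetical stand-ins: the real status enum is `Check.CheckStatus` from the Consul client library, and only a subset of its values is modelled here.

```java
import java.util.List;

// Hypothetical stand-in for a subset of Consul's check statuses.
enum CheckStatusSketch { PASSING, WARNING, CRITICAL }

final class HealthChecks {
    private HealthChecks() {}

    // Same rule as ConsulServer.isPassingChecks(): the instance counts as alive
    // only if every check is PASSING. An instance with no checks at all passes.
    static boolean allPassing(List<CheckStatusSketch> checks) {
        for (CheckStatusSketch status : checks) {
            if (status != CheckStatusSketch.PASSING) {
                return false;
            }
        }
        return true;
    }
}
```

Note the edge case the loop implies: an empty check list returns `true`.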
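1.2 Defining the server list: ServerList
  • Load balancing involves multiple Servers, so Ribbon defines the server-list operations in the ServerList interface. The interface has two methods, both of which return List<T> (where T extends Server):

    • getInitialListOfServers: obtains the initial list of server instances
    • getUpdatedListOfServers: obtains the updated list of server instances
      • According to its Javadoc, this is called periodically, for example every 30 seconds (configurable), by the load balancer's ping cycle
  • ServerList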

```java
/**
 * Interface that defines the methods used to obtain the List of Servers
 * @author stonse
 *
 * @param <T>
 */
public interface ServerList<T extends Server> {

    public List<T> getInitialListOfServers();

    /**
     * Return updated list of servers. This is called say every 30 secs
     * (configurable) by the Loadbalancer's Ping cycle
     *
     */
    public List<T> getUpdatedListOfServers();

}
```
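  • Next, let's see which classes implement this interface.
  • Subclasses

    • StaticServerList: a static list of server instances, for cases where the set of instances is fixed; the instances are supplied through the constructor
    • AbstractServerList: an abstract class with an API method that creates the filter used by the load balancer
    • ConfigurationBasedServerList: loads the server list from configuration
    • ConsulServerList: the server list in a Consul environment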
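To make the ServerList contract concrete, here is a fixed-list implementation in the spirit of StaticServerList. This is an assumption-laden sketch, not Ribbon's class. `ServerListSketch` only copies the two method names of `ServerList<T>`, and `FixedServerList` is a hypothetical name. Because the servers never change, the "initial" and "updated" lists are the same object.

```java
import java.util.List;

// Hypothetical stand-in for Ribbon's ServerList<T> contract (same two method names).
interface ServerListSketch<T> {
    List<T> getInitialListOfServers();
    List<T> getUpdatedListOfServers();
}

// In the spirit of StaticServerList: servers are fixed at construction time.
final class FixedServerList<T> implements ServerListSketch<T> {
    private final List<T> servers;

    @SafeVarargs
    FixedServerList(T... servers) {
        this.servers = List.of(servers); // immutable copy; rejects null elements
    }

    @Override
    public List<T> getInitialListOfServers() {
        return servers;
    }

    @Override
    public List<T> getUpdatedListOfServers() {
        return servers; // nothing to refresh for a static list
    }
}
```

A dynamic implementation such as ConsulServerList differs only in where the list comes from. The load balancer calls the same two methods either way.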
  • Here we focus on org.springframework.cloud.consul.discovery.ConsulServerList. Its fields include the ConsulClient client we met earlier, which is used to call Consul's RESTful API:

```java
/**
 * @author Spencer Gibb
 * @author Richard Kettelerij
 */
public class ConsulServerList extends AbstractServerList<ConsulServer> {
    // client used to call Consul's RESTful API
    private final ConsulClient client;
    // Consul discovery configuration
    private final ConsulDiscoveryProperties properties;
    // id of the service to load-balance across
    private String serviceId;

    public ConsulServerList(ConsulClient client, ConsulDiscoveryProperties properties) {
        this.client = client;
        this.properties = properties;
    }

    protected ConsulClient getClient() {
        return client;
    }

    protected ConsulDiscoveryProperties getProperties() {
        return properties;
    }

    protected String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig clientConfig) {
        this.serviceId = clientConfig.getClientName();
    }

    @Override
    public List<ConsulServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<ConsulServer> getUpdatedListOfServers() {
        return getServers();
    }

    ...
```
  • What matters here is how it implements getInitialListOfServers() and getUpdatedListOfServers(). Both simply call getServers(), which uses the service id to query Consul's RESTful API /v1/health/service/ and fetch the server instances:
```java
private List<ConsulServer> getServers() {
    if (this.client == null) {
        return Collections.emptyList();
    }
    String tag = getTag(); // null is ok
    // call the Consul RESTful API directly
    Response<List<HealthService>> response = this.client.getHealthServices(
            this.serviceId, tag, this.properties.isQueryPassing(),
            createQueryParamsForClientRequest(), this.properties.getAclToken());
    if (response.getValue() == null || response.getValue().isEmpty()) {
        return Collections.emptyList();
    }
    return transformResponse(response.getValue());
}
```
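getServers() hands the HTTP work to `ConsulClient.getHealthServices(...)`. The sketch below shows roughly what the resulting request looks like: a GET on Consul's health endpoint for one service. The endpoint accepts an optional `tag` filter and a `passing` flag that keeps only instances whose checks all pass. It seems reasonable, but is an assumption, that `isQueryPassing()` drives that flag. `ConsulHealthUrl.build` is a hypothetical helper. It leaves out the query parameters from `createQueryParamsForClientRequest()` and the ACL token, and it assumes the service id is already URL-safe.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: builds GET <agent>/v1/health/service/<serviceId>[?tag=..][&passing=true].
final class ConsulHealthUrl {
    private ConsulHealthUrl() {}

    static String build(String agentBase, String serviceId, String tag, boolean onlyPassing) {
        // serviceId is assumed to be URL-safe; it is appended as a path segment unchanged
        StringBuilder url = new StringBuilder(agentBase)
                .append("/v1/health/service/")
                .append(serviceId);
        String separator = "?";
        if (tag != null) { // a null tag means "no tag filter", as in getServers()
            url.append(separator).append("tag=")
               .append(URLEncoder.encode(tag, StandardCharsets.UTF_8));
            separator = "&";
        }
        if (onlyPassing) {
            url.append(separator).append("passing=true");
        }
        return url.toString();
    }
}
```

For example, `build("http://localhost:8500", "user-service", null, true)` yields `http://localhost:8500/v1/health/service/user-service?passing=true`. This requires Java 10+ for the `URLEncoder.encode(String, Charset)` overload.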

2. Step two: with the server instances in hand, choose one for each request according to a load-balancing rule

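  • The previous part gave us the list of server instances. Now we need to pick which instance to call, and Ribbon abstracts this with the ILoadBalancer interface.
2.1 ILoadBalancer: the interface responsible for choosing a Server
  • This interface defines the set of abstract operations that a client-side load balancer needs: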
```java
public interface ILoadBalancer {

    // add server instances to the list maintained by the load balancer
    public void addServers(List<Server> newServers);

    // choose a specific server instance using some strategy
    public Server chooseServer(Object key);

    // notify the load balancer that a specific instance has stopped serving and mark it down
    public void markServerDown(Server server);

    // deprecated in favour of getReachableServers() / getAllServers()
    @Deprecated
    public List<Server> getServerList(boolean availableOnly);

    // get the list of instances that are currently up
    public List<Server> getReachableServers();

    // get all server instances, both healthy and unhealthy
    public List<Server> getAllServers();

}
```
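  • ILoadBalancer subclasses

    • 1. AbstractLoadBalancer, the abstract implementation class

      • This abstract class defines an enum, ServerGroup, that groups server instances:

        ```java
        public enum ServerGroup {
            // all server instances
            ALL,
            // instances that are up
            STATUS_UP,
            // instances that are not up
            STATUS_NOT_UP
        }
        ```
      • getServerList(ServerGroup serverGroup): returns the list of server instances belonging to the given group

      • LoadBalancerStats getLoadBalancerStats(): returns the current properties and statistics of the server instances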
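getServerList(ServerGroup) boils down to splitting one list into views. The sketch below approximates "up" with a per-server alive flag. As far as I can tell, BaseLoadBalancer instead returns its ping-maintained upServerList for STATUS_UP and "all minus up" for STATUS_NOT_UP. `ServerGroupSketch`, `SimpleServer` and `GroupedServerLists` are hypothetical names.

```java
import java.util.List;
import java.util.stream.Collectors;

// Same three constants as AbstractLoadBalancer.ServerGroup.
enum ServerGroupSketch { ALL, STATUS_UP, STATUS_NOT_UP }

// Hypothetical minimal server carrying only what this example needs.
final class SimpleServer {
    final String id;
    final boolean alive;

    SimpleServer(String id, boolean alive) {
        this.id = id;
        this.alive = alive;
    }
}

final class GroupedServerLists {
    private GroupedServerLists() {}

    // ALL returns everything; the other two groups split the list on the alive flag.
    static List<SimpleServer> byGroup(List<SimpleServer> all, ServerGroupSketch group) {
        switch (group) {
            case ALL:
                return List.copyOf(all);
            case STATUS_UP:
                return all.stream().filter(s -> s.alive).collect(Collectors.toList());
            case STATUS_NOT_UP:
                return all.stream().filter(s -> !s.alive).collect(Collectors.toList());
            default:
                throw new IllegalArgumentException("unknown group " + group);
        }
    }
}
```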
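    • 2. BaseLoadBalancer, the basic load balancer implementation

      • This class holds the lists of server instances and checks the instances' status through an IPing object
      • Its fields are: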

        ```java
        /**
         * A basic implementation of the load balancer where an arbitrary list of
         * servers can be set as the server pool. A ping can be set to determine the
         * liveness of a server. Internally, this class maintains an "all" server list
         * and an "up" server list and use them depending on what the caller asks for.
         *
         * @author stonse
         *
         */
        public class BaseLoadBalancer extends AbstractLoadBalancer implements
                PrimeConnections.PrimeConnectionListener, IClientConfigAware {

            private static Logger logger = LoggerFactory
                    .getLogger(BaseLoadBalancer.class);
            private final static IRule DEFAULT_RULE = new RoundRobinRule();
            private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy();
            private static final String DEFAULT_NAME = "default";
            private static final String PREFIX = "LoadBalancer_";
            // the load-balancing rule
            protected IRule rule = DEFAULT_RULE;
            // strategy used to run the health checks; defaults to SerialPingStrategy,
            // which pings the server instances one after another
            protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY;
            // checks whether a server instance is alive
            protected IPing ping = null;
            // list of all server instances
            @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
            protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
            // list of server instances that are up
            @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
            protected volatile List<Server> upServerList = Collections.synchronizedList(new ArrayList<Server>());

            protected ReadWriteLock allServerLock = new ReentrantReadWriteLock();
            protected ReadWriteLock upServerLock = new ReentrantReadWriteLock();

            protected String name = DEFAULT_NAME;

            protected Timer lbTimer = null;
            protected int pingIntervalSeconds = 10;
            protected int maxTotalPingTimeSeconds = 5;
            protected Comparator<Server> serverComparator = new ServerComparator();

            protected AtomicBoolean pingInProgress = new AtomicBoolean(false);
            // load balancer statistics
            protected LoadBalancerStats lbStats;
            // monitoring counter that tracks how often a server is chosen
            private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");

            private PrimeConnections primeConnections;

            private volatile boolean enablePrimingConnections = false;

            private IClientConfig config;

            private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>();

            private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();

            ...
        ```
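      • The constructor mainly does the following:

        • this.ping is set to null by default
        • the load-balancing rule defaults to round robin (RoundRobinRule)
        • it starts a task that periodically checks whether the Servers are healthy; by default the task runs every 10 seconds

        ```java
        public BaseLoadBalancer() {
            this.name = DEFAULT_NAME;
            this.ping = null;
            setRule(DEFAULT_RULE);
            setupPingTask();
            lbStats = new LoadBalancerStats(DEFAULT_NAME);
        }
        ```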
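The ping task can be sketched as "ping every server one after another, then publish the new up-list", repeated on a schedule. This is a simplified stand-in and not Ribbon code. Ribbon uses a `java.util.Timer`-based task together with an `IPingStrategy`. Here a `Predicate<String>` plays the role of `IPing`, and `PingingServerPool` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Hypothetical stand-in for BaseLoadBalancer's periodic ping loop.
final class PingingServerPool {
    private final List<String> allServers;
    private final Predicate<String> ping;             // plays the role of IPing: true if the server answers
    private volatile List<String> upServers = List.of();
    private ScheduledExecutorService timer;

    PingingServerPool(List<String> allServers, Predicate<String> ping) {
        this.allServers = List.copyOf(allServers);
        this.ping = ping;
    }

    // One cycle, in the spirit of SerialPingStrategy: ping the servers one after another.
    void runPingCycle() {
        List<String> up = new ArrayList<>();
        for (String server : allServers) {
            if (ping.test(server)) {
                up.add(server);
            }
        }
        upServers = List.copyOf(up); // publish the new "up" list in a single write
    }

    // Repeat the cycle; BaseLoadBalancer's default interval is 10 seconds.
    void start(int intervalSeconds) {
        timer = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "ping-task");
            t.setDaemon(true); // do not keep the JVM alive just for pinging
            return t;
        });
        timer.scheduleWithFixedDelay(this::runPingCycle, 0, intervalSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        if (timer != null) {
            timer.shutdownNow();
        }
    }

    List<String> getReachableServers() {
        return upServers;
    }
}
```

Pinging serially keeps the sketch simple. The trade-off is that one slow server delays the whole cycle, which is presumably why Ribbon makes the ping strategy pluggable.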
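      • addServer() adds a new server instance to the load balancer. As its comments point out, uniqueness is not enforced, so adding the same server instance more than once increases the chance that it gets chosen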

        ```java
        public void addServer(Server newServer) {
            if (newServer != null) {
                try {
                    ArrayList<Server> newList = new ArrayList<Server>();

                    newList.addAll(allServerList);
                    newList.add(newServer);
                    setServersList(newList);
                } catch (Exception e) {
                    logger.error("LoadBalancer [{}]: Error adding newServer {}", name, newServer.getHost(), e);
                }
            }
        }
        ```
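The "add it twice to get more traffic" effect is easy to check with a small model. The hypothetical `WeightedByDuplicatesPool` below follows the same copy-append-replace pattern as addServer(), with no uniqueness check. It then selects with a plain round robin (assumed here, not Ribbon's RoundRobinRule). With `a:80` added twice and `b:80` once, `a:80` receives two thirds of the picks.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical pool mimicking addServer(): copy the current list, append, swap it in.
final class WeightedByDuplicatesPool {
    private volatile List<String> allServers = Collections.emptyList();
    private int cursor = 0; // simple round-robin cursor; not thread-safe, fine for a demo

    void addServer(String server) {
        if (server == null) {
            return;
        }
        List<String> newList = new ArrayList<>(allServers);
        newList.add(server); // duplicates are allowed on purpose
        allServers = Collections.unmodifiableList(newList);
    }

    String choose() {
        List<String> snapshot = allServers;
        if (snapshot.isEmpty()) {
            return null;
        }
        return snapshot.get(Math.floorMod(cursor++, snapshot.size()));
    }
}
```

Copying into a new list instead of mutating the old one means readers that already hold a snapshot never see a half-updated list.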
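      • chooseServer(Object key) chooses a server instance. Under the hood it delegates to an IRule object. IRule is covered in detail later; for now it is enough to know that the instance is obtained by calling this object's choose() method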

        ```java
        /*
         * Get the alive server dedicated to key
         *
         * @return the dedicated server
         */
        public Server chooseServer(Object key) {
            if (counter == null) {
                counter = createCounter();
            }
            // monitoring counter that tracks how often a server is chosen, declared as
            // private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer");
            counter.increment();
            if (rule == null) {
                return null;
            } else {
                try {
                    return rule.choose(key);
                } catch (Exception e) {
                    logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e);
                    return null;
                }
            }
        }
        ```
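A compact model of this delegation, paired with a lock-free round-robin rule, is shown below. The rule uses a compare-and-set loop on an `AtomicInteger`, modelled on the counter idea used by RoundRobinRule. `RuleSketch`, `RoundRobinSketch` and `ChooserSketch` are hypothetical names. In Ribbon the rule reads servers from the load balancer it is attached to, whereas this sketch passes the list in directly to stay self-contained.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for IRule.
interface RuleSketch {
    String choose(List<String> servers, Object key);
}

// Round robin with a lock-free compare-and-set loop.
final class RoundRobinSketch implements RuleSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    @Override
    public String choose(List<String> servers, Object key) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(incrementAndGetModulo(servers.size()));
    }

    private int incrementAndGetModulo(int modulo) {
        for (;;) {
            int current = counter.get();
            int next = (current + 1) % modulo;
            if (counter.compareAndSet(current, next)) { // retry if another thread won the race
                return next;
            }
        }
    }
}

// Same shape as BaseLoadBalancer.chooseServer(): count the call, delegate to the rule,
// and turn a missing rule or a rule failure into null instead of an exception.
final class ChooserSketch {
    private final AtomicLong chooseCounter = new AtomicLong();
    private final List<String> servers;
    private final RuleSketch rule;

    ChooserSketch(List<String> servers, RuleSketch rule) {
        this.servers = List.copyOf(servers);
        this.rule = rule;
    }

    String chooseServer(Object key) {
        chooseCounter.incrementAndGet();
        if (rule == null) {
            return null;
        }
        try {
            return rule.choose(servers, key);
        } catch (RuntimeException e) {
            return null; // BaseLoadBalancer logs a warning here and returns null
        }
    }

    long timesCalled() {
        return chooseCounter.get();
    }
}
```

In this sketch the counter starts at 0 and the incremented value is used as the index, so with `[a, b, c]` the picks go `b, c, a, b, ...`.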
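    • 3. DynamicServerListLoadBalancer, an extension of BaseLoadBalancer

      • This class adds the ability to update the server list dynamically at runtime, as well as the ability to filter the server list
      • Its fields are: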

        ```java
        /**
         * A LoadBalancer that has the capabilities to obtain the candidate list of
         * servers using a dynamic source. i.e. The list of servers can potentially be
         * changed at Runtime. It also contains facilities wherein the list of servers
         * can be passed through a Filter criteria to filter out servers that do not
         * meet the desired criteria.
         *
         * @author stonse
         *
         */
        public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
            private static final Logger LOGGER = LoggerFactory.getLogger(DynamicServerListLoadBalancer.class);

            boolean isSecure = false;
            boolean useTunnel = false;

            // to keep track of modification of server lists
            protected AtomicBoolean serverListUpdateInProgress = new AtomicBoolean(false);
            // the server list
            volatile ServerList<T> serverListImpl;
            // filter applied to the server list
            volatile ServerListFilter<T> filter;
            // action that refreshes the serverListImpl server list
            protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
                @Override
                public void doUpdate() {
                    updateListOfServers();
                }
            };

            protected volatile ServerListUpdater serverListUpdater;

            ...
        ```
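      • com.netflix.loadbalancer.ServerListUpdater is the server list updater

        • The field above is an anonymous implementation of ServerListUpdater's nested interface UpdateAction. Its doUpdate() method calls updateListOfServers(), and this is the action that the scheduled task described below runs: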

          ```java
          protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
              @Override
              public void doUpdate() {
                  updateListOfServers();
              }
          };

          ...

          @VisibleForTesting
          public void updateListOfServers() {
              List<T> servers = new ArrayList<T>();
              if (serverListImpl != null) {
                  // here this calls ConsulServerList.getUpdatedListOfServers()
                  servers = serverListImpl.getUpdatedListOfServers();
                  LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                          getIdentifier(), servers);

                  if (filter != null) {
                      servers = filter.getFilteredListOfServers(servers);
                      LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                              getIdentifier(), servers);
                  }
              }
              updateAllServerList(servers);
          }
          ```
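updateListOfServers() is a three-step pipeline: fetch, optionally filter, then publish. The generic sketch below keeps exactly that shape. A `Supplier` stands in for `ServerList.getUpdatedListOfServers()` and a `UnaryOperator` for `ServerListFilter.getFilteredListOfServers()`. `DynamicListSketch` is a hypothetical name, and assigning a volatile field is a simplification of `updateAllServerList(servers)`.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Hypothetical model of updateListOfServers(): fetch -> filter -> publish.
final class DynamicListSketch<T> {
    private final Supplier<List<T>> serverListImpl; // stands in for ServerList<T>
    private final UnaryOperator<List<T>> filter;    // stands in for ServerListFilter<T>; may be null
    private volatile List<T> allServers = List.of();

    DynamicListSketch(Supplier<List<T>> serverListImpl, UnaryOperator<List<T>> filter) {
        this.serverListImpl = serverListImpl;
        this.filter = filter;
    }

    void updateListOfServers() {
        List<T> servers = List.of();
        if (serverListImpl != null) {
            servers = serverListImpl.get();
            if (filter != null) {
                servers = filter.apply(servers);
            }
        }
        allServers = List.copyOf(servers); // simplified stand-in for updateAllServerList(servers)
    }

    List<T> getAllServers() {
        return allServers;
    }
}
```

Running this on a schedule is all it takes to pick up instances that appear in, or disappear from, the registry between cycles.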
        • ServerListUpdater is what drives the updates of the serverListImpl server list:

          ```java
          public interface ServerListUpdater {
              // nested interface; doUpdate() performs the actual update of the ServerList
              public interface UpdateAction {
                  void doUpdate();
              }
              // start the updater
              void start(UpdateAction updateAction);
              // stop the updater
              void stop();
              // timestamp of the most recent update
              String getLastUpdate();
              // time elapsed since the last update, in milliseconds
              long getDurationSinceLastUpdateMs();
              // number of missed update cycles
              int getNumberMissedCycles();
              // number of core threads
              int getCoreThreads();
          }
          ```
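        • com.netflix.loadbalancer.PollingServerListUpdater is the default strategy for dynamic server list updates, that is, the default updater used by DynamicServerListLoadBalancer. It refreshes the server list with a scheduled task

          • Start with the source of start(), which launches the updater. It wraps the call in a Runnable that invokes doUpdate() on the UpdateAction passed in (the action shown above, which updates the server list). By default the task first runs 1 second after initialization and then runs again 30 seconds after each previous run finishes (scheduleWithFixedDelay):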
            ```java
            @Override
            public synchronized void start(final UpdateAction updateAction) {
                if (isActive.compareAndSet(false, true)) {
                    final Runnable wrapperRunnable = new Runnable() {
                        @Override
                        public void run() {
                            if (!isActive.get()) {
                                if (scheduledFuture != null) {
                                    scheduledFuture.cancel(true);
                                }
                                return;
                            }
                            try {
                                updateAction.doUpdate();
                                lastUpdated = System.currentTimeMillis();
                            } catch (Exception e) {
                                logger.warn("Failed one update cycle", e);
                            }
                        }
                    };

                    scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
                            wrapperRunnable,
                            // initial delay, 1 second by default
                            initialDelayMs,
                            // delay between runs, 30 seconds by default
                            refreshIntervalMs,
                            TimeUnit.MILLISECONDS
                    );
                } else {
                    logger.info("Already active, no-op");
                }
            }
            ```
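The same pattern can be reproduced with the JDK alone. The sketch below is a trimmed-down take on start()/stop(): a compare-and-set guard makes a second start() a no-op, and `scheduleWithFixedDelay` drives the refresh. The try/catch around `doUpdate` matters. Per the `ScheduledExecutorService` contract, a periodic task that throws has its subsequent executions suppressed, so one failed cycle would otherwise stop all future refreshes. `PollingUpdaterSketch` is a hypothetical name. The 1 s / 30 s defaults come from the text above, and a plain `Runnable` replaces `UpdateAction`.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, trimmed-down take on PollingServerListUpdater's start()/stop().
final class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "server-list-refresh");
        t.setDaemon(true); // do not keep the JVM alive just for refreshing
        return t;
    });
    private final long initialDelayMs;
    private final long refreshIntervalMs;
    private volatile ScheduledFuture<?> scheduledFuture;
    private volatile long lastUpdated = -1;

    PollingUpdaterSketch() {
        this(1000, 30_000); // 1 s initial delay, 30 s between runs
    }

    PollingUpdaterSketch(long initialDelayMs, long refreshIntervalMs) {
        this.initialDelayMs = initialDelayMs;
        this.refreshIntervalMs = refreshIntervalMs;
    }

    // Returns false when already started, mirroring the "Already active, no-op" branch.
    synchronized boolean start(Runnable doUpdate) {
        if (!isActive.compareAndSet(false, true)) {
            return false;
        }
        Runnable wrapper = () -> {
            try {
                doUpdate.run();
                lastUpdated = System.currentTimeMillis();
            } catch (RuntimeException e) {
                // swallow: an exception escaping here would cancel all future runs
            }
        };
        // fixed delay: each run starts refreshIntervalMs after the previous one finishes
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    // Stops for good: the executor is shut down, so this sketch cannot be restarted.
    synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }

    long getLastUpdated() {
        return lastUpdated;
    }
}
```

Fixed delay, rather than fixed rate, means a slow registry call pushes the next refresh back instead of letting refreshes pile up.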
      • Back in the DynamicServerListLoadBalancer class, let's find out how the ServerListUpdater gets started

        • First, the constructor:

          ```java
          public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
                                               ServerList<T> serverList, ServerListFilter<T> filter,
                                               ServerListUpdater serverListUpdater) {
              super(clientConfig, rule, ping);
              this.serverListImpl = serverList;
              this.filter = filter;
              this.serverListUpdater = serverListUpdater;
              if (filter instanceof AbstractServerListFilter) {
                  ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
              }
              restOfInit(clientConfig);
          }
          ```
        • The part we care about is restOfInit(clientConfig), which performs the initialization

          • Inside that method:

            ```java
            void restOfInit(IClientConfig clientConfig) {
                boolean primeConnection = this.isEnablePrimingConnections();
                // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
                this.setEnablePrimingConnections(false);
                // start the scheduled refresh via com.netflix.loadbalancer.ServerListUpdater#start()
                enableAndInitLearnNewServersFeature();

                // update the server instances right away
                updateListOfServers();
                if (primeConnection && this.getPrimeConnections() != null) {
                    this.getPrimeConnections()
                            .primeConnections(getReachableServers());
                }
                this.setEnablePrimingConnections(primeConnection);
                LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
            }
            ```
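The ordering in restOfInit() is worth a closer look. The scheduled refresh is started first, but it fires only after its initial delay, so the method also calls updateListOfServers() synchronously. That way the load balancer has servers as soon as construction finishes. The sketch below isolates that idea. `InitOrderSketch` is hypothetical, and the `startUpdater` callback stands in for `serverListUpdater.start(updateAction)`.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical illustration of restOfInit()'s ordering: schedule refreshes, then fill the list now.
final class InitOrderSketch {
    private final Supplier<List<String>> discovery;
    private volatile List<String> servers = List.of();

    // startUpdater stands in for serverListUpdater.start(updateAction).
    InitOrderSketch(Supplier<List<String>> discovery, Consumer<Runnable> startUpdater) {
        this.discovery = discovery;
        startUpdater.accept(this::updateListOfServers); // 1. register the periodic refresh
        updateListOfServers();                          // 2. populate synchronously, no waiting
    }

    void updateListOfServers() {
        servers = List.copyOf(discovery.get());
    }

    List<String> getAllServers() {
        return servers;
    }
}
```

Even if the "updater" only records the action and never runs it, the list is already populated after construction.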
          • enableAndInitLearnNewServersFeature() calls the com.netflix.loadbalancer.ServerListUpdater#start() method shown earlier to start the scheduled task

            ```java
            public void enableAndInitLearnNewServersFeature() {
                LOGGER.info("Using serverListUpdater {}", serverListUpdater.getClass().getSimpleName());
                serverListUpdater.start(updateAction);
            }
            ```
          • updateListOfServers() updates the server instances. Here the serverListImpl implementation is ConsulServerList, which, as we saw earlier, calls Consul's RESTful API to obtain the instance information

            ```java
            @VisibleForTesting
            public void updateListOfServers() {
                List<T> servers = new ArrayList<T>();
                if (serverListImpl != null) {
                    // here this calls ConsulServerList.getUpdatedListOfServers()
                    servers = serverListImpl.getUpdatedListOfServers();
                    LOGGER.debug("List of Servers for {} obtained from Discovery client: {}",
                            getIdentifier(), servers);

                    if (filter != null) {
                        servers = filter.getFilteredListOfServers(servers);
                        LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}",
                                getIdentifier(), servers);
                    }
                }
                updateAllServerList(servers);
            }
            ```
        • At this point, the scheduled task that refreshes the server instances is running

        • Inside updateListOfServers(), the call filter.getFilteredListOfServers(servers) filters the server list
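As an example of what such a filter might do, the sketch below prefers servers in the caller's zone and falls back to the full list when that zone has none. It is loosely inspired by zone-affinity filtering. Ribbon's own zone-affinity filter applies further health and load checks before narrowing the list, and those checks are omitted here. `ZonedServer` and `SameZoneFilter` are hypothetical names. The method name mirrors `getFilteredListOfServers` only for readability.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical server type carrying a zone, similar in spirit to Server.getZone().
final class ZonedServer {
    final String hostPort;
    final String zone;

    ZonedServer(String hostPort, String zone) {
        this.hostPort = hostPort;
        this.zone = zone;
    }
}

// Hypothetical filter: keep same-zone servers, or everything if the zone is empty.
final class SameZoneFilter {
    private final String myZone;

    SameZoneFilter(String myZone) {
        this.myZone = myZone;
    }

    List<ZonedServer> getFilteredListOfServers(List<ZonedServer> servers) {
        List<ZonedServer> local = servers.stream()
                .filter(s -> myZone.equals(s.zone))
                .collect(Collectors.toList());
        return local.isEmpty() ? servers : local; // never filter down to nothing
    }
}
```

The fallback is the important design choice: a filter that can return an empty list would leave the load balancer with no candidates at all.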
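    • 4. ZoneAwareLoadBalancer, an extension of DynamicServerListLoadBalancer
      • As we saw, DynamicServerListLoadBalancer does not override chooseServer(), the method that selects a concrete instance. It therefore still uses BaseLoadBalancer's chooseServer(), which delegates to the IRule and by default picks instances in round-robin order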

Summary

References