javascript
Spring Cloud源码分析(二)Ribbon
斷斷續續看Ribbon的源碼差不多也有7-8天了,總算告一段落。本文記錄了這些天對源碼的閱讀過程與一些分析理解,如有不對還請指出。
友情提示:本文較長,請選擇一個較為舒適的姿勢來閱讀
在之前介紹使用Ribbon進行服務消費的時候,我們用到了RestTemplate,但是熟悉Spring的同學們是否產生過這樣的疑問:RestTemplate不是Spring自己就有的嗎?跟Ribbon的客戶端負載均衡又有什么關系呢?下面在本文,我們來看RestTemplate和Ribbon是如何聯系起來并實現客戶端負載均衡的。
首先,回顧一下之前的消費者示例:我們是如何實現客戶端負載均衡的?仔細觀察一下代碼之前的代碼,我們可以發現在消費者的例子中,可能就是這個注解@LoadBalanced是之前沒有接觸過的,并且從命名上來看也與負載均衡相關。我們不妨以此為線索來看看源碼實現的機制。
從@LoadBalanced注解源碼的注釋中,我們可以知道該注解用來給RestTemplate標記,以使用負載均衡的客戶端(LoadBalancerClient)來配置它。
通過搜索LoadBalancerClient,我們可以發現這是Spring Cloud中定義的一個接口:
| public interface LoadBalancerClient { ServiceInstance choose(String serviceId); <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException; URI reconstructURI(ServiceInstance instance, URI original); } |
從該接口中,我們可以通過定義的抽象方法來了解到客戶端負載均衡器中應具備的幾種能力:
- ServiceInstance choose(String serviceId):根據傳入的服務名serviceId,從負載均衡器中挑選一個對應服務的實例。
- T execute(String serviceId, LoadBalancerRequest request) throws IOException:使用從負載均衡器中挑選出的服務實例來執行請求內容。
- URI reconstructURI(ServiceInstance instance, URI original):為系統構建一個合適的“host:port”形式的URI。在分布式系統中,我們使用邏輯上的服務名稱作為host來構建URI(替代服務實例的“host:port”形式)進行請求,比如:http://myservice/path/to/service。在該操作的定義中,前者ServiceInstance對象是帶有host和port的具體服務實例,而后者URI對象則是使用邏輯服務名定義為host的URI,而返回的URI內容則是通過ServiceInstance的服務實例詳情拼接出的具體“host:post”形式的請求地址。
順著LoadBalancerClient接口的所屬包org.springframework.cloud.client.loadbalancer,我們對其內容進行整理,可以得出如下圖的關系:
從類的命名上我們初步判斷LoadBalancerAutoConfiguration為實現客戶端負載均衡器的自動化配置類。通過查看源碼,我們可以驗證這一點假設:
(RestTemplate.class) (LoadBalancerClient.class) public class LoadBalancerAutoConfiguration { (required = false) private List<RestTemplate> restTemplates = Collections.emptyList(); public SmartInitializingSingleton loadBalancedRestTemplateInitializer( final List<RestTemplateCustomizer> customizers) { return new SmartInitializingSingleton() { public void afterSingletonsInstantiated() { for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) { for (RestTemplateCustomizer customizer : customizers) { customizer.customize(restTemplate); } } } }; } public RestTemplateCustomizer restTemplateCustomizer( final LoadBalancerInterceptor loadBalancerInterceptor) { return new RestTemplateCustomizer() { public void customize(RestTemplate restTemplate) { List<ClientHttpRequestInterceptor> list = new ArrayList<>( restTemplate.getInterceptors()); list.add(loadBalancerInterceptor); restTemplate.setInterceptors(list); } }; } public LoadBalancerInterceptor ribbonInterceptor( LoadBalancerClient loadBalancerClient) { return new LoadBalancerInterceptor(loadBalancerClient); } } |
從LoadBalancerAutoConfiguration類頭上的注解可以知道Ribbon實現的負載均衡自動化配置需要滿足下面兩個條件:
- @ConditionalOnClass(RestTemplate.class):RestTemplate類必須存在于當前工程的環境中。
- @ConditionalOnBean(LoadBalancerClient.class):在Spring的Bean工程中有必須有LoadBalancerClient的實現Bean。
在該自動化配置類中,主要做了下面三件事:
- 創建了一個LoadBalancerInterceptor的Bean,用于實現對客戶端發起請求時進行攔截,以實現客戶端負載均衡。
- 創建了一個RestTemplateCustomizer的Bean,用于給RestTemplate增加LoadBalancerInterceptor攔截器。
- 維護了一個被@LoadBalanced注解修飾的RestTemplate對象列表,并在這里進行初始化,通過調用RestTemplateCustomizer的實例來給需要客戶端負載均衡的RestTemplate增加LoadBalancerInterceptor攔截器。
接下來,我們看看LoadBalancerInterceptor攔截器是如何將一個普通的RestTemplate變成客戶端負載均衡的:
| public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor { private LoadBalancerClient loadBalancer; public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) { this.loadBalancer = loadBalancer; } public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); return this.loadBalancer.execute(serviceName, new LoadBalancerRequest<ClientHttpResponse>() { public ClientHttpResponse apply(final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance); return execution.execute(serviceRequest, body); } }); } private class ServiceRequestWrapper extends HttpRequestWrapper { private final ServiceInstance instance; public ServiceRequestWrapper(HttpRequest request, ServiceInstance instance) { super(request); this.instance = instance; } public URI getURI() { URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI( this.instance, getRequest().getURI()); return uri; } } } |
通過源碼以及之前的自動化配置類,我們可以看到在攔截器中注入了LoadBalancerClient的實現。當一個被@LoadBalanced注解修飾的RestTemplate對象向外發起HTTP請求時,會被LoadBalancerInterceptor類的intercept函數所攔截。由于我們在使用RestTemplate時候采用了服務名作為host,所以直接從HttpRequest的URI對象中通過getHost()就可以拿到服務名,然后調用execute函數去根據服務名來選擇實例并發起實際的請求。
分析到這里,LoadBalancerClient還只是一個抽象的負載均衡器接口,所以我們還需要找到它的具體實現類來進一步分析。通過查看ribbon的源碼,我們可以很容易的在org.springframework.cloud.netflix.ribbon包下找到對應的實現類:RibbonLoadBalancerClient。
| public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException { ILoadBalancer loadBalancer = getLoadBalancer(serviceId); Server server = getServer(loadBalancer); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { T returnVal = request.apply(ribbonServer); statsRecorder.recordStats(returnVal); return returnVal; } catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null; } |
可以看到,在execute函數的實現中,第一步做的就是通過getServer根據傳入的服務名serviceId去獲得具體的服務實例:
| protected Server getServer(ILoadBalancer loadBalancer) { if (loadBalancer == null) { return null; } return loadBalancer.chooseServer("default"); } |
通過getServer函數的實現源碼,我們可以看到這里獲取具體服務實例的時候并沒有使用LoadBalancerClient接口中的choose函數,而是使用了ribbon自身的ILoadBalancer接口中定義的chooseServer函數。
我們先來認識一下ILoadBalancer接口:
| public interface ILoadBalancer { public void addServers(List<Server> newServers); public Server chooseServer(Object key); public void markServerDown(Server server); public List<Server> getReachableServers(); public List<Server> getAllServers(); } |
可以看到,在該接口中定義了一個軟負載均衡器需要的一系列抽象操作(未例舉過期函數):
- addServers:向負載均衡器中維護的實例列表增加服務實例。
- chooseServer:通過某種策略,從負載均衡器中挑選出一個具體的服務實例。
- markServerDown:用來通知和標識負載均衡器中某個具體實例已經停止服務,不然負載均衡器在下一次獲取服務實例清單前都會認為服務實例均是正常服務的。
- getReachableServers:獲取當前正常服務的實例列表。
- getAllServers:獲取所有已知的服務實例列表,包括正常服務和停止服務的實例。
在該接口定義中涉及到的Server對象定義的是一個傳統的服務端節點,在該類中存儲了服務端節點的一些元數據信息,包括:host、port以及一些部署信息等。
而對于該接口的實現,我們可以整理出如上圖所示的結構。我們可以看到BaseLoadBalancer類實現了基礎的負載均衡,而DynamicServerListLoadBalancer和ZoneAwareLoadBalancer在負載均衡的策略上做了一些功能的擴展。
那么在整合Ribbon的時候Spring Cloud默認采用了哪個具體實現呢?我們通過RibbonClientConfiguration配置類,可以知道在整合時默認采用了ZoneAwareLoadBalancer來實現負載均衡器。
public ILoadBalancer ribbonLoadBalancer(IClientConfig config, ServerList<Server> serverList, ServerListFilter<Server> serverListFilter, IRule rule, IPing ping) { ZoneAwareLoadBalancer<Server> balancer = LoadBalancerBuilder.newBuilder() .withClientConfig(config).withRule(rule).withPing(ping) .withServerListFilter(serverListFilter).withDynamicServerList(serverList) .buildDynamicServerListLoadBalancer(); return balancer; } |
下面,我們再回到RibbonLoadBalancerClient的execute函數邏輯,在通過ZoneAwareLoadBalancer的chooseServer函數獲取了負載均衡策略分配到的服務實例對象Server之后,將其內容包裝成RibbonServer對象(該對象除了存儲了服務實例的信息之外,還增加了服務名serviceId、是否需要使用HTTPS等其他信息),然后使用該對象再回調LoadBalancerInterceptor請求攔截器中LoadBalancerRequest的apply(final ServiceInstance instance)函數,向一個實際的具體服務實例發起請求,從而實現一開始以服務名為host的URI請求,到實際訪問host:post形式的具體地址的轉換。
apply(final ServiceInstance instance)函數中傳入的ServiceInstance接口是對服務實例的抽象定義。在該接口中暴露了服務治理系統中每個服務實例需要提供的一些基本信息,比如:serviceId、host、port等,具體定義如下:
| public interface ServiceInstance { String getServiceId(); String getHost(); int getPort(); boolean isSecure(); URI getUri(); Map<String, String> getMetadata(); } |
而上面提到的具體包裝Server服務實例的RibbonServer對象就是ServiceInstance接口的實現,可以看到它除了包含了Server對象之外,還存儲了服務名、是否使用https標識以及一個Map類型的元數據集合。
| protected static class RibbonServer implements ServiceInstance { private final String serviceId; private final Server server; private final boolean secure; private Map<String, String> metadata; protected RibbonServer(String serviceId, Server server) { this(serviceId, server, false, Collections.<String, String> emptyMap()); } protected RibbonServer(String serviceId, Server server, boolean secure, Map<String, String> metadata) { this.serviceId = serviceId; this.server = server; this.secure = secure; this.metadata = metadata; } // 省略實現ServiceInstance的一些獲取Server信息的get函數 ... } |
那么apply(final ServiceInstance instance)函數,在接收到了具體ServiceInstance實例后,是如何通過LoadBalancerClient接口中的reconstructURI操作來組織具體請求地址的呢?
public ClientHttpResponse apply(final ServiceInstance instance) throws Exception { HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance); return execution.execute(serviceRequest, body); } |
從apply的實現中,我們可以看到它具體執行的時候,還傳入了ServiceRequestWrapper對象,該對象繼承了HttpRequestWrapper并重寫了getURI函數,重寫后的getURI會通過調用LoadBalancerClient接口的reconstructURI函數來重新構建一個URI來進行訪問。
| private class ServiceRequestWrapper extends HttpRequestWrapper { private final ServiceInstance instance; ... public URI getURI() { URI uri = LoadBalancerInterceptor.this.loadBalancer.reconstructURI( this.instance, getRequest().getURI()); return uri; } } |
在LoadBalancerInterceptor攔截器中,ClientHttpRequestExecution的實例具體執行execution.execute(serviceRequest, body)時,會調用InterceptingClientHttpRequest下InterceptingRequestExecution類的execute函數,具體實現如下:
| public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); return nextInterceptor.intercept(request, body, this); } else { ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), request.getMethod()); delegate.getHeaders().putAll(request.getHeaders()); if (body.length > 0) { StreamUtils.copy(body, delegate.getBody()); } return delegate.execute(); } } |
可以看到在創建請求的時候requestFactory.createRequest(request.getURI(), request.getMethod());,這里request.getURI()會調用之前介紹的ServiceRequestWrapper對象中重寫的getURI函數。此時,它就會使用RibbonLoadBalancerClient中實現的reconstructURI來組織具體請求的服務實例地址。
| public URI reconstructURI(ServiceInstance instance, URI original) { Assert.notNull(instance, "instance can not be null"); String serviceId = instance.getServiceId(); RibbonLoadBalancerContext context = this.clientFactory .getLoadBalancerContext(serviceId); Server server = new Server(instance.getHost(), instance.getPort()); boolean secure = isSecure(server, serviceId); URI uri = original; if (secure) { uri = UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri(); } return context.reconstructURIWithServer(server, uri); } |
從reconstructURI函數中,我們可以看到,它通過ServiceInstance實例對象的serviceId,從SpringClientFactory類的clientFactory對象中獲取對應serviceId的負載均衡器的上下文RibbonLoadBalancerContext對象。然后根據ServiceInstance中的信息來構建具體服務實例信息的Server對象,并使用RibbonLoadBalancerContext對象的reconstructURIWithServer函數來構建服務實例的URI。
為了幫助理解,簡單介紹一下上面提到的SpringClientFactory和RibbonLoadBalancerContext:
- SpringClientFactory類是一個用來創建客戶端負載均衡器的工廠類,該工廠會為每一個不同名的ribbon客戶端生成不同的Spring上下文。
- RibbonLoadBalancerContext類是LoadBalancerContext的子類,該類用于存儲一些被負載均衡器使用的上下文內容和Api操作(reconstructURIWithServer就是其中之一)。
從reconstructURIWithServer的實現中我們可以看到,它同reconstructURI的定義類似。只是reconstructURI的第一個保存具體服務實例的參數使用了Spring Cloud定義的ServiceInstance,而reconstructURIWithServer中使用了Netflix中定義的Server,所以在RibbonLoadBalancerClient實現reconstructURI時候,做了一次轉換,使用ServiceInstance的host和port信息來構建了一個Server對象來給reconstructURIWithServer使用。從reconstructURIWithServer的實現邏輯中,我們可以看到,它從Server對象中獲取host和port信息,然后根據以服務名為host的URI對象original中獲取其他請求信息,將兩者內容進行拼接整合,形成最終要訪問的服務實例的具體地址。
| public class LoadBalancerContext implements IClientConfigAware { ... public URI reconstructURIWithServer(Server server, URI original) { String host = server.getHost(); int port = server .getPort(); if (host.equals(original.getHost()) && port == original.getPort()) { return original; } String scheme = original.getScheme(); if (scheme == null) { scheme = deriveSchemeAndPortFromPartialUri(original).first(); } try { StringBuilder sb = new StringBuilder(); sb.append(scheme).append("://"); if (!Strings.isNullOrEmpty(original.getRawUserInfo())) { sb.append(original.getRawUserInfo()).append("@"); } sb.append(host); if (port >= 0) { sb.append(":").append(port); } sb.append(original.getRawPath()); if (!Strings.isNullOrEmpty(original.getRawQuery())) { sb.append("?").append(original.getRawQuery()); } if (!Strings.isNullOrEmpty(original.getRawFragment())) { sb.append("#").append(original.getRawFragment()); } URI newURI = new URI(sb.toString()); return newURI; } catch (URISyntaxException e) { throw new RuntimeException(e); } } ... } |
另外,從RibbonLoadBalancerClient的execute的函數邏輯中,我們還能看到在回調攔截器中,執行具體的請求之后,ribbon還通過RibbonStatsRecorder對象對服務的請求還進行了跟蹤記錄,這里不再展開說明,有興趣的讀者可以繼續研究。
分析到這里,我們已經可以大致理清Spring Cloud中使用Ribbon實現客戶端負載均衡的基本脈絡。了解了它是如何通過LoadBalancerInterceptor攔截器對RestTemplate的請求進行攔截,并利用Spring Cloud的負載均衡器LoadBalancerClient將以邏輯服務名為host的URI轉換成具體的服務實例的過程。同時通過分析LoadBalancerClient的Ribbon實現RibbonLoadBalancerClient,可以知道在使用Ribbon實現負載均衡器的時候,實際使用的還是Ribbon中定義的ILoadBalancer接口的實現,自動化配置會采用ZoneAwareLoadBalancer的實例來進行客戶端負載均衡實現。
負載均衡器
通過之前的分析,我們已經對Spring Cloud如何使用Ribbon有了基本的了解。雖然Spring Cloud中定義了LoadBalancerClient為負載均衡器的接口,并且針對Ribbon實現了RibbonLoadBalancerClient,但是它在具體實現客戶端負載均衡時,則是通過Ribbon的ILoadBalancer接口實現。在上一節分析時候,我們對該接口的實現結構已經做了一些簡單的介紹,下面我們根據ILoadBalancer接口的實現類逐個看看它都是如何實現客戶端負載均衡的。
AbstractLoadBalancer
AbstractLoadBalancer是ILoadBalancer接口的抽象實現。在該抽象類中定義了一個關于服務實例的分組枚舉類ServerGroup,它包含了三種不同類型:ALL-所有服務實例、STATUS_UP-正常服務的實例、STATUS_NOT_UP-停止服務的實例;實現了一個chooseServer()函數,該函數通過調用接口中的chooseServer(Object key)實現,其中參數key為null,表示在選擇具體服務實例時忽略key的條件判斷;最后還定義了兩個抽象函數,getServerList(ServerGroup serverGroup)定義了根據分組類型來獲取不同的服務實例列表,getLoadBalancerStats()定義了獲取LoadBalancerStats對象的方法,LoadBalancerStats對象被用來存儲負載均衡器中各個服務實例當前的屬性和統計信息,這些信息非常有用,我們可以利用這些信息來觀察負載均衡器的運行情況,同時這些信息也是用來制定負載均衡策略的重要依據。
| public abstract class AbstractLoadBalancer implements ILoadBalancer { public enum ServerGroup{ ALL, STATUS_UP, STATUS_NOT_UP } public Server chooseServer() { return chooseServer(null); } public abstract List<Server> getServerList(ServerGroup serverGroup); public abstract LoadBalancerStats getLoadBalancerStats(); } |
BaseLoadBalancer
BaseLoadBalancer類是Ribbon負載均衡器的基礎實現類,在該類中定義很多關于均衡負載器相關的基礎內容:
定義并維護了兩個存儲服務實例Server對象的列表。一個用于存儲所有服務實例的清單,一個用于存儲正常服務的實例清單。
(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> allServerList = Collections
.synchronizedList(new ArrayList<Server>());
(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)
protected volatile List<Server> upServerList = Collections
.synchronizedList(new ArrayList<Server>());定義了之前我們提到的用來存儲負載均衡器各服務實例屬性和統計信息的LoadBalancerStats對象。
- 定義了檢查服務實例是否正常服務的IPing對象,在BaseLoadBalancer中默認為null,需要在構造時注入它的具體實現。
- 定義了檢查服務實例操作的執行策略對象IPingStrategy,在BaseLoadBalancer中默認使用了該類中定義的靜態內部類SerialPingStrategy實現。根據源碼,我們可以看到該策略采用線性遍歷ping服務實例的方式實現檢查。該策略在當我們實現的IPing速度不理想,或是Server列表過大時,可能變的不是很為理想,這時候我們需要通過實現IPingStrategy接口并實現pingServers(IPing ping, Server[] servers)函數去擴展ping的執行策略。
| private static class SerialPingStrategy implements IPingStrategy { public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; if (logger.isDebugEnabled()) { logger.debug("LoadBalancer: PingTask executing [" + numCandidates + "] servers configured"); } for (int i = 0; i < numCandidates; i++) { results[i] = false; try { if (ping != null) { results[i] = ping.isAlive(servers[i]); } } catch (Throwable t) { logger.error("Exception while pinging Server:" + servers[i], t); } } return results; } } |
- 定義了負載均衡的處理規則IRule對象,從BaseLoadBalancer中chooseServer(Object key)的實現源碼,我們可以知道負載均衡器實際進行服務實例選擇任務是委托給了IRule實例中的choose函數來實現。而在這里,默認初始化了RoundRobinRule為IRule的實現對象。RoundRobinRule實現了最基本且常用的線性負載均衡規則。
| public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { return rule.choose(key); } catch (Throwable t) { return null; } } } |
- 啟動ping任務:在BaseLoadBalancer的默認構造函數中,會直接啟動一個用于定時檢查Server是否健康的任務。該任務默認的執行間隔為:10秒。
實現了ILoadBalancer接口定義的負載均衡器應具備的一系列基本操作:
addServers(List newServers):向負載均衡器中增加新的服務實例列表,該實現將原本已經維護著的所有服務實例清單allServerList和新傳入的服務實例清單newServers都加入到newList中,然后通過調用setServersList函數對newList進行處理,在BaseLoadBalancer中實現的時候會使用新的列表覆蓋舊的列表。而之后介紹的幾個擴展實現類對于服務實例清單更新的優化都是對setServersList函數的重寫來實現的。
public void addServers(List<Server> newServers) {
if (newServers != null && newServers.size() > 0) {
try {
ArrayList<Server> newList = new ArrayList<Server>();
newList.addAll(allServerList);
newList.addAll(newServers);
setServersList(newList);
} catch (Exception e) {
logger.error("Exception while adding Servers", e);
}
}
}chooseServer(Object key):挑選一個具體的服務實例,在上面介紹IRule的時候,已經做了說明,這里不再贅述。
markServerDown(Server server):標記某個服務實例暫停服務。
public void markServerDown(Server server) {
if (server == null) {
return;
}
if (!server.isAlive()) {
return;
}
logger.error("LoadBalancer: markServerDown called on ["
+ server.getId() + "]");
server.setAlive(false);
notifyServerStatusChangeListener(singleton(server));
}getReachableServers():獲取可用的服務實例列表。由于BaseLoadBalancer中單獨維護了一個正常服務的實例清單,所以直接返回即可。
public List<Server> getReachableServers() {
return Collections.unmodifiableList(upServerList);
}getAllServers():獲取所有的服務實例列表。由于BaseLoadBalancer中單獨維護了一個所有服務的實例清單,所以也直接返回它即可。
public List<Server> getAllServers() {
return Collections.unmodifiableList(allServerList);
}
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer類繼承于BaseLoadBalancer類,它是對基礎負載均衡器的擴展。在該負載均衡器中,實現了服務實例清單的在運行期的動態更新能力;同時,它還具備了對服務實例清單的過濾功能,也就是說我們可以通過過濾器來選擇性的獲取一批服務實例清單。下面我們具體來看看在該類中增加了一些什么內容:
ServerList
從DynamicServerListLoadBalancer的成員定義中,我們馬上可以發現新增了一個關于服務列表的操作對象:ServerList<T> serverListImpl。其中泛型T從類名中對于T的限定DynamicServerListLoadBalancer<T extends Server>可以獲知它是一個Server的子類,即代表了一個具體的服務實例的擴展類。而ServerList接口定義如下所示:
| public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); public List<T> getUpdatedListOfServers(); } |
它定義了兩個抽象方法:getInitialListOfServers用于獲取初始化的服務實例清單,而getUpdatedListOfServers用于獲取更新的服務實例清單。那么該接口的實現有哪些呢?通過搜索源碼,我們可以整出如下圖的結構:
從圖中我們可以看到有很多個ServerList的實現類,那么在DynamicServerListLoadBalancer中的ServerList默認配置到底使用了哪個具體實現呢?既然在該負載均衡器中需要實現服務實例的動態更新,那么勢必需要ribbon具備訪問eureka來獲取服務實例的能力,所以我們從Spring Cloud整合ribbon與eureka的包org.springframework.cloud.netflix.ribbon.eureka下探索,可以找到配置類EurekaRibbonClientConfiguration,在該類中可以找到看到下面創建ServerList實例的內容:
public ServerList<?> ribbonServerList(IClientConfig config) { DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList( config); DomainExtractingServerList serverList = new DomainExtractingServerList( discoveryServerList, config, this.approximateZoneFromHostname); return serverList; } |
可以看到,這里創建的是一個DomainExtractingServerList實例,從下面它的源碼中我們可以看到在它內部還定義了一個ServerList list。同時,DomainExtractingServerList類中對getInitialListOfServers和getUpdatedListOfServers的具體實現,其實委托給了內部定義的ServerList list對象,而該對象是通過創建DomainExtractingServerList時候,由構造函數傳入的DiscoveryEnabledNIWSServerList實現的。
| public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> { private ServerList<DiscoveryEnabledServer> list; private IClientConfig clientConfig; private boolean approximateZoneFromHostname; public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list, IClientConfig clientConfig, boolean approximateZoneFromHostname) { this.list = list; this.clientConfig = clientConfig; this.approximateZoneFromHostname = approximateZoneFromHostname; } public List<DiscoveryEnabledServer> getInitialListOfServers() { List<DiscoveryEnabledServer> servers = setZones(this.list .getInitialListOfServers()); return servers; } public List<DiscoveryEnabledServer> getUpdatedListOfServers() { List<DiscoveryEnabledServer> servers = setZones(this.list .getUpdatedListOfServers()); return servers; } ... } |
那么DiscoveryEnabledNIWSServerList是如何實現這兩個服務實例的獲取的呢?我們從源碼中可以看到這兩個方法都是通過該類中的一個私有函數obtainServersViaDiscovery來通過服務發現機制來實現服務實例的獲取。
public List<DiscoveryEnabledServer> getInitialListOfServers(){ return obtainServersViaDiscovery(); } public List<DiscoveryEnabledServer> getUpdatedListOfServers(){ return obtainServersViaDiscovery(); } |
而obtainServersViaDiscovery的實現邏輯如下,主要依靠EurekaClient從服務注冊中心中獲取到具體的服務實例InstanceInfo列表(EurekaClient的具體實現,我們在分析Eureka的源碼時已經做了詳細的介紹,這里傳入的vipAddress可以理解為邏輯上的服務名,比如“USER-SERVICE”)。接著,對這些服務實例進行遍歷,將狀態為“UP”(正常服務)的實例轉換成DiscoveryEnabledServer對象,最后將這些實例組織成列表返回。
| private List<DiscoveryEnabledServer> obtainServersViaDiscovery() { List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>(); if (eurekaClientProvider == null || eurekaClientProvider.get() == null) { logger.warn("EurekaClient has not been initialized yet, returning an empty list"); return new ArrayList<DiscoveryEnabledServer>(); } EurekaClient eurekaClient = eurekaClientProvider.get(); if (vipAddresses!=null){ for (String vipAddress : vipAddresses.split(",")) { List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress( vipAddress, isSecure, targetRegion); for (InstanceInfo ii : listOfInstanceInfo) { if (ii.getStatus().equals(InstanceStatus.UP)) { // 省略了一些實例信息的加工邏輯 DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr); des.setZone(DiscoveryClient.getZone(ii)); serverList.add(des); } } if (serverList.size()>0 && prioritizeVipAddressBasedServers){ break; } } } return serverList; } |
在DiscoveryEnabledNIWSServerList中通過EurekaClient從服務注冊中心獲取到最新的服務實例清單后,返回的List到了DomainExtractingServerList類中,將繼續通過setZones函數進行處理,而這里的處理具體內容如下,主要完成將DiscoveryEnabledNIWSServerList返回的List列表中的元素,轉換成內部定義的DiscoveryEnabledServer的子類對象DomainExtractingServer,在該對象的構造函數中將為服務實例對象設置一些必要的屬性,比如id、zone、isAliveFlag、readyToServe等信息。
| private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) { List<DiscoveryEnabledServer> result = new ArrayList<>(); boolean isSecure = this.clientConfig.getPropertyAsBoolean( CommonClientConfigKey.IsSecure, Boolean.TRUE); boolean shouldUseIpAddr = this.clientConfig.getPropertyAsBoolean( CommonClientConfigKey.UseIPAddrForServer, Boolean.FALSE); for (DiscoveryEnabledServer server : servers) { result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname)); } return result; } |
ServerListUpdater
通過上面的分析我們已經知道了Ribbon與Eureka整合后,如何實現從Eureka Server中獲取服務實例清單。那么它又是如何觸發向Eureka Server去獲取服務實例清單以及如何在獲取到服務實例清單后更新本地的服務實例清單的呢?繼續來看DynamicServerListLoadBalancer中的實現內容,我們可以很容易的找到下面定義的關于ServerListUpdater的內容:
| protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { public void doUpdate() { updateListOfServers(); } }; protected volatile ServerListUpdater serverListUpdater; |
根據該接口的命名,我們基本就能猜到,這個對象實現的是對ServerList的更新,所以可以稱它為“服務更新器”,從下面的源碼中可以看到,在ServerListUpdater內部還定義了一個UpdateAction接口,上面定義的updateAction對象就是以匿名內部類的方式創建了一個它的具體實現,其中doUpdate實現的內容就是對ServerList的具體更新操作。除此之外,“服務更新器”中還定義了一系列控制它和獲取它一些信息的操作。
| public interface ServerListUpdater { public interface UpdateAction { void doUpdate(); } // 啟動服務更新器,傳入的UpdateAction對象為更新操作的具體實現。 void start(UpdateAction updateAction); // 停止服務更新器 void stop(); // 獲取最近的更新時間戳 String getLastUpdate(); // 獲取上一次更新到現在的時間間隔,單位為毫秒 long getDurationSinceLastUpdateMs(); // 獲取錯過的更新周期數 int getNumberMissedCycles(); // 獲取核心線程數 int getCoreThreads(); } |
而ServerListUpdater的實現類不多,具體下圖所示。
根據兩個類的注釋,我們可以很容易的知道它們的作用分別是:
- PollingServerListUpdater:動態服務列表更新的默認策略,也就是說DynamicServerListLoadBalancer負載均衡器中的默認實現就是它,它通過定時任務的方式進行服務列表的更新。
- EurekaNotificationServerListUpdater:該更新器也可服務于DynamicServerListLoadBalancer負載均衡器,但是它的觸發機制與PollingServerListUpdater不同,它需要利用Eureka的事件監聽器來驅動服務列表的更新操作。
下面我們來詳細看看它默認實現的PollingServerListUpdater。先從用于啟動“服務更新器”的start函數源碼看起,具體如下。我們可以看到start函數的實現內容驗證了之前提到的:以定時任務的方式進行服務列表的更新。它先創建了一個Runnable的線程實現,在該實現中調用了上面提到的具體更新服務實例列表的方法updateAction.doUpdate(),最后再為這個Runnable的線程實現啟動了一個定時任務來執行。
public synchronized void start(final UpdateAction updateAction) { if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { public void run() { if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } } |
繼續看PollingServerListUpdater的其他內容,我們可以找到用于啟動定時任務的2個重要參數initialDelayMs和refreshIntervalMs的默認定義分別為1000和30*1000,單位為毫秒。也就是說更新服務實例在初始化之后延遲1秒后開始執行,并以30秒為周期重復執行。除了這些內容之外,我們還能看到它還會記錄最后更新時間、是否存活等信息,同時也實現了ServerListUpdater中定義的一些其他操作內容,這些操作相對比較簡單,這里不再具體說明,有興趣的讀者可以自己查看源碼了解其實現原理。
ServerListFilter
在了解了更新服務實例的定時任務是如何啟動的之后,我們繼續回到updateAction.doUpdate()調用的具體實現位置,在DynamicServerListLoadBalancer中,它的實際實現委托給了updateListOfServers函數,具體實現如下:
| public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } updateAllServerList(servers); } |
可以看到,這里終于用到了我們之前提到的ServerList的getUpdatedListOfServers,通過之前的介紹我們已經可以知道這一步實現了從Eureka Server中獲取服務可用實例的列表。在獲得了服務實例列表之后,這里又將引入一個新的對象filter,追朔該對象的定義,我們可以找到它是ServerListFilter定義的。
ServerListFilter接口非常簡單,該接口中之定義了一個方法List getFilteredListOfServers(List servers),主要用于實現對服務實例列表的過濾,通過傳入的服務實例清單,根據一些規則返回過濾后的服務實例清單。該接口的實現如下圖所示:
其中,除了ZonePreferenceServerListFilter的實現是Spring Cloud Netflix中對Ribbon的擴展實現外,其他均是Netflix Ribbon中的實現類。我們可以分別看看這些過濾器實現都有什么特點:
- AbstractServerListFilter:這是一個抽象過濾器,在這里定義了過濾時需要的一個重要依據對象LoadBalancerStats,我們在之前介紹過的,該對象存儲了關于負載均衡器的一些屬性和統計信息等。
| public abstract class AbstractServerListFilter<T extends Server> implements ServerListFilter<T> { private volatile LoadBalancerStats stats; public void setLoadBalancerStats(LoadBalancerStats stats) { this.stats = stats; } public LoadBalancerStats getLoadBalancerStats() { return stats; } } |
- ZoneAffinityServerListFilter:該過濾器基于“區域感知(Zone Affinity)”的方式實現服務實例的過濾,也就是說它會根據提供服務的實例所處區域(Zone)與消費者自身的所處區域(Zone)進行比較,過濾掉那些不是同處一個區域的實例。
| public List<T> getFilteredListOfServers(List<T> servers) { if (zone != null && (zoneAffinity || zoneExclusive) && servers !=null && servers.size() > 0){ List<T> filteredServers = Lists.newArrayList(Iterables.filter( servers, this.zoneAffinityPredicate.getServerOnlyPredicate())); if (shouldEnableZoneAffinity(filteredServers)) { return filteredServers; } else if (zoneAffinity) { overrideCounter.increment(); } } return servers; } |
從上面的源碼中我們可以看到,對于服務實例列表的過濾是通過Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())來實現的,其中判斷依據由ZoneAffinityPredicate實現服務實例與消費者的Zone比較。而在過濾之后,這里并不會馬上返回過濾的結果,而是通過shouldEnableZoneAffinity函數來判斷是否要啟用“區域感知”的功能,從下面shouldEnableZoneAffinity的實現中,我們可以看到,它使用了LoadBalancerStats的getZoneSnapshot方法來獲取這些過濾后的同區域實例的基礎指標(包含了:實例數量、斷路器斷開數、活動請求數、實例平均負載等),根據一系列的算法求出下面的幾個評價值并與設置的閾值對比(下面的為默認值),若有一個條件符合,就不啟用“區域感知”過濾的服務實例清單。這一算法實現對于集群出現區域故障時,依然可以依靠其他區域的實例進行正常服務提供了完善的高可用保障。同時,通過這里的介紹,我們也可以關聯著來理解之前介紹Eureka的時候提到的對于區域分配設計來保證跨區域故障的高可用問題。
- blackOutServerPercentage:故障實例百分比(斷路器斷開數 / 實例數量) >= 0.8
- activeReqeustsPerServer:實例平均負載 >= 0.6
- availableServers:可用實例數(實例數量 - 斷路器斷開數) < 2
| private boolean shouldEnableZoneAffinity(List<T> filtered) { if (!zoneAffinity && !zoneExclusive) { return false; } if (zoneExclusive) { return true; } LoadBalancerStats stats = getLoadBalancerStats(); if (stats == null) { return zoneAffinity; } else { logger.debug("Determining if zone affinity should be enabled with given server list: {}", filtered); ZoneSnapshot snapshot = stats.getZoneSnapshot(filtered); double loadPerServer = snapshot.getLoadPerServer(); int instanceCount = snapshot.getInstanceCount(); int circuitBreakerTrippedCount = snapshot.getCircuitTrippedCount(); if (((double) circuitBreakerTrippedCount) / instanceCount >= blackOutServerPercentageThreshold.get() || loadPerServer >= activeReqeustsPerServerThreshold.get() || (instanceCount - circuitBreakerTrippedCount) < availableServersThreshold.get()) { logger.debug("zoneAffinity is overriden. blackOutServerPercentage: {}, activeReqeustsPerServer: {}, availableServers: {}", new Object[] {(double) circuitBreakerTrippedCount / instanceCount, loadPerServer, instanceCount - circuitBreakerTrippedCount}); return false; } else { return true; } } } |
DefaultNIWSServerListFilter:該過濾器完全繼承自ZoneAffinityServerListFilter,是默認的NIWS(Netflix Internal Web Service)過濾器。
ServerListSubsetFilter:該過濾器也繼承自ZoneAffinityServerListFilter,它非常適用于擁有大規模服務器集群(上百或更多)的系統。因為它可以產生一個“區域感知”結果的子集列表,同時它還能夠通過比較服務實例的通信失敗數量和并發連接數來判定該服務是否健康來選擇性的從服務實例列表中剔除那些相對不夠健康的實例。該過濾器的實現主要分為三步:
- 獲取“區域感知”的過濾結果,來作為候選的服務實例清單
- 從當前消費者維護的服務實例子集中剔除那些相對不夠健康的實例(同時也將這些實例從候選清單中剔除,防止第三步的時候又被選入),不夠健康的標準如下:
a. 服務實例的并發連接數超過客戶端配置的值,默認為0,配置參數為:<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold
b. 服務實例的失敗數超過客戶端配置的值,默認為0,配置參數為:<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold
c. 如果按符合上面任一規則的服務實例剔除后,剔除比例小于客戶端默認配置的百分比,默認為0.1(10%),配置參數為:<clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent。那么就先對剩下的實例列表進行健康排序,再開始從最不健康實例進行剔除,直到達到配置的剔除百分比。 - 在完成剔除后,清單已經少了至少10%(默認值)的服務實例,最后通過隨機的方式從候選清單中選出一批實例加入到清單中,以保持服務實例子集與原來的數量一致,而默認的實例子集數量為20,其配置參數為:<clientName>.<nameSpace>.ServerListSubsetFilter.size。
ZonePreferenceServerListFilter:Spring Cloud整合時新增的過濾器。若使用Spring Cloud整合Eureka和Ribbon時會默認使用該過濾器。它實現了通過配置或者Eureka實例元數據的所屬區域(Zone)來過濾出同區域的服務實例。如下面的源碼所示,它的實現非常簡單,首先通過父類ZoneAffinityServerListFilter的過濾器來獲得“區域感知”的服務實例列表,然后遍歷這個結果取出根據消費者配置預設的區域Zone來進行過濾,如果過濾的結果是空的就直接返回父類獲取的結果,如果不為空就返回通過消費者配置的Zone過濾后的結果。
public List<Server> getFilteredListOfServers(List<Server> servers) { List<Server> output = super.getFilteredListOfServers(servers); if (this.zone != null && output.size() == servers.size()) { List<Server> local = new ArrayList<Server>(); for (Server server : output) { if (this.zone.equalsIgnoreCase(server.getZone())) { local.add(server); } } if (!local.isEmpty()) { return local; } } return output; } |
ZoneAwareLoadBalancer
ZoneAwareLoadBalancer負載均衡器是對DynamicServerListLoadBalancer的擴展。在DynamicServerListLoadBalancer中,我們可以看到它并沒有重寫選擇具體服務實例的chooseServer函數,所以它依然會采用在BaseLoadBalancer中實現的算法,使用RoundRobinRule規則,以線性輪詢的方式來選擇調用的服務實例,該算法實現簡單并沒有區域(Zone)的概念,所以它會把所有實例視為一個Zone下的節點來看待,這樣就會周期性的產生跨區域(Zone)訪問的情況,由于跨區域會產生更高的延遲,這些實例主要以防止區域性故障實現高可用為目的而不能作為常規訪問的實例,所以在多區域部署的情況下會有一定的性能問題,而該負載均衡器則可以避免這樣的問題。那么它是如何實現的呢?
首先,在ZoneAwareLoadBalancer中,我們可以發現,它并沒有重寫setServersList,說明實現服務實例清單的更新主邏輯沒有修改。但是我們可以發現它重寫了這個函數:setServerListForZones(Map<String, List<Server>> zoneServersMap)。看到這里可能會有一些陌生,因為它并不是接口中定義的必備函數,所以我們不妨去父類DynamicServerListLoadBalancer中尋找一下該函數,我們可以找到下面的定義了:
| public void setServersList(List lsrv) { super.setServersList(lsrv); List<T> serverList = (List<T>) lsrv; Map<String, List<Server>> serversInZones = new HashMap<String, List<Server>>(); ... setServerListForZones(serversInZones); } protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) { LOGGER.debug("Setting server list for zones: {}", zoneServersMap); getLoadBalancerStats().updateZoneServerMapping(zoneServersMap); } |
setServerListForZones函數的調用位于更新服務實例清單函數setServersList的最后,同時從其實現內容來看,它在父類DynamicServerListLoadBalancer中的作用是根據按區域Zone分組的實例列表,為負載均衡器中的LoadBalancerStats對象創建ZoneStats并放入Map zoneStatsMap集合中,每一個區域Zone會對應一個ZoneStats,它用于存儲每個Zone的一些狀態和統計信息。
在ZoneAwareLoadBalancer中對setServerListForZones的重寫如下:
| protected void setServerListForZones(Map<String, List<Server>> zoneServersMap) { super.setServerListForZones(zoneServersMap); if (balancers == null) { balancers = new ConcurrentHashMap<String, BaseLoadBalancer>(); } for (Map.Entry<String, List<Server>> entry: zoneServersMap.entrySet()) { String zone = entry.getKey().toLowerCase(); getLoadBalancer(zone).setServersList(entry.getValue()); } for (Map.Entry<String, BaseLoadBalancer> existingLBEntry: balancers.entrySet()) { if (!zoneServersMap.keySet().contains(existingLBEntry.getKey())) { existingLBEntry.getValue().setServersList(Collections.emptyList()); } } } |
可以看到,在該實現中創建了一個ConcurrentHashMap()類型的balancers對象,它將用來存儲每個Zone區域對應的負載均衡器,而具體的負載均衡器的創建則是通過下面的第一個循環中調用getLoadBalancer函數來完成,同時在創建負載均衡器的時候會創建它的規則(如果當前實現中沒有IRULE的實例,就創建一個AvailabilityFilteringRule規則;如果已經有具體實例,就clone一個),在創建完負載均衡器后又馬上調用setServersList函數為其設置對應Zone區域的實例清單。而第二個循環則是對Zone區域中實例清單的檢查,看看是否有Zone區域下已經沒有實例了,是的話就將balancers中對應Zone區域的實例列表清空,該操作的作用是為了后續選擇節點時,防止過時的Zone區域統計信息干擾具體實例的選擇算法。
在了解了該負載均衡器是如何擴展服務實例清單的實現后,我們來具體看看它是如何挑選服務實例,來實現對區域的識別的:
| public Server chooseServer(Object key) { if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) { logger.debug("Zone aware logic disabled or there is only one zone"); return super.chooseServer(key); } Server server = null; try { LoadBalancerStats lbStats = getLoadBalancerStats(); Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats); logger.debug("Zone snapshots: {}", zoneSnapshot); ... Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()); logger.debug("Available zones: {}", availableZones); if (availableZones != null && availableZones.size() < zoneSnapshot.keySet().size()) { String zone = ZoneAvoidanceRule.randomChooseZone(zoneSnapshot, availableZones); logger.debug("Zone chosen: {}", zone); if (zone != null) { BaseLoadBalancer zoneLoadBalancer = getLoadBalancer(zone); server = zoneLoadBalancer.chooseServer(key); } } } catch (Throwable e) { logger.error("Unexpected exception when choosing server using zone aware logic", e); } if (server != null) { return server; } else { logger.debug("Zone avoidance logic is not invoked."); return super.chooseServer(key); } } |
從源碼中我們可以看到,只有當負載均衡器中維護的實例所屬Zone區域個數大于1的時候才會執行這里的選擇策略,否則還是將使用父類的實現。當Zone區域個數大于1個的時候,它的實現步驟主要如下:
- 調用ZoneAvoidanceRule中的靜態方法createSnapshot(lbStats),為當前負載均衡器中所有的Zone區域分別創建快照,保存在Map zoneSnapshot中,這些快照中的數據將用于后續的算法。
- 調用ZoneAvoidanceRule中的靜態方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),來獲取可用的Zone區域集合,在該函數中會通過Zone區域快照中的統計數據來實現可用區的挑選。
- 首先它會剔除符合這些規則的Zone區域:所屬實例數為零的Zone區域;Zone區域內實例平均負載小于零,或者實例故障率(斷路器斷開次數/實例數)大于等于閾值(默認為0.99999)。
- 然后根據Zone區域的實例平均負載來計算出最差的Zone區域,這里的最差指的是實例平均負載最高的Zone區域。
- 如果在上面的過程中沒有符合剔除要求的區域,同時實例最大平均負載小于閾值(默認為20%),就直接返回所有Zone區域為可用區域。否則,從最壞Zone區域集合中隨機的選擇一個,將它從可用Zone區域集合中剔除。
- 當獲得的可用Zone區域集合不為空,并且個數小于Zone區域總數,就隨機的選擇一個Zone區域。
- 在確定了某個Zone區域后,則獲取對應Zone區域的服務均衡器,并調用chooseServer來選擇具體的服務實例,而在chooseServer中將使用IRule接口的choose函數來選擇具體的服務實例。在這里IRule接口的實現會使用ZoneAvoidanceRule來挑選出具體的服務實例。
負載均衡策略
通過上面的源碼解讀,我們已經對Ribbon實現的負載均衡器以及其中包含的服務實例過濾器、服務實例信息的存儲對象、區域的信息快照等都有了深入的認識和理解,但是對于負載均衡器中的服務實例選擇策略只是講解了幾個默認實現的內容,而對于IRule的其他實現還沒有詳細的解讀,下面我們來看看在Ribbon中共提供了那些負載均衡的策略實現。
如上圖所示,我們可以看到在Ribbon中實現了非常多的選擇策略,其中也包含了我們在前面內容中提到過的:RoundRobinRule和ZoneAvoidanceRule。下面我們來詳細的解讀一下IRule接口的各個實現。
AbstractLoadBalancerRule
負載均衡策略的抽象類,在該抽象類中定義了負載均衡器ILoadBalancer對象,該對象能夠在具體實現選擇服務策略時,獲取到一些負載均衡器中維護的信息來作為分配依據,并以此設計一些算法來實現針對特定場景的高效策略。
| public abstract class AbstractLoadBalancerRule implements IRule, IClientConfigAware { private ILoadBalancer lb; public void setLoadBalancer(ILoadBalancer lb){ this.lb = lb; } public ILoadBalancer getLoadBalancer(){ return lb; } } |
RandomRule
該策略實現了從服務實例清單中隨機選擇一個服務實例的功能。它的具體實現如下,可以看到IRule接口的choose(Object key)函數實現,委托給了該類中的choose(ILoadBalancer lb, Object key),該方法增加了一個負載均衡器對象的參數。從具體的實現上看,它會使用傳入的負載均衡器來獲得可用實例列表upList和所有實例列表allList,并通過rand.nextInt(serverCount)函數來獲取一個隨機數,并將該隨機數作為upList的索引值來返回具體實例。同時,具體的選擇邏輯在一個while (server == null)循環之內,而根據選擇邏輯的實現,正常情況下每次選擇都應該能夠選出一個服務實例,如果出現死循環獲取不到服務實例時,則很有可能存在并發的Bug。
public Server choose(Object key) { return choose(getLoadBalancer(), key); } public Server choose(ILoadBalancer lb, Object key) { ... Server server = null; while (server == null) { if (Thread.interrupted()) { return null; } List<Server> upList = lb.getReachableServers(); List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int index = rand.nextInt(serverCount); server = upList.get(index); if (server == null) { Thread.yield(); continue; } if (server.isAlive()) { return (server); } server = null; Thread.yield(); } return server; } |
RoundRobinRule
該策略實現了按照線性輪詢的方式依次選擇每個服務實例的功能。它的具體實現如下,其詳細結構與RandomRule非常類似。除了循環條件不同外,就是從可用列表中獲取所謂的邏輯不同。從循環條件中,我們可以看到增加了一個count計數變量,該變量會在每次循環之后累加,也就是說如果一直選擇不到server超過10次,那么就會結束嘗試,并打印一個警告信息No available alive servers after 10 tries from load balancer: ...。而線性輪詢的實現則是通過AtomicInteger nextServerCyclicCounter對象實現,每次進行實例選擇時通過調用incrementAndGetModulo函數實現遞增。
| public Server choose(ILoadBalancer lb, Object key) { ... Server server = null; int count = 0; while (server == null && count++ < 10) { List<Server> reachableServers = lb.getReachableServers(); List<Server> allServers = lb.getAllServers(); int upCount = reachableServers.size(); int serverCount = allServers.size(); if ((upCount == 0) || (serverCount == 0)) { log.warn("No up servers available from load balancer: " + lb); return null; } int nextServerIndex = incrementAndGetModulo(serverCount); server = allServers.get(nextServerIndex); if (server == null) { Thread.yield(); continue; } if (server.isAlive() && (server.isReadyToServe())) { return (server); } server = null; } if (count >= 10) { log.warn("No available alive servers after 10 tries from load balancer: " + lb); } return server; } |
RetryRule
該策略實現了一個具備重試機制的實例選擇功能。從下面的實現中我們可以看到,在其內部還定義了一個IRule對象,默認使用了RoundRobinRule實例。而在choose方法中的則實現了對內部定義的策略進行反復嘗試的策略,若期間能夠選擇到具體的服務實例就返回,若選擇不到就根據設置的嘗試結束時間為閾值(maxRetryMillis參數定義的值 + choose方法開始執行的時間戳),當超過該閾值后就返回null。
| public class RetryRule extends AbstractLoadBalancerRule { IRule subRule = new RoundRobinRule(); long maxRetryMillis = 500; ... public Server choose(ILoadBalancer lb, Object key) { long requestTime = System.currentTimeMillis(); long deadline = requestTime + maxRetryMillis; Server answer = null; answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { InterruptTask task = new InterruptTask(deadline - System.currentTimeMillis()); while (!Thread.interrupted()) { answer = subRule.choose(key); if (((answer == null) || (!answer.isAlive())) && (System.currentTimeMillis() < deadline)) { Thread.yield(); } else { break; } } task.cancel(); } if ((answer == null) || (!answer.isAlive())) { return null; } else { return answer; } } ... } |
WeightedResponseTimeRule
該策略是對RoundRobinRule的擴展,增加了根據實例的運行情況來計算權重,并根據權重來挑選實例,以達到更優的分配效果,它的實現主要有三個核心內容:
定時任務
WeightedResponseTimeRule策略在初始化的時候會通過serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval)啟動一個定時任務,用來為每個服務實例計算權重,該任務默認30秒執行一次。
| class DynamicServerWeightTask extends TimerTask { public void run() { ServerWeight serverWeight = new ServerWeight(); try { serverWeight.maintainWeights(); } catch (Throwable t) { logger.error("Throwable caught while running DynamicServerWeightTask for " + name, t); } } } |
權重計算
在源碼中我們可以輕松找到用于存儲權重的對象:List<Double> accumulatedWeights = new ArrayList<Double>(),該List中每個權重值所處的位置對應了負載均衡器維護的服務實例清單中所有實例在清單中的位置。
維護實例權重的計算過程通過maintainWeights函數實現,具體如下源碼所示:
| public void maintainWeights() { ILoadBalancer lb = getLoadBalancer(); ... try { logger.info("Weight adjusting job started"); AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb; LoadBalancerStats stats = nlb.getLoadBalancerStats(); ... // 計算所有實例的平均響應時間的總和:totalResponseTime double totalResponseTime = 0; for (Server server : nlb.getAllServers()) { // this will automatically load the stats if not in cache ServerStats ss = stats.getSingleServerStat(server); totalResponseTime += ss.getResponseTimeAvg(); } // 逐個計算每個實例的權重:weightSoFar + totalResponseTime - 實例的平均響應時間 Double weightSoFar = 0.0; List<Double> finalWeights = new ArrayList<Double>(); for (Server server : nlb.getAllServers()) { ServerStats ss = stats.getSingleServerStat(server); double weight = totalResponseTime - ss.getResponseTimeAvg(); weightSoFar += weight; finalWeights.add(weightSoFar); } setWeights(finalWeights); } catch (Throwable t) { logger.error("Exception while dynamically calculating server weights", t); } finally { serverWeightAssignmentInProgress.set(false); } } |
該函數的實現主要分為兩個步驟:
- 根據LoadBalancerStats中記錄的每個實例的統計信息,累加所有實例的平均響應時間,得到總平均響應時間totalResponseTime,該值會用于后續的計算。
- 為負載均衡器中維護的實例清單逐個計算權重(從第一個開始),計算規則為:weightSoFar + totalResponseTime - 實例的平均響應時間,其中weightSoFar初始化為零,并且每計算好一個權重需要累加到weightSoFar上供下一次計算使用。totalResponseTime則的上計算結果。
舉個簡單的例子來理解這個計算過程:假設有4個實例A、B、C、D,他們的平均響應時間為:10、40、80、100,所以總響應時間是10 + 40 + 80 + 100 = 230,每個實例的權重為總響應時間與實例自身的平均響應時間的差的累積獲得,所以實例A、B、C、D的權重分別為:
- 實例A:230 - 10 = 220
- 實例B:220 + (230 - 40)= 410
- 實例C:410 + (230 - 80)= 560
- 實例D:560 + (230 - 100)= 690
需要注意的是,這里的權重值只是表示了各實例權重區間的上限,并非某個實例的優先級,所以不是數值越大被選中的概率就越大。那么什么是權重區間呢?以上面例子的計算結果為例,它實際上是為這4個實例構建了4個不同的區間,每個實例的區間下限是上一個實例的區間上限,而每個實例的區間上限則是我們上面計算并存儲于List accumulatedWeights中的權重值,其中第一個實例的下限默認為零。所以,根據上面示例的權重計算結果,我們可以得到每個實例的權重區間:
- 實例A:[0, 220]
- 實例B:(220, 410]
- 實例C:(410, 560]
- 實例D:(560,690)
我們不難發現,實際上每個區間的寬度就是:總的平均響應時間 - 實例的平均響應時間,所以實例的平均響應時間越短、權重區間的寬度越大,而權重區間的寬度越大被選中的概率就越高。可能很多讀者會問,這些區間邊界的開閉是如何確定的呢?為什么不那么規則?下面我們會通過實例選擇算法的解讀來解釋。
實例選擇
WeightedResponseTimeRule選擇實例的實現與之前介紹的算法結構類似,下面是它主體的算法(省略了循環體和一些判斷等處理):
| public Server choose(ILoadBalancer lb, Object key) { ... List<Double> currentWeights = accumulatedWeights; ... List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { return null; } int serverIndex = 0; // 獲取最后一個實例的權重 double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1); if (maxTotalWeight < 0.001d) { // 如果最后一個實例的權重值小于0.001,則采用父類實現的線性輪詢的策略 server = super.choose(getLoadBalancer(), key); if(server == null) { return server; } } else { // 如果最后一個實例的權重值大于等于0.001,就產生一個[0, maxTotalWeight)的隨機數 double randomWeight = random.nextDouble() * maxTotalWeight; int n = 0; for (Double d : currentWeights) { // 遍歷維護的權重清單,若權重大于等于隨機得到的數值,就選擇這個實例 if (d >= randomWeight) { serverIndex = n; break; } else { n++; } } server = allList.get(serverIndex); } ... return server; } |
從源碼中,我們可以看到,選擇實例的核心過程就兩步:
- 生產一個[0, 最大權重值)區間內的隨機數。
- 遍歷權重列表,比較權重值與隨機數的大小,如果權重值大于等于隨機數,就拿當前權重列表的索引值去服務實例列表中獲取具體實例。這就是在上一節中提到的服務實例會根據權重區間挑選的原理,而權重區間邊界的開閉原則根據算法,正常應該每個區間為(x, y]的形式,但是第一個實例和最后一個實例為什么不同呢?由于隨機數的最小取值可以為0,所以第一個實例的下限是閉區間,同時隨機數的最大值取不到最大權重值,所以最后一個實例的上限是開區間。
若繼續以上面的數據為例,進行服務實例的選擇,則該方法會從[0, 690)區間中選出一個隨機數,比如選出的隨機數為230,由于該值位于第二個區間,所以此時就會選擇實例B來進行請求。
ClientConfigEnabledRoundRobinRule
該策略較為特殊,我們一般不直接使用它。因為它本身并沒有實現什么特殊的處理邏輯,正如下面的源碼所示,在它的內部定義了一個RoundRobinRule策略,而choose函數的實現也正是使用了RoundRobinRule的線性輪詢機制,所以它實現的功能實際上與RoundRobinRule相同,那么定義它有什么特殊的用處呢?
雖然我們不會直接使用該策略,但是通過繼承該策略,那么默認的choose就實現了線性輪詢機制,在子類中做一些高級策略時通常都有可能會存在一些無法實施的情況,那么就可以通過父類的實現作為備選。在后文中我們將繼續介紹的高級策略均是基于ClientConfigEnabledRoundRobinRule的擴展。
| public class ClientConfigEnabledRoundRobinRule extends AbstractLoadBalancerRule { RoundRobinRule roundRobinRule = new RoundRobinRule(); ... public Server choose(Object key) { if (roundRobinRule != null) { return roundRobinRule.choose(key); } else { throw new IllegalArgumentException( "This class has not been initialized with the RoundRobinRule class"); } } } |
BestAvailableRule
該策略繼承自ClientConfigEnabledRoundRobinRule,在實現中它注入了負載均衡器的統計對象:LoadBalancerStats,同時在具體的choose算法中利用LoadBalancerStats保存的實例統計信息來選擇滿足要求的實例。從如下源碼中我們可以看到,它通過遍歷負載均衡器中維護的所有服務實例,會過濾掉故障的實例,并找出并發請求數最小的一個,所以該策略的特性是選出最空閑的實例。
| public Server choose(Object key) { if (loadBalancerStats == null) { return super.choose(key); } List<Server> serverList = getLoadBalancer().getAllServers(); int minimalConcurrentConnections = Integer.MAX_VALUE; long currentTime = System.currentTimeMillis(); Server chosen = null; for (Server server: serverList) { ServerStats serverStats = loadBalancerStats.getSingleServerStat(server); if (!serverStats.isCircuitBreakerTripped(currentTime)) { int concurrentConnections = serverStats.getActiveRequestsCount(currentTime); if (concurrentConnections < minimalConcurrentConnections) { minimalConcurrentConnections = concurrentConnections; chosen = server; } } } if (chosen == null) { return super.choose(key); } else { return chosen; } } |
同時,由于該算法的核心依據是統計對象loadBalancerStats,當其為空的時候,該策略是無法執行的。所以從源碼中我們可以看到,當loadBalancerStats為空的時候,它會采用父類的線性輪詢策略,正如我們在介紹ClientConfigEnabledRoundRobinRule時那樣,它的子類在無法滿足實現高級策略時候,可以使用線性輪詢策略的特性。后面將要介紹的策略因為也都繼承自ClientConfigEnabledRoundRobinRule,所以他們都會具有這樣的特性。
PredicateBasedRule
這是一個抽象策略,它也繼承了ClientConfigEnabledRoundRobinRule,從其命名中可以猜出他是一個基于Predicate實現的策略,Predicate是Google Guava Collection工具對集合進行過濾的條件接口。
如下源碼所示,它定義了一個抽象函數getPredicate來獲取AbstractServerPredicate對象的實現,而在choose函數中,通過AbstractServerPredicate的chooseRoundRobinAfterFiltering函數來選出具體的服務實例。從該函數的命名我們也大致能猜出它的基礎邏輯:先通過子類中實現的Predicate邏輯來過濾一部分服務實例,然后再以線性輪詢的方式從過濾后的實例清單中選出一個。
| public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule { public abstract AbstractServerPredicate getPredicate(); public Server choose(Object key) { ILoadBalancer lb = getLoadBalancer(); Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key); if (server.isPresent()) { return server.get(); } else { return null; } } } |
通過下面AbstractServerPredicate的源碼片段,可以證實我們上面所做的猜測。在上面choose函數中調用的chooseRoundRobinAfterFiltering方法先通過內部定義的getEligibleServers函數來獲取備選的實例清單(實現了過濾),如果返回的清單為空,則用Optional.absent()來表示不存在,反之則以線性輪詢的方式從備選清單中獲取一個實例。
| public abstract class AbstractServerPredicate implements Predicate<PredicateKey> { ... public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) { List<Server> eligible = getEligibleServers(servers, loadBalancerKey); if (eligible.size() == 0) { return Optional.absent(); } return Optional.of(eligible.get(nextIndex.getAndIncrement() % eligible.size())); } public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { if (loadBalancerKey == null) { return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate())); } else { List<Server> results = Lists.newArrayList(); for (Server server: servers) { if (this.apply(new PredicateKey(loadBalancerKey, server))) { results.add(server); } } return results; } } } |
在了解了整體邏輯之后,我們來詳細看看實現過濾功能的getEligibleServers函數。從源碼上看,它的實現結構非常簡單清晰,通過遍歷服務清單,使用this.apply方法來判斷實例是否需要保留,是就添加到結果列表中。
可能到這里,不熟悉Google Guava Collections集合工具的讀者會比較困惑,這個apply在AbstractServerPredicate中并找不到它的定義,那么它是如何實現過濾的呢?實際上,AbstractServerPredicate實現了com.google.common.base.Predicate接口,而apply方法是該接口中的定義,主要用來實現過濾條件的判斷邏輯,它輸入的參數則是過濾條件需要用到的一些信息(比如源碼中的new PredicateKey(loadBalancerKey, server)),它傳入了關于實例的統計信息和負載均衡器的選擇算法傳遞過來的key)。既然在AbstractServerPredicate中我們未能找到apply的實現,所以這里的chooseRoundRobinAfterFiltering函數只是定義了一個模板策略:“先過濾清單,再輪詢選擇”。對于如何過濾,則需要我們在AbstractServerPredicate的子類去實現apply方法來確定具體的過濾策略了。
后面我們將要介紹的兩個策略就是基于此抽象策略實現,只是它們使用了不同的Predicate實現來完成過濾邏輯以達到不同的實例選擇效果。
Google Guava Collections是一個對Java Collections Framework增強和擴展的一個開源項目。雖然Java Collections Framework已經能夠 滿足了我們大多數情況下使用集合的要求,但是當遇到一些特殊的情況我們的代碼會比較冗長且容易出錯。Guava Collections 可以幫助我們的讓集合操作代碼更為簡短精煉并大大增強代碼的可讀性。
AvailabilityFilteringRule
該策略繼承自上面介紹的抽象策略PredicateBasedRule,所以它也繼承了“先過濾清單,再輪詢選擇”的基本處理邏輯,其中過濾條件使用了AvailabilityPredicate:
| public class AvailabilityPredicate extends AbstractServerPredicate { ... public boolean apply(@Nullable PredicateKey input) { LoadBalancerStats stats = getLBStats(); if (stats == null) { return true; } return !shouldSkipServer(stats.getSingleServerStat(input.getServer())); } private boolean shouldSkipServer(ServerStats stats) { if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped()) || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) { return true; } return false; } } |
從上述源碼中,我們可以知道它的主要過濾邏輯位于shouldSkipServer方法中,它主要判斷服務實例的兩項內容:
- 是否故障,即斷路器是否生效已斷開
- 實例的并發請求數大于閾值,默認值為$2^{31}$ - 1,該配置我們可通過參數..ActiveConnectionsLimit來修改
其中只要有一個滿足apply就返回false(代表該節點可能存在故障或負載過高),都不滿足就返回true。
在該策略中,除了實現了上面的過濾方法之外,對于choose的策略也做了一些改進優化,所以父類的實現對于它來說只是一個備用選項,其具體實現如下:
| public Server choose(Object key) { int count = 0; Server server = roundRobinRule.choose(key); while (count++ <= 10) { if (predicate.apply(new PredicateKey(server))) { return server; } server = roundRobinRule.choose(key); } return super.choose(key); } |
可以看到,它并沒有像父類中那樣,先遍歷所有的節點進行過濾,然后在過濾后的集合中選擇實例。而是先線性的方式選擇一個實例,接著用過濾條件來判斷該實例是否滿足要求,若滿足就直接使用該實例,若不滿足要求就再選擇下一個實例,并檢查是否滿足要求,如此循環進行,當這個過程重復了10次還是沒有找到符合要求的實例,就采用父類的實現方案。
簡單的說,該策略通過線性抽樣的方式直接嘗試尋找可用且較空閑的實例來使用,優化了父類每次都要遍歷所有實例的開銷。
ZoneAvoidanceRule
該策略我們在介紹負載均衡器ZoneAwareLoadBalancer時已經提到過了,它也是PredicateBasedRule的具體實現類。在之前的介紹中主要針對ZoneAvoidanceRule中用于選擇Zone區域策略的一些靜態函數,比如:createSnapshot、getAvailableZones。在這里我們將詳細的看看ZoneAvoidanceRule作為服務實例過濾條件的實現原理。從下面ZoneAvoidanceRule的源碼片段中我們可以看到,它使用了CompositePredicate來進行服務實例清單的過濾。這是一個組合過濾條件,在其構造函數中,它以ZoneAvoidancePredicate為主過濾條件,AvailabilityPredicate為次過濾條件初始化了組合過濾條件的實例。
| public class ZoneAvoidanceRule extends PredicateBasedRule { ... private CompositePredicate compositePredicate; public ZoneAvoidanceRule() { super(); ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this); AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this); compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate); } ... } |
ZoneAvoidanceRule在實現的時候并沒有像AvailabilityFilteringRule那樣重寫choose函數來優化,所以它完全遵循了父類的過濾主邏輯:“先過濾清單,再輪詢選擇”。其中過濾清單的條件就是我們上面提到的以ZoneAvoidancePredicate為主過濾條件、AvailabilityPredicate為次過濾條件的組合過濾條件CompositePredicate。從CompositePredicate的源碼片段中,我們可以看到它定義了一個主過濾條件AbstractServerPredicate delegate以及一組次過濾條件列表List fallbacks,所以它的次過濾列表是可以擁有多個的,并且由于它采用了List存儲所以次過濾條件是按順序執行的。
| public class CompositePredicate extends AbstractServerPredicate { private AbstractServerPredicate delegate; private List<AbstractServerPredicate> fallbacks = Lists.newArrayList(); private int minimalFilteredServers = 1; private float minimalFilteredPercentage = 0; public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) { List<Server> result = super.getEligibleServers(servers, loadBalancerKey); Iterator<AbstractServerPredicate> i = fallbacks.iterator(); while (!(result.size() >= minimalFilteredServers && result.size() > (int) (servers.size() * minimalFilteredPercentage)) && i.hasNext()) { AbstractServerPredicate predicate = i.next(); result = predicate.getEligibleServers(servers, loadBalancerKey); } return result; } } |
再來看看獲取過濾結果的實現函數getEligibleServers中,它的處理邏輯如下:
- 使用主過濾條件對所有實例過濾并返回過濾后的實例清單
- 依次使用次過濾條件列表中的過濾條件對主過濾條件的結果進行過濾
- 每次過濾之后(包括主過濾條件和次過濾條件),都需要判斷下面兩個條件,只要有一個符合就不再進行過濾,將當前結果返回供線性輪詢算法選擇:
- 過濾后的實例總數 >= 最小過濾實例數(minimalFilteredServers,默認為1)
- 過濾后的實例比例 > 最小過濾百分比(minimalFilteredPercentage,默認為0)
總結
以上是生活随笔為你收集整理的Spring Cloud源码分析(二)Ribbon的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Spring Cloud实战小贴士:版本
- 下一篇: 消失了一周的小夕在玩什么啦?