javascript
集成源码深度剖析:Fescar x Spring Cloud
Fescar 簡介
常見的分布式事務方式有基于 2PC 的 XA (e.g. atomikos),從業務層入手的 TCC( e.g. byteTCC)、事務消息 ( e.g. RocketMQ Half Message) 等等。XA 是需要本地數據庫支持的分布式事務的協議,資源鎖在數據庫層面導致性能較差,而支付寶作為布道師引入的 TCC 模式需要大量的業務代碼保證,開發維護成本較高。
分布式事務是業界比較關注的領域,這也是短短時間 Fescar 能收獲6k Star的原因之一。Fescar 名字取自?Fast & Easy Commit And Rollback?,簡單來說Fescar通過對本地 RDBMS 分支事務的協調來驅動完成全局事務,是工作在應用層的中間件。主要優點是相對于XA模式是性能較好不長時間占用連接資源,相對于 TCC 方式開發成本和業務侵入性較低。
類似于 XA,Fescar 將角色分為 TC、RM、TM,事務整體過程模型如下:
1. TM 向 TC 申請開啟一個全局事務,全局事務創建成功并生成一個全局唯一的 XID。 2. XID 在微服務調用鏈路的上下文中傳播。 3. RM 向 TC 注冊分支事務,將其納入 XID 對應全局事務的管轄。 4. TM 向 TC 發起針對 XID 的全局提交或回滾決議。 5. TC 調度 XID 下管轄的全部分支事務完成提交或回滾請求。其中在目前的實現版本中 TC 是獨立部署的進程,維護全局事務的操作記錄和全局鎖記錄,負責協調并驅動全局事務的提交或回滾。TM RM 則與應用程序工作在同一應用進程。RM對 JDBC 數據源采用代理的方式對底層數據庫做管理,利用語法解析,在執行事務保留快照,并生成 undo log。大概的流程和模型劃分就介紹到這里,下面開始對 Fescar 事務傳播機制的分析。
Fescar 事務傳播機制
Fescar 事務傳播包括應用內事務嵌套調用和跨服務調用的事務傳播。Fescar 事務是怎么在微服務調用鏈中傳播的呢?Fescar 提供了事務 API 允許用戶手動綁定事務的 XID 并加入到全局事務中,所以我們根據不同的服務框架機制,將 XID 在鏈路中傳遞即可實現事務的傳播。
RPC 請求過程分為調用方與被調用方兩部分,我們將 XID 在請求與響應時做相應的處理即可。大致過程為:調用方即請求方將當前事務上下文中的 XID 取出,通過RPC協議傳遞給被調用方;被調用方從請求中的將 XID 取出,并綁定到自己的事務上下文中,納入全局事務。微服務框架一般都有相應的 Filter 和 Interceptor 機制,我們來分析下 Spring Cloud 與Fescar 的整合過程。
Fescar 與 Spring Cloud Alibaba 集成部分源碼解析
本部分源碼全部來自于 spring-cloud-alibaba-fescar. 源碼解析部分主要包括AutoConfiguration、微服務被調用方和微服務調用方三大部分。對于微服務調用方方式具體分為 RestTemplate 和 Feign,對于 Feign 請求方式又進一步細分為結合 Hystrix 和 Sentinel 的使用模式。
Fescar AutoConfiguration
對于 AutoConfiguration 部分的解析此處只介紹與 Fescar 啟動相關的部分,其他部分的解析將穿插于【微服務被調用方】和【微服務調用方】章節進行介紹。
Fescar 的啟動需要配置 GlobalTransactionScanner,GlobalTransactionScanner 負責初始化 Fescar 的 RM client、TM client 和 自動代理標注 GlobalTransactional 注解的類。
GlobalTransactionScanner bean 的啟動通過 GlobalTransactionAutoConfiguration 加載并注入FescarProperties。
FescarProperties 包含了 Fescar的重要屬性 txServiceGroup ,此屬性的可通過 application.properties 文件中的 key: spring.cloud.alibaba.fescar.txServiceGroup 讀取,默認值為 ${spring.application.name}-fescar-service-group 。txServiceGroup 表示Fescar 的邏輯事務分組名,此分組名通過配置中心(目前支持文件、Apollo)獲取邏輯事務分組名對應的 TC 集群名稱,進一步通過集群名稱構造出 TC 集群的服務名,通過注冊中心(目前支持Nacos、Redis、ZooKeeper和Eureka)和服務名找到可用的 TC 服務節點,然后 RM client、TM client 與 TC 進行 RPC 交互。
微服務被調用方
由于調用方的邏輯比較多一點,我們先分析被調用方的邏輯。針對于 Spring Cloud 項目,默認采用的 RPC 傳輸協議時 HTTP 協議,所以使用了 HandlerInterceptor 機制來對HTTP的請求做攔截。
HandlerInterceptor 是 Spring 提供的接口, 它有以下三個方法可以被覆寫。
/*** Intercept the execution of a handler. Called after HandlerMapping determined* an appropriate handler object, but before HandlerAdapter invokes the handler.*/default boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler)throws Exception {return true;}/*** Intercept the execution of a handler. Called after HandlerAdapter actually* invoked the handler, but before the DispatcherServlet renders the view.* Can expose additional model objects to the view via the given ModelAndView.*/default void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler,@Nullable ModelAndView modelAndView) throws Exception {}/*** Callback after completion of request processing, that is, after rendering* the view. Will be called on any outcome of handler execution, thus allows* for proper resource cleanup.*/default void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler,@Nullable Exception ex) throws Exception {}根據注釋,我們可以很明確的看到各個方法的作用時間和常用用途。對于 Fescar 集成來講,它需要重寫了 preHandle、afterCompletion 方法。
FescarHandlerInterceptor 的作用是將服務鏈路傳遞過來的 XID,綁定到服務節點的事務上下文中,并且在請求完成后清理相關資源。FescarHandlerInterceptorConfiguration 中配置了所有的 url 均進行攔截,對所有的請求過來均會執行該攔截器,進行 XID 的轉換與事務綁定。
/*** @author xiaojing** Fescar HandlerInterceptor, Convert Fescar information into* @see com.alibaba.fescar.core.context.RootContext from http request's header in* {@link org.springframework.web.servlet.HandlerInterceptor#preHandle(HttpServletRequest , HttpServletResponse , Object )},* And clean up Fescar information after servlet method invocation in* {@link org.springframework.web.servlet.HandlerInterceptor#afterCompletion(HttpServletRequest, HttpServletResponse, Object, Exception)}*/ public class FescarHandlerInterceptor implements HandlerInterceptor {private static final Logger log = LoggerFactory.getLogger(FescarHandlerInterceptor.class);@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response,Object handler) throws Exception {String xid = RootContext.getXID();String rpcXid = request.getHeader(RootContext.KEY_XID);if (log.isDebugEnabled()) {log.debug("xid in RootContext {} xid in RpcContext {}", xid, rpcXid);}if (xid == null && rpcXid != null) {RootContext.bind(rpcXid);if (log.isDebugEnabled()) {log.debug("bind {} to RootContext", rpcXid);}}return true;}@Overridepublic void afterCompletion(HttpServletRequest request, HttpServletResponse response,Object handler, Exception e) throws Exception {String rpcXid = request.getHeader(RootContext.KEY_XID);if (StringUtils.isEmpty(rpcXid)) {return;}String unbindXid = RootContext.unbind();if (log.isDebugEnabled()) {log.debug("unbind {} from RootContext", unbindXid);}if (!rpcXid.equalsIgnoreCase(unbindXid)) {log.warn("xid in change during RPC from {} to {}", rpcXid, unbindXid);if (unbindXid != null) {RootContext.bind(unbindXid);log.warn("bind {} back to RootContext", unbindXid);}}}}preHandle 在請求執行前被調用,xid 為當前事務上下文已經綁定的全局事務的唯一標識,rpcXid 為請求通過 HTTP Header 傳遞過來需要綁定的全局事務標識。preHandle 方法中判斷如果當前事務上下文中沒有 XID,且 rpcXid 不為空,那么就將 rpcXid 綁定到當前的事務上下文。
afterCompletion 在請求完成后被調用,該方法用來執行資源的相關清理動作。Fescar 通過 RootContext.unbind() 方法對事務上下文涉及到的 XID 進行解綁。下面 if 中的邏輯是為了代碼的健壯性考慮,如果遇到 rpcXid和 unbindXid 不相等的情況,再將 unbindXid 重新綁定回去。
對于 Spring Cloud 來講,默認采用的 RPC 方式是 HTTP 的方式,所以對被調用方來講,它的請求攔截方式不用做任何區分,只需要從 Header 中將 XID 就可以取出綁定到自己的事務上下文中即可。但是對于調用方由于請求組件的多樣化,包括熔斷隔離機制,所以要區分不同的情況做處理,后面我們來具體分析一下。
微服務調用方
Fescar 將請求方式分為:RestTemplate、Feign、Feign+Hystrix 和 Feign+Sentinel 。不同的組件通過 Spring Boot 的 Auto Configuration 來完成自動的配置,具體的配置類可以看 spring.factories ,下文也會介紹相關的配置類。
RestTemplate
先來看下如果調用方如果是是基于 RestTemplate 的請求,Fescar 是怎么傳遞 XID 的。
public class FescarRestTemplateInterceptor implements ClientHttpRequestInterceptor {@Overridepublic ClientHttpResponse intercept(HttpRequest httpRequest, byte[] bytes,ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {HttpRequestWrapper requestWrapper = new HttpRequestWrapper(httpRequest);String xid = RootContext.getXID();if (!StringUtils.isEmpty(xid)) {requestWrapper.getHeaders().add(RootContext.KEY_XID, xid);}return clientHttpRequestExecution.execute(requestWrapper, bytes);} }FescarRestTemplateInterceptor 實現了 ClientHttpRequestInterceptor 接口的 intercept 方法,對調用的請求做了包裝,在發送請求時若存在 Fescar 事務上下文 XID 則取出并放到 HTTP Header 中。
FescarRestTemplateInterceptor 通過 FescarRestTemplateAutoConfiguration 實現將 FescarRestTemplateInterceptor 配置到 RestTemplate 中去。
@Configuration public class FescarRestTemplateAutoConfiguration {@Beanpublic FescarRestTemplateInterceptor fescarRestTemplateInterceptor() {return new FescarRestTemplateInterceptor();}@Autowired(required = false)private Collection<RestTemplate> restTemplates;@Autowiredprivate FescarRestTemplateInterceptor fescarRestTemplateInterceptor;@PostConstructpublic void init() {if (this.restTemplates != null) {for (RestTemplate restTemplate : restTemplates) {List<ClientHttpRequestInterceptor> interceptors = new ArrayList<ClientHttpRequestInterceptor>(restTemplate.getInterceptors());interceptors.add(this.fescarRestTemplateInterceptor);restTemplate.setInterceptors(interceptors);}}}}init 方法遍歷所有的 restTemplate ,并將原來 restTemplate 中的攔截器取出,增加 fescarRestTemplateInterceptor 后置入并重排序。
Feign
接下來看下 Feign 的相關代碼,該包下面的類還是比較多的,我們先從其 AutoConfiguration 入手。
@Configuration @ConditionalOnClass(Client.class) @AutoConfigureBefore(FeignAutoConfiguration.class) public class FescarFeignClientAutoConfiguration {@Bean@Scope("prototype")@ConditionalOnClass(name = "com.netflix.hystrix.HystrixCommand")@ConditionalOnProperty(name = "feign.hystrix.enabled", havingValue = "true")Feign.Builder feignHystrixBuilder(BeanFactory beanFactory) {return FescarHystrixFeignBuilder.builder(beanFactory);}@Bean@Scope("prototype")@ConditionalOnClass(name = "com.alibaba.csp.sentinel.SphU")@ConditionalOnProperty(name = "feign.sentinel.enabled", havingValue = "true")Feign.Builder feignSentinelBuilder(BeanFactory beanFactory) {return FescarSentinelFeignBuilder.builder(beanFactory);}@Bean@ConditionalOnMissingBean@Scope("prototype")Feign.Builder feignBuilder(BeanFactory beanFactory) {return FescarFeignBuilder.builder(beanFactory);}@Configurationprotected static class FeignBeanPostProcessorConfiguration {@BeanFescarBeanPostProcessor fescarBeanPostProcessor(FescarFeignObjectWrapper fescarFeignObjectWrapper) {return new FescarBeanPostProcessor(fescarFeignObjectWrapper);}@BeanFescarContextBeanPostProcessor fescarContextBeanPostProcessor(BeanFactory beanFactory) {return new FescarContextBeanPostProcessor(beanFactory);}@BeanFescarFeignObjectWrapper fescarFeignObjectWrapper(BeanFactory beanFactory) {return new FescarFeignObjectWrapper(beanFactory);}}}FescarFeignClientAutoConfiguration 在存在 Client.class 時生效,且要求作用在 FeignAutoConfiguration 之前。由于FeignClientsConfiguration 是在 FeignAutoConfiguration 生成 FeignContext 生效的,所以根據依賴關系, FescarFeignClientAutoConfiguration 同樣早于 FeignClientsConfiguration。
FescarFeignClientAutoConfiguration 自定義了 Feign.Builder,針對于 feign.sentinel,feign.hystrix 和 feign 的情況做了適配,目的是自定義 feign 中 Client 的真正實現為 FescarFeignClient。
HystrixFeign.builder().retryer(Retryer.NEVER_RETRY).client(new FescarFeignClient(beanFactory)) SentinelFeign.builder().retryer(Retryer.NEVER_RETRY).client(new FescarFeignClient(beanFactory)); Feign.builder().client(new FescarFeignClient(beanFactory));FescarFeignClient 是對原來的 Feign 客戶端代理增強,具體代碼見下圖:
public class FescarFeignClient implements Client {private final Client delegate;private final BeanFactory beanFactory;FescarFeignClient(BeanFactory beanFactory) {this.beanFactory = beanFactory;this.delegate = new Client.Default(null, null);}FescarFeignClient(BeanFactory beanFactory, Client delegate) {this.delegate = delegate;this.beanFactory = beanFactory;}@Overridepublic Response execute(Request request, Request.Options options) throws IOException {Request modifiedRequest = getModifyRequest(request);try {return this.delegate.execute(modifiedRequest, options);}finally {}}private Request getModifyRequest(Request request) {String xid = RootContext.getXID();if (StringUtils.isEmpty(xid)) {return request;}Map<String, Collection<String>> headers = new HashMap<>();headers.putAll(request.headers());List<String> fescarXid = new ArrayList<>();fescarXid.add(xid);headers.put(RootContext.KEY_XID, fescarXid);return Request.create(request.method(), request.url(), headers, request.body(),request.charset());}上面的過程中我們可以看到,FescarFeignClient 對原來的 Request 做了修改,它首先將 XID 從當前的事務上下文中取出,如果 XID 不為空的情況下,將 XID 放到了 Header 中。
FeignBeanPostProcessorConfiguration 定義了3個bean:FescarContextBeanPostProcessor、FescarBeanPostProcessor 和 FescarFeignObjectWrapper。其中 FescarContextBeanPostProcessor FescarBeanPostProcessor 實現了Spring BeanPostProcessor 接口。
以下為 FescarContextBeanPostProcessor 實現。
@Overridepublic Object postProcessBeforeInitialization(Object bean, String beanName)throws BeansException {if (bean instanceof FeignContext && !(bean instanceof FescarFeignContext)) {return new FescarFeignContext(getFescarFeignObjectWrapper(),(FeignContext) bean);}return bean;}@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName)throws BeansException {return bean;}BeanPostProcessor 中的兩個方法可以對 Spring 容器中的 Bean 做前后處理,postProcessBeforeInitialization 處理時機是初始化之前,postProcessAfterInitialization 的處理時機是初始化之后,這2個方法的返回值可以是原先生成的實例 bean,或者使用 wrapper 包裝后的實例。
FescarContextBeanPostProcessor 將 FeignContext 包裝成 FescarFeignContext。
FescarBeanPostProcessor 將 FeignClient 根據是否繼承了LoadBalancerFeignClient 包裝成 FescarLoadBalancerFeignClient 和 FescarFeignClient。
FeignAutoConfiguration 中的 FeignContext 并沒有加 ConditionalOnXXX 的條件,所以 Fescar 采用預置處理的方式將 FeignContext 包裝成 FescarFeignContext。
@Beanpublic FeignContext feignContext() {FeignContext context = new FeignContext();context.setConfigurations(this.configurations);return context;}而對于 Feign Client,FeignClientFactoryBean 中會獲取 FeignContext 的實例對象。對于開發者采用 @Configuration 注解的自定義配置的 Feign Client 對象,這里會被配置到 builder,導致 FescarFeignBuilder 中增強后的 FescarFeignCliet 失效。FeignClientFactoryBean 中關鍵代碼如下:
/*** @param <T> the target type of the Feign client* @return a {@link Feign} client created with the specified data and the context information*/<T> T getTarget() {FeignContext context = applicationContext.getBean(FeignContext.class);Feign.Builder builder = feign(context);if (!StringUtils.hasText(this.url)) {if (!this.name.startsWith("http")) {url = "http://" + this.name;}else {url = this.name;}url += cleanPath();return (T) loadBalance(builder, context, new HardCodedTarget<>(this.type,this.name, url));}if (StringUtils.hasText(this.url) && !this.url.startsWith("http")) {this.url = "http://" + this.url;}String url = this.url + cleanPath();Client client = getOptional(context, Client.class);if (client != null) {if (client instanceof LoadBalancerFeignClient) {// not load balancing because we have a url,// but ribbon is on the classpath, so unwrapclient = ((LoadBalancerFeignClient)client).getDelegate();}builder.client(client);}Targeter targeter = get(context, Targeter.class);return (T) targeter.target(this, builder, context, new HardCodedTarget<>(this.type, this.name, url));}上述代碼根據是否指定了注解參數中的 URL 來選擇直接調用 URL 還是走負載均衡,targeter.target 通過動態代理創建對象。大致過程為:將解析出的feign方法放入map,再通過將其作為參數傳入生成InvocationHandler,進而生成動態代理對象。
FescarContextBeanPostProcessor 的存在,即使開發者對 FeignClient 自定義操作,依舊可以完成 Fescar 所需的全局事務的增強。
對于 FescarFeignObjectWrapper,我們重點關注下Wrapper方法:
Object wrap(Object bean) {if (bean instanceof Client && !(bean instanceof FescarFeignClient)) {if (bean instanceof LoadBalancerFeignClient) {LoadBalancerFeignClient client = ((LoadBalancerFeignClient) bean);return new FescarLoadBalancerFeignClient(client.getDelegate(), factory(),clientFactory(), this.beanFactory);}return new FescarFeignClient(this.beanFactory, (Client) bean);}return bean;}wrap 方法中,如果 bean 是 LoadBalancerFeignClient 的實例對象,那么首先通過 client.getDelegate() 方法將 LoadBalancerFeignClient 代理的實際 Client 對象取出后包裝成 FescarFeignClient,再生成 LoadBalancerFeignClient 的子類 FescarLoadBalancerFeignClient 對象。如果 bean 是 Client 的實例對象且不是 FescarFeignClient LoadBalancerFeignClient,那么 bean 會直接包裝生成 FescarFeignClient。
上面的流程設計還是比較巧妙的,首先根據 Spring boot 的 Auto Configuration 控制了配置的先后順序,同時自定義了 Feign Builder的Bean,保證了 Client 均是經過增強后的 FescarFeignClient 。再通過 BeanPostProcessor 對Spring 容器中的 Bean 做了一遍包裝,保證容器內的Bean均是增強后 FescarFeignClient ,避免 FeignClientFactoryBean getTarget 方法的替換動作。
Hystrix 隔離
下面我們再來看下 Hystrix 部分,為什么要單獨把 Hystrix 拆出來看呢,而且 Fescar 代碼也單獨實現了個策略類。目前事務上下文 RootContext 的默認實現是基于 ThreadLocal 方式的 ThreadLocalContextCore,也就是上下文其實是和線程綁定的。Hystrix 本身有兩種隔離狀態的模式,基于信號量或者基于線程池進行隔離。Hystrix 官方建議是采取線程池的方式來充分隔離,也是一般情況下在采用的模式:
Thread or Semaphore The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.service 層的業務代碼和請求發出的線程肯定不是同一個,那么 ThreadLocal 的方式就沒辦法將 XID 傳遞給 Hystrix 的線程并傳遞給被調用方的。怎么處理這件事情呢,Hystrix 提供了個機制讓開發者去自定義并發策略,只需要繼承 HystrixConcurrencyStrategy 重寫 wrapCallable 方法即可。
public class FescarHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {private HystrixConcurrencyStrategy delegate;public FescarHystrixConcurrencyStrategy() {this.delegate = HystrixPlugins.getInstance().getConcurrencyStrategy();HystrixPlugins.reset();HystrixPlugins.getInstance().registerConcurrencyStrategy(this);}@Overridepublic <K> Callable<K> wrapCallable(Callable<K> c) {if (c instanceof FescarContextCallable) {return c;}Callable<K> wrappedCallable;if (this.delegate != null) {wrappedCallable = this.delegate.wrapCallable(c);}else {wrappedCallable = c;}if (wrappedCallable instanceof FescarContextCallable) {return wrappedCallable;}return new FescarContextCallable<>(wrappedCallable);}private static class FescarContextCallable<K> implements Callable<K> {private final Callable<K> actual;private final String xid;FescarContextCallable(Callable<K> actual) {this.actual = actual;this.xid = RootContext.getXID();}@Overridepublic K call() throws Exception {try {RootContext.bind(xid);return actual.call();}finally {RootContext.unbind();}}} }Fescar 也提供一個 FescarHystrixAutoConfiguration,在存在 HystrixCommand 的時候生成FescarHystrixConcurrencyStrategy。
@Configuration @ConditionalOnClass(HystrixCommand.class) public class FescarHystrixAutoConfiguration {@BeanFescarHystrixConcurrencyStrategy fescarHystrixConcurrencyStrategy() {return new FescarHystrixConcurrencyStrategy();}}
原文鏈接
 本文為云棲社區原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的集成源码深度剖析:Fescar x Spring Cloud的全部內容,希望文章能夠幫你解決所遇到的問題。
 
                            
                        - 上一篇: 如何衡量研发效能?阿里资深技术专家提出了
- 下一篇: 云栖专辑 | 阿里开发者们的第12个感悟
