扩展CXF, 支持LoadBalance负载均衡
原文:?http://scud.blogjava.net
CXF是一個比較流行的Web Service框架. ( 當然如果追求更高效, 還可以去搜索ice, thrift, protobuff之類的)
近一個月, 斷斷續續地又好好看了看CXF的一些代碼, CXF的文檔還是很欠缺,特別是關于內部實現的東西. 從我的感覺來說, 內部實現還是挺復雜的. Inteceptor, Feature, ConduitSelector 這些概念一大堆, 又差不多可以做類似的事情, 真是讓人頭暈.
CXF本身提供了一個FailoverFeature, 可以在調用服務出錯時切換到其他服務器, 但是無法做到負載均衡, 我研究了幾天, 在FailoverFeature的基礎上改出來一個LoadBalanceFeature, 當然也同時支持Failover.
首先我們來看看如何使用CXF的FailoverFeature: (下載示例中包括使用xml和代碼兩種方式, 當然CXF自己還提供了使用wsdl內部定義的方式)
?? ?我們需要先準備一個HelloService, 非常簡單的一個Web Service, 這里不在貼出, 具體可以看下載包
?? ?調用代碼示例:
import?org.apache.cxf.clustering.FailoverFeature;
import?org.apache.cxf.clustering.RandomStrategy;
import?org.apache.cxf.feature.AbstractFeature;
import?org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import?org.javascud.extensions.cxf.service.Hello;
import?java.util.ArrayList;
import?java.util.List;
public?class?HelloServiceFailOverClient
{
????public?static?void?main(String[]?args)
????{
????????String?helloFirst?=?"http://localhost:8080/service/Hello";
????????String?helloSecond?=?"http://localhost:8081/service/Hello";
????????String?helloThird?=?"http://localhost:8082/service/Hello";
????????String?helloFour?=?"http://localhost:8083/service/Hello";
????????List<String>?serviceList?=?new?ArrayList<String>();
????????serviceList.add(helloFirst);
????????serviceList.add(helloSecond);
????????serviceList.add(helloThird);
????????//serviceList.add(helloFour);
????????RandomStrategy?strategy?=?new?RandomStrategy();
????????strategy.setAlternateAddresses(serviceList);
????????FailoverFeature?ff?=?new?FailoverFeature();
????????ff.setStrategy(strategy);
????????JaxWsProxyFactoryBean?factory?=?new?JaxWsProxyFactoryBean();
????????List<AbstractFeature>?features?=?new?ArrayList<AbstractFeature>();
????????features.add(ff);
????????factory.setFeatures(features);
????????factory.initFeatures();
????????factory.setServiceClass(Hello.class);
????????//factory.setAddress("http://localhost:8080/service/Hello");
????????Hello?client?=?(Hello)?factory.create();
????????String?result?=?client.sayHello("felix");
????????System.out.println("result?is:?"?+?result);
????}
}
在遇到錯誤時可以自動使用下一個服務器, 但是必須要自己設置一個地址, 如果不設置的話也可以, 但是會出錯然后failover.
下面我們自己來看看我們的 LoadBalanceFeature
1. 首先我們創建一個LoadBalanceFeature (完全和FailoverFeature一樣)
?? Feature是用來定制Server, Client, Bus的一個組件, 具體可以查看AbstractFeature, 我們使用initialize方法來定制Client, 修改Client的Conduit選擇器達到負載均衡的目的.
?? LoadBalanceFeature代碼如下:
?*?This?feature?may?be?applied?to?a?Client?so?as?to?enable
?*?load?balance?,?use?any?compatible?endpoint?for?the?target?service.
?*
?*?@author?Felix?Zhang???Date:2010-10-3?22:58
?*?@see?org.apache.cxf.clustering.FailoverFeature
?*/
public?class?LoadBalanceFeature?extends?AbstractFeature?{
????private?LoadBalanceStrategy?loadBalanceStrategy;
????@Override
????public?void?initialize(Client?client,?Bus?bus)?{
????????LoadBalanceTargetSelector?selector?=?new?LoadBalanceTargetSelector();
????????selector.setEndpoint(client.getEndpoint());
????????selector.setStrategy(getStrategy());
????????client.setConduitSelector(selector);
????}
????public?void?setStrategy(LoadBalanceStrategy?strategy)?{
????????loadBalanceStrategy?=?strategy;
????}
????public?LoadBalanceStrategy?getStrategy()?{
????????return?loadBalanceStrategy;
????}
}
2. 定制一個LoadBalanceStrategy 負載均衡策略
負載均衡策略有很多種, 例如隨機選擇, 順序選擇等, FailoverFeature提供了三種策略, 總之很簡單, 我們在這里就先實現隨機策略, 其他的策略都很簡單, 幾行代碼就可以實現了.
?? ?這個類主要用來設置/獲取所有的提供服務的地址列表, 為了方便控制, 我新增了2個選項:
?? ?A: alwaysChangeEndpoint 是否每次請求都切換地址: 如果只有一個客戶端, 可以分擔負載. 缺省為true
?? ?B: removeFailedEndpoint 是否從全局的地址列表中移除失敗服務地址 -- 如果你沒有監測服務器狀態的程序
?? 關于動態增刪服務地址
- 可以使用zookeeper等服務實時監測服務器狀態, 或者自己寫程序實現, 調用strategy.setAlternateAddresses即可.
- removeFailedEndpoint 如果設置為true, 但沒有監測服務器狀態的程序, 新增的或者復活的服務器則無法被恢復到地址列表中.
- 考慮到效率和支持failover, 設置地址列表, 移除地址等沒有同步鎖.
- 自動移除失敗服務地址時, 目前僅支持手動地址列表, 沒有考慮wsdl中的多服務地址.
- 后續我會寫一個使用zookeeper增刪服務地址列表的示例. (最近也在看zookeeper)
?? 主要的代碼都在AbstractLoadBalanceStrategy 中, 基本和 AbstractStaticFailoverStrategy 一樣, 添加了一個removeAlternateAddress 用于移除失敗的服務地址.
?? ?LoadBalanceStrategy 接口的代碼如下:
?*?Supports?pluggable?strategies?for?alternate?endpoint?selection?on
?*?load?balance.
?*?<p/>
?*?Random,?Retries,?Mod?(later)
?*?<p/>
?*?1.?support?load?balance??2.support?fail?over.
?*
?*?@author?Felix?Zhang???Date:2010-10-1?18:14
?*?@see?org.apache.cxf.clustering.FailoverStrategy
?*/
public?interface?LoadBalanceStrategy?{
????/**
?????*?Get?the?alternate?endpoints?for?this?invocation.
?????*
?????*?@param?exchange?the?current?Exchange
?????*?@return?a?failover?endpoint?if?one?is?available
?????*/
????List<Endpoint>?getAlternateEndpoints(Exchange?exchange);
????/**
?????*?Select?one?of?the?alternate?endpoints?for?a?retried?invocation.
?????*
?????*?@param?alternates?List?of?alternate?endpoints?if?available
?????*?@return?the?selected?endpoint
?????*/
????Endpoint?selectAlternateEndpoint(List<Endpoint>?alternates);
????/**
?????*?Get?the?alternate?addresses?for?this?invocation.
?????*?These?addresses?over-ride?any?addresses?specified?in?the?WSDL.
?????*
?????*?@param?exchange?the?current?Exchange
?????*?@return?a?failover?endpoint?if?one?is?available
?????*/
????List<String>?getAlternateAddresses(Exchange?exchange);
????/**
?????*?Select?one?of?the?alternate?addresses?for?a?retried?invocation.
?????*
?????*?@param?addresses?List?of?alternate?addresses?if?available
?????*?@return?the?selected?address
?????*/
????String?selectAlternateAddress(List<String>?addresses);
????/**
?????*?should?remove?failed?endpoint?or?not.
?????*?only?work?for?user?defined?addresses?list.
?????*?@return?true?or?false
?????*/
????boolean?isRemoveFailedEndpoint();
????/**
?????*?change?endpoint?every?time?or?not.
?????*?@return?boolean
?????*/
????boolean?isAlwaysChangeEndpoint();
????/**
?????*?remove?failed?address?from?list.
?????*?@param?address?the?failed?address
?????*/
????void?removeAlternateAddress(String?address);
}
?? ?RandomLoadBalanceStrategy繼承自 AbstractLoadBalanceStrategy, 和 RandomStrategy的區別就是獲取下一個服務地址時并不從列表中移除此地址, 否則就做不到負載均衡了.
3. 最重要的 LoadBalanceTargetSelector
?? ?A: 這個類比較復雜, 我們為了實現負載均衡, 修改了prepare來動態設置調用的endpoint, 替換策略取決于LoadBalanceStrategy
?? ?主要代碼如下:
????????????//check?current?endpoint?is?not?null
????????????Endpoint?theEndpoint?=?exchange.get(Endpoint.class);
????????????if?(theEndpoint.getEndpointInfo().getAddress()?!=?null)?{
????????????????existsEndpoint?=?true;
????????????}
????????????Endpoint?nextEndpoint;
????????????if?(getStrategy().isAlwaysChangeEndpoint()?||?!existsEndpoint)?{
????????????????//get?a?endpoint?and?set?to?current?endpoint
????????????????Endpoint?loadBalanceTarget?=?getLoadBalanceTarget(exchange);
????????????????if?(loadBalanceTarget?!=?null)?{
????????????????????logger.info("switch?to?next?target:?"?+?loadBalanceTarget.getEndpointInfo().getAddress());
????????????????????setEndpoint(loadBalanceTarget);
????????????????????//update?exchange.org.apache.cxf.message.Message.ENDPOINT_ADDRESS?---?不設置這個就用上次的奇怪
????????????????????message.put(Message.ENDPOINT_ADDRESS,?loadBalanceTarget.getEndpointInfo().getAddress());
????????????????}
????????????????nextEndpoint?=?loadBalanceTarget;
????????????}?else?{
????????????????//use?current?endpoint
????????????????nextEndpoint?=?theEndpoint;
????????????}
???
?? ?B:為了和原有Failover特性兼容, 我們修改了 getFailoverTarget函數, 在此函數中要移除失敗的服務地址, 因為在之前我們修改了LoadBalanceStrategy, 它在獲取地址時不再移除當前地址, 所以我們需要手動移除.
?? ?部分代碼如下:???
????????????//failover?should?remove?current?endpoint?first,?then?get?next?--?根據定義的策略來決定是否從全局地址列表中移除
????????????if?(getStrategy().isRemoveFailedEndpoint())?{
????????????????logger.warn("remove?current?failed?address:?"?+?currentAddress);
????????????????//remove?for?client,?not?for?current?invocation?--?沒有同步鎖
????????????????getStrategy().removeAlternateAddress(currentAddress);
????????????}
????????????//remove?for?current?invocation:?當前請求中總是移除失敗服務地址
????????????alternateAddresses.remove(currentAddress);
????????????String?alternateAddress?=
????????????????????getStrategy().selectAlternateAddress(alternateAddresses);
4. 調用實例:
?? 此處我們采用XML定義方式:
<beans?xmlns="http://www.springframework.org/schema/beans"
???????xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
???????xmlns:jaxws="http://cxf.apache.org/jaxws"
???????xmlns:clustering="http://cxf.apache.org/clustering"
???????xmlns:util="http://www.springframework.org/schema/util"
???????xsi:schemaLocation="
http://cxf.apache.org/jaxws?http://cxf.apache.org/schemas/jaxws.xsd
http://www.springframework.org/schema/beans?http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util?http://www.springframework.org/schema/util/spring-util.xsd">
????<util:list?id="addressList">
????????<value>http://localhost:8081/service/Hello</value>
????????<value>http://localhost:8082/service/Hello</value>
????????<value>http://localhost:8083/service/Hello</value>
????????<value>http://localhost:8086/service/Hello</value>
????????<value>http://localhost:8087/service/Hello</value>
????????<value>http://localhost:8088/service/Hello</value>
????</util:list>
????<bean?id="SequentialAddresses"?class="org.apache.cxf.clustering.SequentialStrategy">
????????<property?name="alternateAddresses">
????????????<ref?bean="addressList"/>
????????</property>
????</bean>
????<bean?id="randomAddresses"?class="org.javascud.extensions.cxf.RandomLoadBalanceStrategy">
????????<property?name="alternateAddresses">
????????????<ref?bean="addressList"/>
????????</property>
????????<property?name="removeFailedEndpoint"?value="true"?/>
????</bean>
????<bean?id="loadBalanceFeature"?class="org.javascud.extensions.cxf.LoadBalanceFeature">
????????<property?name="strategy"?ref="randomAddresses"?/>
????</bean>
????<jaxws:client?name="helloClient"
??????????????????serviceClass="org.javascud.extensions.cxf.service.Hello"????????????>
????????<jaxws:features>
????????????<ref?bean="loadBalanceFeature"?/>
????????</jaxws:features>
????</jaxws:client>
</beans>
?
8081, 8082, 8083是實際存在的服務, 其他的不存在.
調用的Java代碼:
import?org.apache.cxf.endpoint.Client;
import?org.apache.cxf.frontend.ClientProxy;
import?org.javascud.extensions.cxf.LoadBalanceStrategy;
import?org.javascud.extensions.cxf.service.Hello;
import?org.springframework.context.support.ClassPathXmlApplicationContext;
public?class?HelloLoadBalanceAndFailOverClientByXML
{
????public?static?void?main(String[]?args)
????{
????????ClassPathXmlApplicationContext?context
????????????????=?new?ClassPathXmlApplicationContext(new?String[]
????????????????{"org/javascud/extensions/cxf/loadbalance/loadbalance_fail.xml"});
????????Hello?client?=?(Hello)?context.getBean("helloClient");
????????LoadBalanceStrategy?strategy?=?(LoadBalanceStrategy)?context.getBean("randomAddresses");
????????Client?myclient?=?ClientProxy.getClient(client);
????????String?address?=?myclient.getEndpoint().getEndpointInfo().getAddress();
????????System.out.println(address);
????????for(int?i=1;?i<=20;?i++)
????????{
????????????String?result1?=?client.sayHello("Felix"?+i);
????????????System.out.println("Call?"?+?i?+":?"?+?result1);
????????????int?left?=?strategy.getAlternateAddresses(null).size();
????????????System.out.println("==================?left?"?+?left?+?"?===========================");
????????}
????}
}
??? 此處僅僅為模擬測試.
5. 關于測試用例
?? ?沒想好如何寫單元測試, test里面目前都是隨意測試的代碼, 基本照顧到所有功能.
?? ?
6. 下載
代碼下載: http://cnscud.googlecode.com/files/extensions-cxf_20101015.zip
源碼位置: http://cnscud.googlecode.com/svn/trunk/extensions/? 其中cxf目錄是此文章相關的源碼.
7. 有任何問題請留言.
轉載于:https://www.cnblogs.com/yangjin-55/archive/2012/07/16/2786528.html
總結
以上是生活随笔為你收集整理的扩展CXF, 支持LoadBalance负载均衡的全部內容,希望文章能夠幫你解決所遇到的問題。