java thrift连接池_由浅入深了解Thrift之客户端连接池化
一、問題描述
在上一篇《由淺入深了解Thrift之服務模型和序列化機制》文章中,我們已經了解了thrift的基本架構和網絡服務模型的優缺點。如今的互聯網圈中,RPC服務化的思想如火如荼。我們又該如何將thrift服務化應用到我們的項目中呢?實現thrift服務化前,我們先想想這幾個問題:服務注冊、服務發現、服務健康檢測、服務“Load Balance”、隱藏client和server端的交互細節、服務調用端的對象池化。
服務的注冊、發現和健康檢測,我們使用zookeeper可以很好的解決
服務“Load Balance”,我們可以使用簡單的算法“權重+隨機”,當然也可以使用成熟復雜的算法
服務調用端的對象池化,我們可以使用commons pool,使用簡單又可以滿足我們的需求
二、實現思路
1、thrift server端啟動時,每個實例向zk集群以臨時節點方式注冊(這樣,遍歷zk上/server下有多少個臨時節點就知道有哪些server實例)
thrift server端可以單機多端口多實例或多機部署多實例方式運行。
2、服務調用方實現一個連接池,連接池初始化時,通過zk將在線的server實例信息同步到本地并緩存,同時監聽zk下的節點變化。
3、服務調用方與Server通訊時,從連接池中取一個可用的連接,用它實現RPC調用。
三、具體實現
1、thrift server端
thrift server端,向zk中注冊server address
packagecom.wy.thriftpool.commzkpool;importjava.lang.instrument.IllegalClassFormatException;importjava.lang.reflect.Constructor;importorg.apache.thrift.protocol.TBinaryProtocol;importorg.apache.thrift.protocol.TBinaryProtocol.Factory;importorg.apache.thrift.server.TServer;importorg.apache.thrift.server.TThreadedSelectorServer;importorg.apache.thrift.transport.TFramedTransport;importorg.apache.thrift.transport.TNonblockingServerSocket;importorg.springframework.beans.factory.InitializingBean;importcom.wy.thrift.service.UserService.Processor;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;importcom.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;importcom.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;/*** thrift server端,向zk中注冊server address
*
*@authorwy
**/
public class ThriftServiceServerFactory implementsInitializingBean {//thrift server 服務(wù)端口
privateInteger port;//default 權(quán)重
private Integer priority = 1;//service實現(xiàn)類
privateObject service;//thrift server 注冊路徑
privateString configPath;privateThriftServerIpTransfer ipTransfer;//thrift server注冊類
privateThriftServerAddressReporter addressReporter;//thrift server開啟服務(wù)
privateServerThread serverThread;
@Overridepublic void afterPropertiesSet() throwsException {if (ipTransfer == null) {
ipTransfer= newLocalNetworkIpTransfer();
}
String ip=ipTransfer.getIp();if (ip == null) {throw new NullPointerException("cant find server ip...");
}
String hostname= ip + ":" + port + ":" +priority;
Class extends Object> serviceClass =service.getClass();
ClassLoader classLoader=Thread.currentThread().getContextClassLoader();
Class>[] interfaces =serviceClass.getInterfaces();if (interfaces.length == 0) {throw new IllegalClassFormatException("service-class should implements Iface");
}//reflect,load "Processor";
Processor> processor = null;for (Class>clazz : interfaces) {
String cname=clazz.getSimpleName();if (!cname.equals("Iface")) {continue;
}
String pname= clazz.getEnclosingClass().getName() + "$Processor";try{
Class> pclass =classLoader.loadClass(pname);if (!pclass.isAssignableFrom(Processor.class)) {continue;
}
Constructor> constructor =pclass.getConstructor(clazz);
processor= (Processor>) constructor.newInstance(service);break;
}catch(Exception e) {//TODO
}
}if (processor == null) {throw new IllegalClassFormatException("service-class should implements Iface");
}//需要單獨的線程,因為serve方法是阻塞的.
serverThread = newServerThread(processor, port);
serverThread.start();//report
if (addressReporter != null) {
addressReporter.report(configPath, hostname);
}
}class ServerThread extendsThread {privateTServer server;
ServerThread(Processor> processor, int port) throwsException {//設(shè)置傳輸通道
TNonblockingServerSocket serverTransport = newTNonblockingServerSocket(port);//設(shè)置二進制協(xié)議
Factory protocolFactory = newTBinaryProtocol.Factory();
TThreadedSelectorServer.Args tArgs= newTThreadedSelectorServer.Args(serverTransport);
tArgs.processor(processor);
tArgs.transportFactory(newTFramedTransport.Factory());
tArgs.protocolFactory(protocolFactory);int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
tArgs.selectorThreads(num);
tArgs.workerThreads(num* 10);//網(wǎng)絡(luò)服務(wù)模型
server = newTThreadedSelectorServer(tArgs);
}
@Overridepublic voidrun() {try{
server.serve();
}catch(Exception e) {//TODO
}
}public voidstopServer() {
server.stop();
}
}public voidclose() {
serverThread.stopServer();
}public voidsetService(Object service) {this.service =service;
}public voidsetPriority(Integer priority) {this.priority =priority;
}public voidsetPort(Integer port) {this.port =port;
}public voidsetIpTransfer(ThriftServerIpTransfer ipTransfer) {this.ipTransfer =ipTransfer;
}public voidsetAddressReporter(ThriftServerAddressReporter addressReporter) {this.addressReporter =addressReporter;
}public voidsetConfigPath(String configPath) {this.configPath =configPath;
}
}
View Code
thrift server address注冊到zk
packagecom.wy.thriftpool.commzkpool.support.impl;importorg.apache.curator.framework.CuratorFramework;importorg.apache.curator.framework.imps.CuratorFrameworkState;importorg.apache.zookeeper.CreateMode;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;/*** thrift server address注冊到zk
*
*@authorwy
**/
public class DynamicAddressReporter implementsThriftServerAddressReporter {privateCuratorFramework zookeeper;publicDynamicAddressReporter() {
}publicDynamicAddressReporter(CuratorFramework zookeeper) {this.zookeeper =zookeeper;
}public voidsetZookeeper(CuratorFramework zookeeper) {this.zookeeper =zookeeper;
}
@Overridepublic void report(String service, String address) throwsException {if (zookeeper.getState() ==CuratorFrameworkState.LATENT) {
zookeeper.start();
zookeeper.newNamespaceAwareEnsurePath(service);
}
zookeeper.create().creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath(service+ "/i_", address.getBytes("utf-8"));
}public voidclose() {
zookeeper.close();
}
}
View Code
。。。
spring配置文件
View Code
2、服務(wù)調(diào)用端
連接池實現(xiàn)
杯了個具,為啥就不能提交。代碼在評論中。
連接池工廠,負責與Thrift server通信
packagecom.wy.thriftpool.commzkconnpool;importjava.net.InetSocketAddress;importorg.apache.commons.pool.PoolableObjectFactory;importorg.apache.thrift.transport.TSocket;importorg.apache.thrift.transport.TTransport;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importcom.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;/*** 連接池工廠,負(fù)責(zé)與Thrift server通信
*
*@authorwy
**/
public class ThriftPoolFactory implements PoolableObjectFactory{private final Logger logger =LoggerFactory.getLogger(getClass());//超時設(shè)置
public inttimeOut;private finalThriftServerAddressProvider addressProvider;privatePoolOperationCallBack callback;publicThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {super();this.addressProvider =addressProvider;this.callback =callback;
}public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, inttimeOut) {super();this.addressProvider =addressProvider;this.callback =callback;this.timeOut =timeOut;
}/*** 創(chuàng)建對象*/@Overridepublic TTransport makeObject() throwsException {try{
InetSocketAddress address=addressProvider.selector();
TTransport transport= new TSocket(address.getHostName(), address.getPort(), this.timeOut);
transport.open();if (callback != null) {
callback.make(transport);
}returntransport;
}catch(Exception e) {
logger.error("creat transport error:", e);throw newRuntimeException(e);
}
}/*** 銷毀對象*/@Overridepublic void destroyObject(TTransport transport) throwsException {if (transport != null &&transport.isOpen()) {
transport.close();
}
}/*** 檢驗對象是否可以由pool安全返回*/@Overridepublic booleanvalidateObject(TTransport transport) {try{if (transport != null && transport instanceofTSocket) {
TSocket thriftSocket=(TSocket) transport;if(thriftSocket.isOpen()) {return true;
}else{return false;
}
}else{return false;
}
}catch(Exception e) {return false;
}
}
@Overridepublic void activateObject(TTransport obj) throwsException {//TODO Auto-generated method stub
}
@Overridepublic void passivateObject(TTransport obj) throwsException {//TODO Auto-generated method stub
}public static interfacePoolOperationCallBack {//創(chuàng)建成功是執(zhí)行
voidmake(TTransport transport);//銷毀之前執(zhí)行
voiddestroy(TTransport transport);
}
}
View Code
連接池管理
packagecom.wy.thriftpool.commzkconnpool;importorg.apache.thrift.transport.TSocket;importorg.slf4j.Logger;importorg.slf4j.LoggerFactory;importorg.springframework.beans.factory.annotation.Autowired;importorg.springframework.stereotype.Service;/*** 連接池管理
*
*@authorwy
**/@Servicepublic classConnectionManager {private final Logger logger =LoggerFactory.getLogger(getClass());//保存local對象
ThreadLocal socketThreadSafe = new ThreadLocal();//連接提供池
@AutowiredprivateConnectionProvider connectionProvider;publicTSocket getSocket() {
TSocket socket= null;try{
socket=connectionProvider.getConnection();
socketThreadSafe.set(socket);returnsocketThreadSafe.get();
}catch(Exception e) {
logger.error("error ConnectionManager.invoke()", e);
}finally{
connectionProvider.returnCon(socket);
socketThreadSafe.remove();
}returnsocket;
}
}
View Code
spring配置文件
View Code
參考:http://www.cnblogs.com/mumuxinfei/p/3876187.html
由於本人經驗有限,文章中難免會有錯誤,請瀏覽文章的您指正或有不同的觀點共同探討!
總結
以上是生活随笔為你收集整理的java thrift连接池_由浅入深了解Thrift之客户端连接池化的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何在C++中将filetime时间转化
- 下一篇: android:configchange