Netty的断线重连
因?yàn)楣ぷ髦薪?jīng)常使用到TCP,所以會(huì)頻繁使用到諸如Mina或Netty之類(lèi)的通信框架,為了方便項(xiàng)目的邏輯調(diào)用,經(jīng)常會(huì)在框架的基礎(chǔ)上再一次進(jìn)行封裝,這樣做其實(shí)有畫(huà)蛇添足的嫌疑,但也是無(wú)奈之舉。
這里主要記載使用Mina和Netty,構(gòu)建適合項(xiàng)目的一個(gè)完整的重連邏輯。
當(dāng)然,都是作為客戶(hù)端,畢竟一般只有客戶(hù)端才會(huì)做重連。
在這之前,需要考慮幾個(gè)問(wèn)題:
- 連接行為的結(jié)果可以較為方便地獲得,成功或失敗,最好直接有接口回調(diào),可以在回調(diào)中進(jìn)行后續(xù)邏輯處理
- 當(dāng)前通信連接的活躍狀態(tài)需要準(zhǔn)確實(shí)時(shí)而方便地獲得,這樣有利于重連時(shí)對(duì)連接的判斷
- 能夠較為靈活的配置Listener或Handler或Filter
- 支持計(jì)數(shù),無(wú)論是首次連接失敗多次后不再?lài)L試連接,還是中途斷開(kāi)后斷線(xiàn)重連多次后不再?lài)L試連接,一般不作無(wú)休止地重連
從代碼層面看,框架中最好有一個(gè)類(lèi)似Connector的類(lèi),能夠暴露合適的接口或方法,提供各種狀態(tài)與回調(diào),使通信連接的動(dòng)向能夠?qū)崟r(shí)把握,然而事情并不是那么美好。
連接結(jié)果
由于框架設(shè)計(jì)的一些原則,一個(gè)connector根本不足以暴露這些接口。
對(duì)于Mina而言,作為客戶(hù)端一般用于連接的連接器是NioSocketConnector;
對(duì)于Netty而言,則是Bootstrap;
下表是一些常見(jiàn)的定義在兩個(gè)框架中的對(duì)比,不一定準(zhǔn)確,但意義相近;
| 連接器 | SocketConnector | Bootstrap |
| 會(huì)話(huà) | IoSession | Channel |
| 連接結(jié)果 | ConnectFuture | ChannelFuture |
| 邏輯處理 | IoHandler | ChannelHandler |
| 過(guò)濾器 | IoFilter | ChannelHandler |
對(duì)于Mina而言,連接操作是這樣的:
ConnectFuture future = mConnector.connect();future.awaitUninterruptibly();if (future.isConnected()) {//得到會(huì)話(huà)mSession = future.getSession();}對(duì)于Netty來(lái)說(shuō),連接可以寫(xiě)成與Mina幾乎相同的形式:
ChannelFuture future = bootstrap.connect();future.awaitUninterruptibly();if(future.isSuccess()){mChannel = future.channel();}也可以不阻塞等待,兩種future都可以自行添加Listener監(jiān)聽(tīng)異步任務(wù)是否完成:
//Minafuture.addListener(new IoFutureListener<IoFuture>() {@Overridepublic void operationComplete(IoFuture ioFuture) {}}); //Nettyfuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {}});畢竟是出自一人之手,部分API真是驚人的相似。
到這里,第一個(gè)連接返回結(jié)果問(wèn)題算是有所結(jié)論,兩種框架都可以正常返回連接的結(jié)果。
會(huì)話(huà)狀態(tài)
而上述代碼中,返回的mSession與mChannel就是得到的會(huì)話(huà),這兩種類(lèi)各自提供了一些接口,可以用于獲得通信連接的實(shí)時(shí)狀態(tài)。
Mina的IoSession這里只取部分方法:
再對(duì)比看下Netty提供的Channel,這里也只取部分方法展示:
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {EventLoop eventLoop();Channel parent();ChannelConfig config();boolean isOpen();boolean isRegistered();boolean isActive();SocketAddress localAddress();SocketAddress remoteAddress();boolean isWritable();Channel.Unsafe unsafe();ChannelPipeline pipeline();public interface Unsafe {SocketAddress localAddress();SocketAddress remoteAddress();void register(EventLoop var1, ChannelPromise var2);void bind(SocketAddress var1, ChannelPromise var2);void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);void disconnect(ChannelPromise var1);void close(ChannelPromise var1);void write(Object var1, ChannelPromise var2);void flush();} }可以看出,無(wú)論是IoSession還是Channel,都有相關(guān)的API可以知曉通信是否活躍,所以第二個(gè)問(wèn)題在可以獲得IoSession或Channel的情況下,是沒(méi)有問(wèn)題的。
配置Handler
那么再看配置Listener或Handler的相差操作是否靈活。
二者在這方面的差別較為明顯。
對(duì)于Mina而言,添加Handler可以直接利用Connector,真正的邏輯Handler只能由setHandler方法添加,且只能為一個(gè),而相關(guān)的Filter則要通過(guò)getFilterChain()拿到的過(guò)濾器集合去添加;對(duì)于Mina來(lái)說(shuō),Handler和Filter是沒(méi)有交集的,他們分屬不同的接口IoHandler和IoFilter:
mConnector.setHandler(handler); mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);Netty有所不同,netty中所有的handler、filter都是ChannelHandller,這些handler都要在連接行為發(fā)生后才能生效,也就是掛載到Channel上的,而不是Bootstrap,一般添加是這樣的:
bootstrap.handler(handler); channel.pipeline().addLast(someHandler); channel.pipeline().addLast(someFilter);但handler依舊只能添加一個(gè),如果要添加多個(gè)handler或filter,就必須獲取到channel,然后進(jìn)行添加,netty本身提供了一個(gè)ChannelInitializer可以用于添加多個(gè)channelHandler,一般會(huì)這么寫(xiě):
bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(handler);channel.pipeline().addLast(someHandler);channel.pipeline().addLast(someFilter);}});對(duì)于Netty來(lái)說(shuō),Handler和Filter是同一個(gè)東西,都是ChannelHandler。
兩者在這方面的區(qū)別比較明顯:
一是netty將handler和filter都統(tǒng)一為handler了,
二是netty不能像mina一樣,在未連接之前就可以配置所有的Handler或Filter,netty必須獲得channel也就是連接成功后才能配置多個(gè)Filter。
這就造成了一個(gè)問(wèn)題,Mina可以提前就配置監(jiān)聽(tīng)器監(jiān)聽(tīng)連接的狀態(tài),可以正常監(jiān)聽(tīng)中途斷開(kāi),也就是在創(chuàng)建Connector后就可以?huà)燧d上監(jiān)聽(tīng):
mConnector.getFilterChain().addFirst("reconnect", new IoFilterAdapter() {@Overridepublic void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { //監(jiān)聽(tīng)到斷開(kāi),可接入回調(diào)接口,做進(jìn)一步的重連邏輯mConnector.connect();}});而Netty不能,創(chuàng)建Connector也就是Bootstrap并不能實(shí)現(xiàn)類(lèi)似的掛載,Bootstrap只能掛載一個(gè)Handler,而相關(guān)的過(guò)濾器或監(jiān)聽(tīng)只能在Channel出現(xiàn)后再進(jìn)行掛載,那么就會(huì)寫(xiě)成這樣:
bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加其他Filter或Handler}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//監(jiān)聽(tīng)到斷開(kāi),重連bootstrap.connect();}});這里initChannel方法永遠(yuǎn)是最先被調(diào)用的,因?yàn)樵谠创a中是這樣的:
//ChannelInitializer.javaprotected abstract void initChannel(C var1) throws Exception;public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {if (this.initChannel(ctx)) {ctx.pipeline().fireChannelRegistered();this.removeState(ctx);} else {ctx.fireChannelRegistered();}}在這種邏輯下,Mina可以在sessionClosed回調(diào)中使用SocketConnetor進(jìn)行重連,Netty可以在channelInactive回調(diào)中使用Bootstrap進(jìn)行重連。
看起來(lái)沒(méi)什么毛病。
但需要注意一點(diǎn),就是Handler的復(fù)用問(wèn)題,也就是對(duì)Handler或Filter的檢查,Mina和Netty都有對(duì)Handler的重復(fù)添加進(jìn)行過(guò)檢查,不過(guò)檢查邏輯有細(xì)微的差別。
Mina中是這樣檢查的:
可以看到,Mina只會(huì)檢查Filter在Map中對(duì)應(yīng)的key是否被使用過(guò),當(dāng)然理論上Filter掛載在SocketConnector的FilterChain中,只要配置過(guò)一次,就無(wú)需再進(jìn)行配置。
那么Netty呢?
Netty的Handler不是能隨意復(fù)用的,要復(fù)用必須標(biāo)明注解@Sharable,否則就會(huì)出現(xiàn)異常:
這是因?yàn)樵谠创a進(jìn)行檢查時(shí),是對(duì)Handler本身進(jìn)行檢查的,handler會(huì)有一個(gè)added的屬性,一旦被添加使用過(guò),就會(huì)置為true,而判斷邏輯會(huì)阻止為added=true的handler添加進(jìn)來(lái) 。這樣一來(lái),如果強(qiáng)行添加已經(jīng)添加過(guò)的handler就會(huì)拋出異常:
//DefaultChannelPipeline.javapublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {AbstractChannelHandlerContext newCtx;synchronized(this) {checkMultiplicity(handler);newCtx = this.newContext(group, this.filterName(name, handler), handler);//省略部分代碼}this.callHandlerAdded0(newCtx);return this;}private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;if (!h.isSharable() && h.added) {throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}這也就說(shuō)明使用channel.pipeline().addLast(handler)這種方法添加handler時(shí),如果想不同的Channel添加同一個(gè)Handler實(shí)例,每種handler都必須注解了@Sharable,如果正好要使用IdleStateHandler這種源碼內(nèi)部的Handler,而IdleStateHandler是沒(méi)有注解過(guò)@Sharable,那么就會(huì)出現(xiàn)上面的異常。
而實(shí)際應(yīng)用中,為了實(shí)現(xiàn)心跳,IdleStateHandler是一般都會(huì)使用到的。
那么問(wèn)題來(lái)了,Mina每次重新連接,創(chuàng)建新的session,但只要SocketConnector沒(méi)有變,所有Handler和Filter自然就沒(méi)有變,仍然可用,因?yàn)樗蠬andler和Filter是掛載到SocketConnector的FilterChain中,算是只和Connector相關(guān)的;
而Netty,如果重新連接的話(huà),會(huì)創(chuàng)建新的Channel,然后會(huì)重新調(diào)用initChannel,然后利用channel.pipeline().addLast添加Handler,算是掛載到Channel上的,而不是Bootstrap上。
這樣顯示出兩者最大的區(qū)別就是,Mina中配置一次即可,而Netty則需要每次產(chǎn)生新的Channel時(shí)對(duì)其進(jìn)行重新配置。
所以Netty中的handler想復(fù)用的話(huà),就必須加注解,否則就會(huì)報(bào)異常。如果一定要用到無(wú)法注解@Sharable的Handler,比如上面的IdleStateHandler,那就要想辦法每次initChannel時(shí),也新建一個(gè)新的IdleStateHandler…
或者,繼承IdleStateHandler,然后加上注解也行,雖然也很丑就是了。
So Bad…
這樣的情況下,可以想辦法,每次都新建,類(lèi)似這種:
FunctionsChannelHandler functionsChannelHandler = new FunctionsChannelHandler(bootstrap){@Overridepublic ChannelHandler[] handlers() {return new ChannelHandler[]{new NormalClientEncoder(),new IdleStateHandler(20, 10, 20),this,new NormalClientHandler()};}};bootstrap.handler(new ChannelInitializer<Channel>() {@Overrideprotected void initChannel(Channel channel) throws Exception {//添加各種handlerchannel.pipeline().addLast(functionsChannelHandler.handlers());}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {super.channelInactive(ctx);//監(jiān)聽(tīng)到斷開(kāi)}});因?yàn)閚etty把所有監(jiān)聽(tīng)器過(guò)濾器邏輯處理都?xì)w為ChannelHandler的原因,把一個(gè)handler擴(kuò)展成一個(gè)功能較為豐富的handler是一種不錯(cuò)的方法 。或者沿用這種思路,使其每次新加Handler時(shí),都是new過(guò)的Handler。
應(yīng)對(duì)框架自帶的一些未注解@Sharable的類(lèi),也可以繼承之,自行加注解:
這樣一來(lái),配置Handler也勉強(qiáng)可算靈活了。
連接計(jì)數(shù)
對(duì)連接計(jì)數(shù)一般都是開(kāi)發(fā)者編寫(xiě)的邏輯,主要是應(yīng)對(duì)無(wú)休止地連接。
主要應(yīng)用在兩種場(chǎng)景:
一是首次連接,如果多次連接不成功,那么停止連接,或者另有邏輯;
二是斷線(xiàn)重連,如果多次重連不成功,那么停止連接并銷(xiāo)毀,或者另有邏輯。
因?yàn)镸ina和Netty都是多線(xiàn)程模型的緣故,計(jì)數(shù)為了求穩(wěn)可以直接使用Atom類(lèi),當(dāng)然覺(jué)得大材小用也可以直接使用普通int值,畢竟理論上兩次連接中間應(yīng)該會(huì)有一定延時(shí)才對(duì)。
應(yīng)用示例
所以最后,都可以對(duì)各自的連接器進(jìn)行二次封裝,然后編寫(xiě)對(duì)自己有利的邏輯。
對(duì)于Mina,大概可以寫(xiě)成這樣:
使用起來(lái)是這樣的:
HigherGateWayHandler higherGateWayHandler = new HigherGateWayHandler();TCPConnector higherGateWayClient = new TCPConnector.Builder().setExecutor(ThreadPool.singleThread("higher_gateway_client")).setHost(NC.GATEWAT_HIGHER_HOST).setPort(NC.LOWER_PORT).setConnectTimeoutMillis(10 * 1000).setReadBuffer(10 * 1024).setHandlerAdapter(higherGateWayHandler).setProtocolCodecFilter(new HigherGateWayCodecFactory()).setKeepAliveFilter(new KeepAliveHigherGateWay(), higherGateWayHandler, 10, 20).setConnectListener(new TCPConnector.IConnectorListener() {@Overridepublic void connectSuccess(IoSession session) {//連接成功后}@Overridepublic void connectFailed() {//重連失敗if (higherGateWayClient.getRecconnectCounter() == 3) {//重連失敗后}//非重連失敗,優(yōu)先級(jí)連接情況下if (higherGateWayClient.getRecconnectCounter() == 0 && higherGateWayClient.getConnectCounter() > 2) {higherGateWayClient.resetConnectCounter();} else {higherGateWayClient.connectInThread();}}@Overridepublic void sessionClosed(IoSession session) {executors.execute(new Runnable() {@Overridepublic void run() {//重連邏輯higherGateWayClient.reconnect(10 * 1000, 3);}});}}).build();而Netty,封裝起來(lái)會(huì)有一點(diǎn)花里胡哨,目前遇到的問(wèn)題是當(dāng)重連以后復(fù)用IdleStateHandler這種Handler時(shí),就會(huì)使得其中的計(jì)時(shí)機(jī)制失效,也就是說(shuō),心跳沒(méi)用了,暫時(shí)不明原因,大概率是其中的線(xiàn)程被銷(xiāo)毀無(wú)法再起的原因。那么當(dāng)前就只能想辦法每次調(diào)用initChannel時(shí),創(chuàng)建新的Handler才行:
public class NettyConnector {/*** 連接器*/private Bootstrap bootstrap;/*** 地址*/private String host;private int port;/*** 會(huì)話(huà)*/private Channel channel;private static final long TIME_OUT = 10;private long connectTimeoutMills;/*** 重連次數(shù)*/private AtomicInteger recconnectCounter;/*** 首次連接次數(shù)*/private AtomicInteger connectCounter;/*** 以接口引出通信狀態(tài)*/public interface IChannelStateListener {void onConnectSuccess(Channel channel);void onConnectFailed();void onDisconnect();}private IChannelStateListener channelStateListener;private NettyConnector(final Builder builder) {recconnectCounter = new AtomicInteger(0);connectCounter = new AtomicInteger(0);connectTimeoutMills = builder.timeoutMills;bootstrap = builder.bootstrap;bootstrap.handler(new ChannelInitializer() {@Overrideprotected void initChannel(Channel channel) throws Exception {channel.pipeline().addLast(new ChannelDisconnectHandler());channel.pipeline().addLast(builder.handlerSet.handlers());}});}public void setRemoteAddress(String host, int port) {L.d("設(shè)置地址與端口-" + host + ":" + port);this.host = host;this.port = port;}public void setChannelStateListener(IChannelStateListener listener) {channelStateListener = listener;}public void connect() {if (channel == null || !channel.isActive()) {bootstrap.remoteAddress(this.host, this.port);L.d("第" + (connectCounter.get() + 1) + "次連接" + host + ":" + port + "中......");final long startMills = System.currentTimeMillis();ChannelFuture channelFuture = bootstrap.connect();channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {L.d("連接(" + bootstrap.config().remoteAddress() + ")成功");channel = f.channel();if (channelStateListener != null) {connectCounter.set(0);channelStateListener.onConnectSuccess(channel);}} else {long delay = System.currentTimeMillis() - startMills;if (delay > 0) {TimeUnit.MILLISECONDS.sleep(connectTimeoutMills - delay);}L.d("連接(" + bootstrap.config().remoteAddress() + ")失敗");if (channelStateListener != null) {connectCounter.incrementAndGet();channelStateListener.onConnectFailed();}}}});}}private void reconnect() {if (bootstrap == null)throw new IllegalArgumentException("bootstrap cannot be null");//如果已經(jīng)連接,則直接【連接成功】if (channel == null || !channel.isActive()) {//連接channel = bootstrap.connect().awaitUninterruptibly().channel();}}/*** 重連* @param reconnectTimeoutMills 重連超時(shí)時(shí)間* @param reconnectTimes 重連次數(shù)*/public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {try {recconnectCounter.set(0);while (channel != null && !channel.isActive() && recconnectCounter.getAndIncrement() < reconnectTimes) {L.d(Thread.currentThread().getName() + "," + "重連" + bootstrap.config().remoteAddress() + "(" + recconnectCounter.get() + ")次...");reconnect();if (channel.isActive()) {break;} else {TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);}L.d(channel.isActive() + "");}} catch (InterruptedException e) {e.printStackTrace();} finally {if (channel != null && channel.isActive()) {if (channelStateListener != null) {channelStateListener.onConnectSuccess(channel);}} else {if (channelStateListener != null) {channelStateListener.onConnectFailed();}}}}public Channel getChannel() {return channel;}public boolean isConnected() {return channel != null && channel.isActive();}public String getAddress() {return host + ":" + port;}public int getConnectFailedTimes() {return connectCounter.get();}public int getReconnectFailedTimes() {return recconnectCounter.get();}public static class Builder {private Bootstrap bootstrap = new Bootstrap();private HandlerSet handlerSet;private long timeoutMills = 10 * 1000;public Builder group(EventLoopGroup loopGroup) {bootstrap.group(loopGroup);return this;}@Deprecatedpublic Builder remoteAddress(String inetHost, int inetPort) {bootstrap.remoteAddress(inetHost, inetPort);return this;}public Builder setConnectTimeoutMills(long timeout) {timeoutMills = timeout;return this;}public Builder handler(HandlerSet handlers) {handlerSet = handlers;return this;}public NettyConnector build() {bootstrap.channel(NioSocketChannel.class);return new NettyConnector(this);}}/*** 主要用于監(jiān)聽(tīng)斷開(kāi)*/class ChannelDisconnectHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {ctx.fireChannelInactive();if (channelStateListener != null) {channelStateListener.onDisconnect();}}}/*** 主要用于創(chuàng)建新的handler,避免復(fù)用帶來(lái)的一些問(wèn)題*/@ChannelHandler.Sharablepublic static abstract class HandlerSet extends ChannelInboundHandlerAdapter {public abstract ChannelHandler[] handlers();}}因?yàn)镹etty不能像Mina直接在Connector上掛載監(jiān)聽(tīng)sessionClosed,只能用一個(gè)ChannelDisconnectHandler這樣的東西去監(jiān)聽(tīng)是否已經(jīng)斷開(kāi),并通過(guò)接口引出結(jié)果;
并且因?yàn)橹荒茉贑hannel.Pipeline中才能添加多個(gè)Handler的原因,這里用一個(gè)HandlerSet強(qiáng)行將所有需要的Handler集合,然后在創(chuàng)建Bootstrap的時(shí)候一次性添加進(jìn)去,想要保證每次都新建,這里就使用抽象方法,讓使用的時(shí)候可以自行創(chuàng)建。
注意,由于這里的抽象類(lèi)HandlerSet每次其實(shí)并不是新建的,所有是需要復(fù)用的,所以需要加注解@Sharable,但也只需要加它一個(gè)就行了,其他都是新建出來(lái)的,無(wú)需理會(huì)。
寫(xiě)出來(lái)就是這樣:
其中的HeartHandler是繼承自IdleStateHandler的。
整個(gè)封裝顯得花里胡哨…卻又很丑,不過(guò)勉強(qiáng)能用,水平有限。
就這樣吧。
總結(jié)
以上是生活随笔為你收集整理的Netty的断线重连的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: MM41/MM42/MM43零售物料主数
- 下一篇: mGBA-0.9.2 免费开源的gba模