Netty心跳机制-长连接
前文需求回顧
完成對(duì)紅酒窖的室內(nèi)溫度采集及監(jiān)控功能。由本地應(yīng)用程序+溫度傳感器定時(shí)采集室內(nèi)溫度上報(bào)至服務(wù)器,如果溫度 >20 °C 則由服務(wù)器下發(fā)重啟空調(diào)指令,如果本地應(yīng)用長(zhǎng)時(shí)間不上傳溫度給服務(wù)器,則給戶主手機(jī)發(fā)送一條預(yù)警短信。
Netty入門篇-從雙向通信開始「上文」
上篇算是完成簡(jiǎn)單的雙向通信了,我們接著看看 “如果本地應(yīng)用長(zhǎng)時(shí)間不上傳溫度給服務(wù)器…”,很明顯客戶端有可能掛了嘛,所以怎么實(shí)現(xiàn)客戶端與服務(wù)端的長(zhǎng)連接就是本文要實(shí)現(xiàn)的了。
什么是心跳機(jī)制
百度百科:心跳機(jī)制是定時(shí)發(fā)送一個(gè)自定義的結(jié)構(gòu)體(心跳包),讓對(duì)方知道自己還活著,以確保連接的有效性的機(jī)制。
簡(jiǎn)單說,這個(gè)心跳機(jī)制是由客戶端主動(dòng)發(fā)起的消息,每隔一段時(shí)間就向服務(wù)端發(fā)送消息,告訴服務(wù)端自己還沒死,可不要給戶主發(fā)送預(yù)警短信啊。
如何實(shí)現(xiàn)心跳機(jī)制
1、客戶端代碼修改
我們需要改造一下上節(jié)中客戶端的代碼,首先是在責(zé)任鏈中增加一個(gè)心跳邏輯處理類HeartbeatHandler
public class NettyClient {private static String host = "127.0.0.1";public static void main(String[] args) {NioEventLoopGroup workerGroup = new NioEventLoopGroup();Bootstrap bootstrap = new Bootstrap();bootstrap// 1.指定線程模型.group(workerGroup)// 2.指定 IO 類型為 NIO.channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, true)// 3.IO 處理邏輯.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) {ch.pipeline().addLast(new IdleStateHandler(0, 10, 0)).addLast(new StringDecoder()).addLast(new StringEncoder()).addLast(new HeartbeatHandler()).addLast(new NettyClientHandler());}});// 4.建立連接bootstrap.connect(host, 8070).addListener(future -> {if (future.isSuccess()) {System.out.println("連接成功!");} else {System.err.println("連接失敗!");}});} }沒什么變化,主要是增加了HeartbeatHandler,我們來看看這個(gè)類:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.nio.charset.Charset; import java.time.LocalTime;public class HeartbeatHandler extends ChannelInboundHandlerAdapter {@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {if (evt instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) evt;if (idleStateEvent.state() == IdleState.WRITER_IDLE) {System.out.println("10秒了,需要發(fā)送消息給服務(wù)端了" + LocalTime.now());//向服務(wù)端送心跳包ByteBuf buffer = getByteBuf(ctx);//發(fā)送心跳消息,并在發(fā)送失敗時(shí)關(guān)閉該連接ctx.writeAndFlush(buffer).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, evt);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("捕獲的異常:" + cause.getMessage());ctx.channel().close();}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 獲取二進(jìn)制抽象 ByteBufByteBuf buffer = ctx.alloc().buffer();String time = "heartbeat:客戶端心跳數(shù)據(jù):" + LocalTime.now();// 2. 準(zhǔn)備數(shù)據(jù),指定字符串的字符集為 utf-8byte[] bytes = time.getBytes(Charset.forName("utf-8"));// 3. 填充數(shù)據(jù)到 ByteBufbuffer.writeBytes(bytes);return buffer;}}還是繼承自ChannelInboundHandlerAdapter,不過這次重寫的是userEventTriggered()方法,這個(gè)方法在客戶端的所有ChannelHandler中,如果10s內(nèi)沒有發(fā)生write事件時(shí)觸發(fā),所以我們?cè)谠摲椒ㄖ薪o服務(wù)端發(fā)送心跳消息。
業(yè)務(wù)邏輯處理類NettyClientHandler沒有改動(dòng),代碼如下:
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date; import java.util.Random;public class NettyClientHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println(new Date() + ": 客戶端寫出數(shù)據(jù)");// 1. 獲取數(shù)據(jù)ByteBuf buffer = getByteBuf(ctx);// 2. 寫數(shù)據(jù)ctx.channel().writeAndFlush(buffer);}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {// 1. 獲取二進(jìn)制抽象 ByteBufByteBuf buffer = ctx.alloc().buffer();Random random = new Random();double value = random.nextDouble() * 14 + 8;String temp = "獲取室內(nèi)溫度:" + value;// 2. 準(zhǔn)備數(shù)據(jù),指定字符串的字符集為 utf-8byte[] bytes = temp.getBytes(Charset.forName("utf-8"));// 3. 填充數(shù)據(jù)到 ByteBufbuffer.writeBytes(bytes);return buffer;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println(new Date() + ": 客戶端讀到數(shù)據(jù) -> " + msg.toString());}}對(duì)如上代碼不了解的可以回看上一節(jié):Netty入門篇-從雙向通信開始
2、服務(wù)端代碼修改
服務(wù)端代碼主要是開啟TCP底層心跳機(jī)制支持,.childOption(ChannelOption.SO_KEEPALIVE, true) ,其他的代碼并沒有改動(dòng):
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;public class NettyServer {public static void main(String[] args) {NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workerGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//服務(wù)端可連接隊(duì)列數(shù),對(duì)應(yīng)TCP/IP協(xié)議listen函數(shù)中backlog參數(shù).option(ChannelOption.SO_BACKLOG, 1024)//設(shè)置TCP長(zhǎng)連接,一般如果兩個(gè)小時(shí)內(nèi)沒有數(shù)據(jù)的通信時(shí),TCP會(huì)自動(dòng)發(fā)送一個(gè)活動(dòng)探測(cè)數(shù)據(jù)報(bào)文.childOption(ChannelOption.SO_KEEPALIVE, true)//將小的數(shù)據(jù)包包裝成更大的幀進(jìn)行傳送,提高網(wǎng)絡(luò)的負(fù)載.childOption(ChannelOption.TCP_NODELAY, true).childHandler(new ChannelInitializer<NioSocketChannel>() {@Overrideprotected void initChannel(NioSocketChannel ch) {ch.pipeline().addLast(new NettyServerHandler());}});serverBootstrap.bind(8070);}}我們?cè)賮砜纯捶?wù)端的業(yè)務(wù)處理類 NettyServerHandler
import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.nio.charset.Charset; import java.util.Date;public class NettyServerHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {ByteBuf byteBuf = (ByteBuf) msg;String message = byteBuf.toString(Charset.forName("utf-8"));System.out.println(new Date() + ": 服務(wù)端讀到數(shù)據(jù) -> " + message);/** 心跳數(shù)據(jù)是不發(fā)送數(shù)據(jù) **/if(!message.contains("heartbeat")){ByteBuf out = getByteBuf(ctx);ctx.channel().writeAndFlush(out);}}private ByteBuf getByteBuf(ChannelHandlerContext ctx) {byte[] bytes = "我是發(fā)送給客戶端的數(shù)據(jù):請(qǐng)重啟冰箱!".getBytes(Charset.forName("utf-8"));ByteBuf buffer = ctx.alloc().buffer();buffer.writeBytes(bytes);return buffer;}}對(duì)channelRead() 方法增加了一個(gè) if 判斷,判斷如果包含heartbeat字符串就認(rèn)為這是客戶端發(fā)過來的心跳,這種判斷是非常low的,因?yàn)榈侥壳盀橹刮覀円恢笔怯煤?jiǎn)單字符串來傳遞數(shù)據(jù)的,上邊傳遞的數(shù)據(jù)就直接操作字符串;那么問題來了,如果我們想傳遞對(duì)象怎么搞呢?下節(jié)寫。我們先來看一下如上代碼客戶端與服務(wù)端運(yùn)行截圖:
服務(wù)端
 
 客戶端
 
至此,整個(gè)心跳機(jī)制就完成了,這樣每隔10秒客戶端就會(huì)給服務(wù)端發(fā)送一個(gè)心跳消息,下節(jié)我們通過了解通協(xié)議以完善心跳機(jī)制的代碼。
18年專科畢業(yè)后,我創(chuàng)建了一個(gè)java相關(guān)的公眾號(hào),用來記錄自己的學(xué)習(xí)之路,感興趣的小伙伴可以關(guān)注一下:小偉后端筆記
總結(jié)
以上是生活随笔為你收集整理的Netty心跳机制-长连接的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
                            
                        - 上一篇: OC中的点语法
 - 下一篇: Python天气查询系统(连接数据库版)