真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...
# 前言
談到RPC肯定繞不開TCP通信,而主流的RPC框架都依賴于Netty等通信框架,這時(shí)候我們還要考慮是使用長(zhǎng)連接還是短連接:
- 短連接:每次通信結(jié)束后關(guān)閉連接,下次通信需要重新創(chuàng)建連接;優(yōu)點(diǎn)就是無需管理連接,無需保活連接;
- 長(zhǎng)連接:每次通信結(jié)束不關(guān)閉連接,連接可以復(fù)用,保證了性能;缺點(diǎn)就是連接需要統(tǒng)一管理,并且需要?;?#xff1b;
主流的RPC框架都會(huì)追求性能選擇使用長(zhǎng)連接,所以如何?;钸B接就是一個(gè)重要的話題,也是本文的主題,下面會(huì)重點(diǎn)介紹一些?;畈呗?#xff1b;
# 為什么需要?;?/strong>
上面介紹的長(zhǎng)連接、短連接并不是TCP提供的功能,所以長(zhǎng)連接是需要應(yīng)用端自己來實(shí)現(xiàn)的,包括:連接的統(tǒng)一管理,如何?;畹?#xff1b;如何?;钪拔覀兞私庖幌聻槭裁葱枰;?#xff1f;
主要原因是網(wǎng)絡(luò)不是100%可靠的,我們創(chuàng)建好的連接可能由于網(wǎng)絡(luò)原因?qū)е逻B接已經(jīng)不可用了,如果連接一直有消息往來,那么系統(tǒng)馬上可以感知到連接斷開;
但是我們系統(tǒng)可能長(zhǎng)時(shí)間沒有消息來往,導(dǎo)致系統(tǒng)不能及時(shí)感知到連接不可用,也就是不能及時(shí)處理重連或者釋放連接;常見的?;畈呗允褂眯奶鴻C(jī)制由應(yīng)用層來實(shí)現(xiàn),還有網(wǎng)絡(luò)層提供的TCP Keepalive?;钐綔y(cè)機(jī)制;
# TCP Keepalive機(jī)制
TCP Keepalive是操作系統(tǒng)實(shí)現(xiàn)的功能,并不是TCP協(xié)議的一部分,需要在操作系統(tǒng)下進(jìn)行相關(guān)配置,開啟此功能后,如果連接在一段時(shí)間內(nèi)沒有數(shù)據(jù)往來,TCP將發(fā)送Keepalive探針來確認(rèn)連接的可用性,Keepalive幾個(gè)內(nèi)核參數(shù)配置:
- tcp_keepalive_time:連接多長(zhǎng)時(shí)間沒有數(shù)據(jù)往來發(fā)送探針請(qǐng)求,默認(rèn)為7200s(2h);
- tcp_keepalive_probes:探測(cè)失敗重試的次數(shù)默認(rèn)為10次;
- tcp_keepalive_intvl:重試的間隔時(shí)間默認(rèn)75s;
以上參數(shù)可以修改到/etc/sysctl.conf文件中;是否使用Keepalive用來?;罹蛪蛄?#xff0c;其實(shí)還不夠,Keepalive只是在網(wǎng)絡(luò)層就行?;?#xff0c;如果網(wǎng)絡(luò)本身沒有問題,但是系統(tǒng)由于其他原因已經(jīng)不可用了,這時(shí)候Keepalive并不能發(fā)現(xiàn);所以往往還需要結(jié)合心跳機(jī)制來一起使用;
# 心跳機(jī)制
何為心跳機(jī)制,簡(jiǎn)單來講就是客戶端啟動(dòng)一個(gè)定時(shí)器用來定時(shí)發(fā)送請(qǐng)求,服務(wù)端接到請(qǐng)求進(jìn)行響應(yīng),如果多次沒有接受到響應(yīng),那么客戶端認(rèn)為連接已經(jīng)斷開,可以斷開半打開的連接或者進(jìn)行重連處理;下面以Dubbo為例來看看是如何具體實(shí)施的;
Dubbo2.6.X
在HeaderExchangeClient中啟動(dòng)了定時(shí)器ScheduledThreadPoolExecutor來定期執(zhí)行心跳請(qǐng)求
ScheduledThreadPoolExecutor scheduled = new ScheduledThreadPoolExecutor(2, new NamedThreadFactory("dubbo-remoting-client-heartbeat", true));在實(shí)例化HeaderExchangeClient時(shí)啟動(dòng)心跳定時(shí)器:
private void startHeartbeatTimer() {stopHeartbeatTimer(); if (heartbeat > 0) {heartbeatTimer = scheduled.scheduleWithFixedDelay(new HeartBeatTask(new HeartBeatTask.ChannelProvider() { @Overridepublic Collection<Channel> getChannels() { return Collections.<Channel>singletonList(HeaderExchangeClient.this);}}, heartbeat, heartbeatTimeout),heartbeat, heartbeat, TimeUnit.MILLISECONDS);} }heartbeat默認(rèn)為60秒,heartbeatTimeout默認(rèn)為heartbeat*3,可以理解至少出現(xiàn)三次心跳請(qǐng)求還未收到回復(fù)才會(huì)任務(wù)連接已經(jīng)斷開;HeartBeatTask為執(zhí)行心跳的任務(wù):
public void run() {long now = System.currentTimeMillis(); for (Channel channel : channelProvider.getChannels()) { if (channel.isClosed()) { continue;} Long lastRead = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_READ\_TIMESTAMP); Long lastWrite = (Long) channel.getAttribute(HeaderExchangeHandler.KEY\_WRITE\_TIMESTAMP); if ((lastRead != null && now - lastRead > heartbeat)|| (lastWrite != null && now - lastWrite > heartbeat)) { // 發(fā)送心跳} if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {((Client) channel).reconnect();} else {channel.close();}}} }因?yàn)镈ubbo雙端都會(huì)發(fā)送心跳請(qǐng)求,所以可以發(fā)現(xiàn)有兩個(gè)時(shí)間點(diǎn)分別是:
lastRead和lastWrite;當(dāng)然時(shí)間和最后讀取,最后寫的時(shí)間間隔大于heartbeat就會(huì)發(fā)送心跳請(qǐng)求;
如果多次心跳未返回結(jié)果,也就是最后讀取消息時(shí)間大于heartbeatTimeout會(huì)判定當(dāng)前是Client還是Server,如果是Client會(huì)發(fā)起reconnect,Server會(huì)關(guān)閉連接,這樣的考慮是合理的,客戶端調(diào)用是強(qiáng)依賴可用連接的,而服務(wù)端可以等待客戶端重新建立連接;
以上只是介紹的Client,同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;
Dubbo2.7.0
Dubbo2.7.0的心跳機(jī)制在2.6.X的基礎(chǔ)上得到了加強(qiáng),同樣在HeaderExchangeClient中使用HashedWheelTimer開啟心跳檢測(cè),這是Netty提供的一個(gè)時(shí)間輪定時(shí)器,在任務(wù)非常多,并且任務(wù)執(zhí)行時(shí)間很短的情況下,HashedWheelTimer比Schedule性能更好,特別適合心跳檢測(cè);
HashedWheelTimer heartbeatTimer = new HashedWheelTimer(new NamedThreadFactory("dubbo-client-heartbeat", true), tickDuration,TimeUnit.MILLISECONDS, Constants.TICKS\_PER\_WHEEL);分別啟動(dòng)了兩個(gè)定時(shí)任務(wù):startHeartBeatTask和startReconnectTask:
private void startHeartbeatTimer() {AbstractTimerTask.ChannelProvider cp = () -> Collections.singletonList(HeaderExchangeClient.this); long heartbeatTick = calculateLeastDuration(heartbeat); long heartbeatTimeoutTick = calculateLeastDuration(heartbeatTimeout);HeartbeatTimerTask heartBeatTimerTask = new HeartbeatTimerTask(cp, heartbeatTick, heartbeat);ReconnectTimerTask reconnectTimerTask = new ReconnectTimerTask(cp, heartbeatTimeoutTick, heartbeatTimeout); // init task and start timer.heartbeatTimer.newTimeout(heartBeatTimerTask, heartbeatTick, TimeUnit.MILLISECONDS);heartbeatTimer.newTimeout(reconnectTimerTask, heartbeatTimeoutTick, TimeUnit.MILLISECONDS); }HeartbeatTimerTask:用來定時(shí)發(fā)送心跳請(qǐng)求,心跳間隔時(shí)間默認(rèn)為60秒;這里重新計(jì)算了時(shí)間,其實(shí)就是在原來的基礎(chǔ)上除以3,其實(shí)就是縮短了檢測(cè)間隔時(shí)間,增大了及時(shí)發(fā)現(xiàn)死鏈的概率;分別看一下兩個(gè)任務(wù):
protected void doTask(Channel channel) {Long lastRead = lastRead(channel);Long lastWrite = lastWrite(channel); if ((lastRead != null && now() - lastRead > heartbeat)|| (lastWrite != null && now() - lastWrite > heartbeat)) {Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);} }同上檢測(cè)最后讀寫時(shí)間和heartbeat的大小,注:普通請(qǐng)求和心跳請(qǐng)求都會(huì)更新讀寫時(shí)間
protected void doTask(Channel channel) {Long lastRead = lastRead(channel);Long now = now(); if (lastRead != null && now - lastRead > heartbeatTimeout) { if (channel instanceof Client) {((Client) channel).reconnect();} else {channel.close();}} }同樣的在超時(shí)的情況下,Client重連,Server關(guān)閉連接;同樣Server端也有相同的心跳處理,在可以查看HeaderExchangeServer;
Dubbo2.7.1-X
在Dubbo2.7.1之后,借助了Netty提供的IdleStateHandler來實(shí)現(xiàn)心跳機(jī)制服務(wù)
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime,TimeUnit unit) {this(false, readerIdleTime, writerIdleTime, allIdleTime, unit); }- readerIdleTime:讀超時(shí)時(shí)間;
- writerIdleTime:寫超時(shí)時(shí)間;
- allIdleTime:所有類型的超時(shí)時(shí)間;
根據(jù)設(shè)置的超時(shí)時(shí)間,循環(huán)檢查讀寫事件多久沒有發(fā)生了,在pipeline中加入IdleSateHandler之后,可以在此pipeline的任意Handler的userEventTriggered方法之中檢測(cè)IdleStateEvent事件;下面看看具體Client和Server端添加的IdleStateHandler:
Client端
protected void initChannel(Channel ch) throws Exception { final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); int heartbeatInterval = UrlUtils.getHeartbeat(getUrl());ch.pipeline().addLast("client-idle-handler", new IdleStateHandler(heartbeatInterval, 0, 0, MILLISECONDS)).addLast("handler", nettyClientHandler); }Client端在NettyClient中添加了IdleStateHandler,指定了讀寫超時(shí)時(shí)間默認(rèn)為60秒;60秒內(nèi)沒有讀寫事件發(fā)生,會(huì)觸發(fā)IdleStateEvent事件在NettyClientHandler處理:
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { try {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);Request req = new Request();req.setVersion(Version.getProtocolVersion());req.setTwoWay(true);req.setEvent(Request.HEARTBEAT_EVENT);channel.send(req);} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} else { super.userEventTriggered(ctx, evt);} }可以發(fā)現(xiàn)接收到IdleStateEvent事件發(fā)送了心跳請(qǐng)求;至于Client端如何處理重連,同樣在HeaderExchangeClient中使用HashedWheelTimer定時(shí)器啟動(dòng)了兩個(gè)任務(wù):心跳任務(wù)和重連任務(wù),感覺這里已經(jīng)不需要心跳任務(wù)了,至于重連任務(wù)其實(shí)也可以放到userEventTriggered中處理;
Server端
protected void initChannel(NioSocketChannel ch) throws Exception { int idleTimeout = UrlUtils.getIdleTimeout(getUrl()); final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);ch.pipeline().addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS)).addLast("handler", nettyServerHandler); }Server端指定的超時(shí)時(shí)間默認(rèn)為60*3秒,在NettyServerHandler中處理
userEventTriggered
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) {NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler); try {channel.close();} finally {NettyChannel.removeChannelIfDisconnected(ctx.channel());}} super.userEventTriggered(ctx, evt); }Server端在指定的超時(shí)時(shí)間內(nèi)沒有發(fā)生讀寫,會(huì)直接關(guān)閉連接;相比之前現(xiàn)在只有Client發(fā)送心跳,單向發(fā)送心跳;
同樣的在HeaderExchangeServer中并沒有啟動(dòng)多個(gè)認(rèn)為,僅僅啟動(dòng)了一個(gè)CloseTimerTask,用來檢測(cè)超時(shí)時(shí)間關(guān)閉連接;感覺這個(gè)任務(wù)是不是也可以不需要了,IdleStateHandler已經(jīng)實(shí)現(xiàn)了此功能;
綜上:在使用IdleStateHandler的情況下來同時(shí)在HeaderExchangeClient啟動(dòng)心跳+重連機(jī)制,HeaderExchangeServer啟動(dòng)了關(guān)閉連接機(jī)制;主要是因?yàn)镮dleStateHandler是Netty框架特有了,而Dubbo是支持多種底層通訊框架的包括Mina,Grizzy等,應(yīng)該是為了兼容此類框架存在的;?
# 總結(jié)
本文首先介紹了RPC中引入的長(zhǎng)連接方式,繼而引出長(zhǎng)連接的?;顧C(jī)制,為什么需要?;?#xff1f;然后分別介紹了網(wǎng)絡(luò)層?;顧C(jī)制TCP Keepalive機(jī)制,應(yīng)用層心跳機(jī)制;最后已Dubbo為例看各個(gè)版本中對(duì)心跳機(jī)制的進(jìn)化。
《新程序員》:云原生和全面數(shù)字化實(shí)踐50位技術(shù)專家共同創(chuàng)作,文字、視頻、音頻交互閱讀總結(jié)
以上是生活随笔為你收集整理的真强啊!建议每一位Java程序员都读读Dubbo心跳设计的源码...的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 关于高并发,我想告诉你这些!
- 下一篇: 写的很好!细数 Java 线程池的原理