[二]RabbitMQ-客户端源码之AMQConnection
上一篇文章([一]RabbitMQ-客戶端源碼之ConnectionFactory)中闡述了conn.start()方法完成之后客戶端就已經和broker建立了正常的連接,而這個Connection的關鍵就在于這個start()方法之內,下面我們來慢慢分析。
首先來看看start()方法的源碼,這個方法有點長,這里拆開來一一分析,首先是注釋:
/*** Start up the connection, including the MainLoop thread.* Sends the protocol* version negotiation header, and runs through* Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then* calls Connection.Open and waits for the OpenOk. Sets heart-beat* and frame max values after tuning has taken place.* @throws IOException if an error is encountered* either before, or during, protocol negotiation;* sub-classes {@link ProtocolVersionMismatchException} and* {@link PossibleAuthenticationFailureException} will be thrown in the* corresponding circumstances. {@link AuthenticationFailureException}* will be thrown if the broker closes the connection with ACCESS_REFUSED.* If an exception is thrown, connection resources allocated can all be* garbage collected when the connection object is no longer referenced.*/首先來看看方法上的注釋說了什么:
- 方法的作用是啟動連接(start up the connection), 包括啟動MainLoop線程,這個MainLoop線程主要是和broker進行通信交互處理通信幀(Frame)的一個線程(非常的重要!!!)。
- 這個方法會在建立連接的初始化階段(negotiation)會進行Connection.Start/.StartOk, Connection.Tune/.TuneOk, 調用Connection.Open之后再等待Conenction.OpenOk(這里的都是指AMQP協議層面的),這個可以參考本文中第一張使用wireshark抓包的網絡截圖,一一對應的關系。
- 通過broker回復的Connection.Tune幀(幀中包含Channel-Max, Frame-Max, Heartbeat三個參數)設置channelMax, frameMax以及Heartbeat的參數值。
- 一些異常情況。
public void start()throws IOException, TimeoutException {initializeConsumerWorkService();initializeHeartbeatSender();this._running = true;// Make sure that the first thing we do is to send the header,// which should cause any socket errors to show up for us, rather// than risking them pop out in the MainLoopAMQChannel.SimpleBlockingRpcContinuation connStartBlocker =new AMQChannel.SimpleBlockingRpcContinuation();// We enqueue an RPC continuation here without sending an RPC// request, since the protocol specifies that after sending// the version negotiation header, the client (connection// initiator) is to wait for a connection.start method to// arrive._channel0.enqueueRpc(connStartBlocker);
首先是初始化工作線程池(initializeConsumerWorkService)和初始化心跳線程(initializeHeartbeatSender)并設置運行狀態為true(this._isrunning=true,這個值會在MainLoop線程中有用,控制MainLoop線程是否繼續運行)。
“AMQChannel.SimpleBlockingRpcContinuation connStartBlocker = new AMQChannel.SimpleBlockingRpcContinuation();”這句代碼,從命名上來說像是rpc, 其實這么理解也沒錯。RabbitMQ-Client這個版本(3.5.3)的客戶端與broker端的通信是采用java原生socket.當然后面也改成了NIO,這個自然是后話。RabbitMQ-Client程序中會對各種幀進行處理,處理的方式也不是單一化的,這里舉Connection.Start這個類型的報文做分析。當broker發送Connection.Start至client端,client收到之后進行處理(MainLoop線程中),然后將此報文存入SimpleBlockingRpcContinuation中,照著SimpleBlockingRpcContinuation深究下去,其就是一個容量為1的BlockingQueue,也就是當MainLoop主導的線程將收到的Connection.Start存入其中,然后AMQConnction類的start()線程在等待(start()方法下面的代碼):
connStart = (AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();然后繼續處理。這看上去也算是個rpc,等待別的線程(這個線程同樣在等待broker的返回)處理完畢。
AMQCommand(這個之后會講到), 下面的“_channel0.enqueueRpc(connStartBlocker)”將這個rpc任務放入Channel中,如果深入代碼看的話,channel中當前至多只能enqueue一個rpc,如果當前的rpc沒有處理完再enqueue的話會被阻塞(wait())直到處理完當前的rpc才能enqueue下一個rpc。
try {// The following two lines are akin to AMQChannel's// transmit() method for this pseudo-RPC._frameHandler.setTimeout(HANDSHAKE_TIMEOUT);_frameHandler.sendHeader();} catch (IOException ioe) {_frameHandler.close();throw ioe;}
接下來“_frameHandler.sendHeader()”主要是發送Protocol-Header 0-9-1幀(可參考下圖),這個客戶端與broker建立連接的AMQP協議的第一幀,幀中的內容包括AMQP的版本號。這里發_frameHandler就是前面Connection提到的SocketFrameHandler對象,我們來看看sendHeader()做了什么:
//本段代碼在SocketFrameHandler類中 public void sendHeader(int major, int minor, int revision) throws IOException {synchronized (_outputStream) {_outputStream.write("AMQP".getBytes("US-ASCII"));_outputStream.write(0);_outputStream.write(major);_outputStream.write(minor);_outputStream.write(revision);_outputStream.flush();}}public void sendHeader() throws IOException {sendHeader(AMQP.PROTOCOL.MAJOR, AMQP.PROTOCOL.MINOR, AMQP.PROTOCOL.REVISION);}上面這段對照著下圖一目了然:
// start the main loop goingMainLoop loop = new MainLoop();final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();mainLoopThread = Environment.newThread(threadFactory, loop, name);mainLoopThread.start();// after this point clear-up of MainLoop is triggered by closing the frameHandler.
下面就是最重要的MainLoop線程了。這里先跳過,接下去看看start()方法,之后就是Connection.Start/.StartOk, Connection.Tune/.TuneOk, Connection.Open/.OpenOk的來回negotiation,以及設置channelMax, frameMax和heartbeat的參數值。當然在設置frameMax之前還初始化了ChannelManager,至于ChannelManager可以簡單的理解為管理Channel的一個類,具體實現細節可以參考([三]RabbitMQ-客戶端源碼之ChannelManager)
AMQP.Connection.Start connStart = null;AMQP.Connection.Tune connTune = null;try {connStart =(AMQP.Connection.Start) connStartBlocker.getReply(HANDSHAKE_TIMEOUT/2).getMethod();_serverProperties = Collections.unmodifiableMap(connStart.getServerProperties());Version serverVersion =new Version(connStart.getVersionMajor(),connStart.getVersionMinor());if (!Version.checkVersion(clientVersion, serverVersion)) {throw new ProtocolVersionMismatchException(clientVersion,serverVersion);}String[] mechanisms = connStart.getMechanisms().toString().split(" ");SaslMechanism sm = this.saslConfig.getSaslMechanism(mechanisms);if (sm == null) {throw new IOException("No compatible authentication mechanism found - " +"server offered [" + connStart.getMechanisms() + "]");}LongString challenge = null;LongString response = sm.handleChallenge(null, this.username, this.password);do {Method method = (challenge == null)? new AMQP.Connection.StartOk.Builder().clientProperties(_clientProperties).mechanism(sm.getName()).response(response).build(): new AMQP.Connection.SecureOk.Builder().response(response).build();try {Method serverResponse = _channel0.rpc(method, HANDSHAKE_TIMEOUT/2).getMethod();if (serverResponse instanceof AMQP.Connection.Tune) {connTune = (AMQP.Connection.Tune) serverResponse;} else {challenge = ((AMQP.Connection.Secure) serverResponse).getChallenge();response = sm.handleChallenge(challenge, this.username, this.password);}} catch (ShutdownSignalException e) {Method shutdownMethod = e.getReason();if (shutdownMethod instanceof AMQP.Connection.Close) {AMQP.Connection.Close shutdownClose = (AMQP.Connection.Close) shutdownMethod;if (shutdownClose.getReplyCode() == AMQP.ACCESS_REFUSED) {throw new AuthenticationFailureException(shutdownClose.getReplyText());}}throw new PossibleAuthenticationFailureException(e);}} while (connTune == null);} catch (TimeoutException te) {_frameHandler.close();throw te;} catch (ShutdownSignalException sse) {_frameHandler.close();throw AMQChannel.wrap(sse);} catch(IOException ioe) {_frameHandler.close();throw ioe;}try {int channelMax =negotiateChannelMax(this.requestedChannelMax,connTune.getChannelMax());_channelManager = instantiateChannelManager(channelMax, threadFactory);int frameMax =negotiatedMaxValue(this.requestedFrameMax,connTune.getFrameMax());this._frameMax = frameMax;int heartbeat =negotiatedMaxValue(this.requestedHeartbeat,connTune.getHeartbeat());setHeartbeat(heartbeat);_channel0.transmit(new AMQP.Connection.TuneOk.Builder().channelMax(channelMax).frameMax(frameMax).heartbeat(heartbeat).build());_channel0.exnWrappingRpc(new AMQP.Connection.Open.Builder().virtualHost(_virtualHost).build());} catch (IOException ioe) {_heartbeatSender.shutdown();_frameHandler.close();throw ioe;} catch (ShutdownSignalException sse) {_heartbeatSender.shutdown();_frameHandler.close();throw AMQChannel.wrap(sse);}// We can now respond to errors having finished tailoring the connectionthis._inConnectionNegotiation = false;return; }接著回顧MainLoop, 在start()方法中關于MainLoop的代碼主要有:
// start the main loop goingMainLoop loop = new MainLoop();final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();mainLoopThread = Environment.newThread(threadFactory, loop, name);mainLoopThread.start();// after this point clear-up of MainLoop is triggered by closing the frameHandler.這段代碼主要是初始化MainLoop線程對象,然后讓其運行。沒有什么特別之處,而特別之處在于MainLoop本身。
MainLoop類是AMQConnection類的私有內部類:
private class MainLoop implements Runnable {/*** Channel reader thread main loop. Reads a frame, and if it is* not a heartbeat frame, dispatches it to the channel it refers to.* Continues running until the "running" flag is set false by* shutdown().*/public void run() {try {while (_running) {Frame frame = _frameHandler.readFrame();if (frame != null) {_missedHeartbeats = 0;if (frame.type == AMQP.FRAME_HEARTBEAT) {// Ignore it: we've already just reset the heartbeat counter.} else {if (frame.channel == 0) { // the special channel_channel0.handleFrame(frame);} else {if (isOpen()) {// If we're still _running, but not isOpen(), then we// must be quiescing, which means any inbound frames// for non-zero channels (and any inbound commands on// channel zero that aren't Connection.CloseOk) must// be discarded.ChannelManager cm = _channelManager;if (cm != null) {cm.getChannel(frame.channel).handleFrame(frame);}}}}} else {// Socket timeout waiting for a frame.// Maybe missed heartbeat.handleSocketTimeout();}}} catch (EOFException ex) {if (!_brokerInitiatedShutdown)shutdown(null, false, ex, true);} catch (Throwable ex) {_exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this,ex);shutdown(null, false, ex, true);} finally {// Finally, shut down our underlying data connection._frameHandler.close();_appContinuation.set(null);notifyListeners();}} }MainLoop線程主要用來處理通信幀(Frame,有關Frame的細節將會在([四]RabbitMQ-客戶端源碼之Frame)中陳述)的,可以看到當AMQConnection調用start()方法后,_isrunning就設置為true,那么線程一直在運行(while(true))。
MainLoop線程當讀取到通信幀之后,判斷是否是心跳幀,如果是則忽略繼續監聽。如果是其他幀,則判斷其frame.channel值是否為0,frame.channel值為0代表的是特殊幀,這些特殊幀是和Connection有關的,而不是和Channel有關的(上面代碼里的frame.channel就是Channel里的channel number, 一般Connection類型的幀的channel number為0,而其余Channel類別幀的channel number大于0。)
這里就分channel_number=0和channel_number !=0分別進行處理。
當channel_number=0即frame.channel=0則直接調用_channel0的handleFrame方法。
這個_channel0是在AMQConnection類中創建的私有變量:
調用AMQChannel的handleFrame方法(有關AMQChannel的更多實現細節可以參考:([五]RabbitMQ-客戶端源碼之AMQChannel)):
public void handleFrame(Frame frame) throws IOException {AMQCommand command = _command;if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line_command = new AMQCommand(); // prepare for the next onehandleCompleteInboundCommand(command);} }對于channel number為0的幀,AMQCommand的handleFrame方法都是返回true.(有關AMQCommand的實現細節可以參考:([六]RabbitMQ-客戶端源碼之AMQCommand))
進而調用AMQChannel的handleCompleteInboundCommand(command)方法:
進而調用AMQChannel的processAsync方法。這個方法在AMQChannel類中是一個抽象方法,而觀察AMQConnection中的AMQChannel _channel0私有變量其正好實現了這個方法:
/** The special channel 0 (not managed by the_channelManager) */ private final AMQChannel _channel0 = new AMQChannel(this, 0) {@Override public boolean processAsync(Command c) throws IOException {return getConnection().processControlCommand(c);} };ChannelN中也實現了processAsync方法。(有關ChannelN的實現細節可以參考:([八]RabbitMQ-客戶端源碼之ChannelN))
進而調用了AMQConnection的processControlCommand方法:
/*** Handles incoming control commands on channel zero.* @see ChannelN#processAsync*/ @SuppressWarnings("unused") public boolean processControlCommand(Command c) throws IOException {// Similar trick to ChannelN.processAsync used here, except// we're interested in whole-connection quiescing.// See the detailed comments in ChannelN.processAsync.Method method = c.getMethod();if (isOpen()) {if (method instanceof AMQP.Connection.Close) {handleConnectionClose(c);return true;} else if (method instanceof AMQP.Connection.Blocked) {AMQP.Connection.Blocked blocked = (AMQP.Connection.Blocked) method;try {for (BlockedListener l : this.blockedListeners) {l.handleBlocked(blocked.getReason());}} catch (Throwable ex) {getExceptionHandler().handleBlockedListenerException(this, ex);}return true;} else if (method instanceof AMQP.Connection.Unblocked) {try {for (BlockedListener l : this.blockedListeners) {l.handleUnblocked();}} catch (Throwable ex) {getExceptionHandler().handleBlockedListenerException(this, ex);}return true;} else {return false;}} else {if (method instanceof AMQP.Connection.Close) {// Already shutting down, so just send back a CloseOk.try {_channel0.quiescingTransmit(new AMQP.Connection.CloseOk.Builder().build());} catch (IOException _e) { } // ignorereturn true;} else if (method instanceof AMQP.Connection.CloseOk) {// It's our final "RPC". Time to shut down._running = false;// If Close was sent from within the MainLoop we// will not have a continuation to return to, so// we treat this as processed in that case.return !_channel0.isOutstandingRpc();} else { // Ignore all others.return true;}} }這個方法是用來處理AMQP控制命令的:Connection.Close/CloseOk, Connection.Blocked/.Unblocked。正常情況下(比如Connection.Start/.StartOk)直接返回false。
這樣就會運行到 nextOutstandingRpc().handleCommand(command);這句代碼,意思就是將從broker接受到的AMQCommand對象存入RpcContinuation對象,確切的來說是SimpleBlockingRpcContinuation這個對象中,更確切的來說是存放到容量為1的BlockingQueue中,等待其余的線程來“take()”。有關RpcContinuation或者SimpleBlockingRpcContinuation細節可以參考:[五]RabbitMQ-客戶端源碼之AMQChannel。
我們假設有一個可靠的面向流的網絡傳輸層(TCP/IP或相當)。在單個套接字連接中,可以存在多個獨立控制線程,這些稱之為通道。每個幀都使用通道編號來編號。通過交織他們的幀,不同的通道共享連接。對于給定的通道,幀運行在一個嚴格的序列,這樣可以用來驅動一個協議解析器(通常是一個狀態機)。
當channel_number!=0則需要從ChannelManager中根據channel number找出相應的AMQChannel再調用handleFrame方法處理。
這里的ChannelManager從何而來?
這里就還是要到AMQChannel的start()方法來看,有這么一句:_channelManager = instantiateChannelManager(channelMax, threadFactory);
ChannelManager構造方法中的ConsumerWorkService參數就是AMQConnection中start()方法第一行代碼初始化的ConsumerWorkService對象。
有關ChannelManager的實現細節可以參考:([三]RabbitMQ-客戶端源碼之ChannelManager)
當channel number等于0的時候是調用AMQChannel,也可以說是AQMConnection的內部成員變量AMQChannel _channel0來處理。
當channel number不等于0時,這個接下去的處理就要涉及到整個RabbitMQ-Client代碼最核心的類——ChannelN。可以類別上上面channel number為0的情況,具體可以參考:[八]RabbitMQ-客戶端源碼之ChannelN。
附:本系列全集
總結
以上是生活随笔為你收集整理的[二]RabbitMQ-客户端源码之AMQConnection的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: Kubernetes环境下的各种调试方法
- 下一篇: Zabbix邮件报警配置