ZooKeeper Source Code Analysis, Part 3: The Client Request-Sending Flow
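A znode can be watched: both changes to the data stored in the node and changes to its child nodes can be monitored. As soon as a change happens, the clients that set the watch are notified. This is ZooKeeper's most important feature for applications; it makes centralized configuration management, cluster management, distributed locks, and similar functions possible.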
Background:
The states defined by ZooKeeper are:
Unknown (-1), Disconnected (0), NoSyncConnected (1), SyncConnected (3), AuthFailed (4), ConnectedReadOnly (5), SaslAuthenticated (6), Expired (-112). The event types are: None (-1), NodeCreated (1), NodeDeleted (2), NodeDataChanged (3), NodeChildrenChanged (4), DataWatchRemoved (5), ChildWatchRemoved (6).
The watcher types are: Children (1), Data (2), Any (3).
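To make the numeric codes above concrete, here is a minimal sketch of an enum that carries an integer code and can be looked up by it. SketchEventType and EventTypeDemo are hypothetical names made up for this illustration; this is not ZooKeeper's own class, only the codes are taken from the list above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the event types listed above (not ZooKeeper's class).
enum SketchEventType {
    NONE(-1),
    NODE_CREATED(1),
    NODE_DELETED(2),
    NODE_DATA_CHANGED(3),
    NODE_CHILDREN_CHANGED(4),
    DATA_WATCH_REMOVED(5),
    CHILD_WATCH_REMOVED(6);

    // Reverse index from wire code to enum constant.
    private static final Map<Integer, SketchEventType> BY_CODE = new HashMap<>();

    static {
        for (SketchEventType t : values()) {
            BY_CODE.put(t.code, t);
        }
    }

    private final int code;

    SketchEventType(int code) {
        this.code = code;
    }

    int code() {
        return code;
    }

    // Translate an integer received from the wire back into a constant.
    static SketchEventType fromCode(int code) {
        SketchEventType t = BY_CODE.get(code);
        if (t == null) {
            throw new IllegalArgumentException("Unknown event type code: " + code);
        }
        return t;
    }
}

class EventTypeDemo {
    public static void main(String[] args) {
        System.out.println(SketchEventType.fromCode(3)); // prints NODE_DATA_CHANGED
    }
}
```

Rejecting unknown codes with an exception is a choice made for this sketch; it keeps a bad value from silently turning into a valid-looking event.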
In the previous article, "ZooKeeper Source Code Analysis, Part 1: The Client", we started a MyWatcher when connecting to ZooKeeper:
    protected void connectToZK(String newHost) throws InterruptedException, IOException {
        if (zk != null && zk.getState().isAlive()) {
            zk.close();
        }
        host = newHost;
        boolean readOnly = cl.getOption("readonly") != null;
        if (cl.getOption("secure") != null) {
            System.setProperty(ZooKeeper.SECURE_CLIENT, "true");
            System.out.println("Secure connection is enabled");
        }
        zk = new ZooKeeper(host,
                Integer.parseInt(cl.getOption("timeout")),
                new MyWatcher(), readOnly);
    }

When the ZooKeeper instance is created, watchManager comes into play:
    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider)
            throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;

        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        hostProvider = aHostProvider;

        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

The MyWatcher passed in is stored in watchManager as the default watcher. The watchManager is then handed to a new ClientCnxn, whose threads are started.
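The role of the default watcher can be illustrated with a small registry. This is only a sketch under stated assumptions: WatchRegistrySketch, SketchWatcher, register and materialize are hypothetical names, and the dispatch rule (events without a path go to the default watcher, path events go to one-shot watchers registered for that path) is a simplification chosen for the example, not a copy of ZooKeeper's watch manager.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified watch registry; not ZooKeeper's implementation.
class WatchRegistrySketch {
    interface SketchWatcher {
        void process(String eventType, String path);
    }

    private SketchWatcher defaultWatcher;
    private final Map<String, Set<SketchWatcher>> dataWatches = new HashMap<>();

    // Mirrors "watchManager.defaultWatcher = watcher" from the constructor above.
    void setDefaultWatcher(SketchWatcher watcher) {
        this.defaultWatcher = watcher;
    }

    void register(String path, SketchWatcher watcher) {
        dataWatches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Decide which watchers should be told about an event.
    List<SketchWatcher> materialize(String eventType, String path) {
        List<SketchWatcher> result = new ArrayList<>();
        if (path == null) {
            // Assumption: session-level events carry no path and go to the default watcher.
            if (defaultWatcher != null) {
                result.add(defaultWatcher);
            }
            return result;
        }
        // Assumption: watches are one-shot, so they are removed when they fire.
        Set<SketchWatcher> watchers = dataWatches.remove(path);
        if (watchers != null) {
            result.addAll(watchers);
        }
        return result;
    }

    public static void main(String[] args) {
        WatchRegistrySketch registry = new WatchRegistrySketch();
        registry.setDefaultWatcher((type, path) -> System.out.println("default watcher got " + type));
        registry.register("/config", (type, path) -> System.out.println("data watcher got " + type + " on " + path));
        for (SketchWatcher w : registry.materialize("None", null)) {
            w.process("None", null);
        }
        for (SketchWatcher w : registry.materialize("NodeDataChanged", "/config")) {
            w.process("NodeDataChanged", "/config");
        }
    }
}
```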
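So let's get to know ClientCnxn first. ClientCnxn manages the client's socket I/O. It maintains a list of servers it can connect to and switches among them transparently when a switch is needed.
First, here is how the socket implementation is obtained: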
    private static ClientCnxnSocket getClientCnxnSocket() throws IOException {
        String clientCnxnSocketName = System
                .getProperty(ZOOKEEPER_CLIENT_CNXN_SOCKET);
        if (clientCnxnSocketName == null) {
            clientCnxnSocketName = ClientCnxnSocketNIO.class.getName();
        }
        try {
            return (ClientCnxnSocket) Class.forName(clientCnxnSocketName)
                    .newInstance();
        } catch (Exception e) {
            IOException ioe = new IOException("Couldn't instantiate "
                    + clientCnxnSocketName);
            ioe.initCause(e);
            throw ioe;
        }
    }

Next, ClientCnxn's start() method is called. It starts two threads:
    public void start() {
        sendThread.start();
        eventThread.start();
    }

The SendThread class services the outgoing request queue and generates heartbeats. It also spawns the ReadThread.
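As a rough illustration of the heartbeat schedule, the sketch below isolates the ping arithmetic that appears in SendThread's run loop. PingScheduleSketch, timeToNextPing and shouldPing are hypothetical names, and the value 10000 for MAX_SEND_PING_INTERVAL is an assumption made for this example; the formula itself is the one in the run method.

```java
// Hypothetical helper that isolates the ping timing rule from the run loop.
class PingScheduleSketch {
    // Assumed value (milliseconds) for illustration only.
    static final int MAX_SEND_PING_INTERVAL = 10000;

    // Milliseconds left before the next ping is due.
    // Once more than one second has passed since the last send, an extra
    // 1000 ms is subtracted so the ping goes out slightly early.
    static int timeToNextPing(int readTimeout, int idleSend) {
        return readTimeout / 2 - idleSend - ((idleSend > 1000) ? 1000 : 0);
    }

    // A ping is sent when it is due, or when nothing has been sent for too long.
    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0
                || idleSend > MAX_SEND_PING_INTERVAL;
    }

    public static void main(String[] args) {
        System.out.println(timeToNextPing(20000, 500));  // prints 9500
        System.out.println(timeToNextPing(20000, 9500)); // prints -500
        System.out.println(shouldPing(60000, 10001));    // prints true
    }
}
```

With a read timeout of 20 seconds, a client that last sent something 9.5 seconds ago is already due for a ping, because the one-second safety margin is subtracted on top of the half-timeout budget.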
Let's look at the main body of SendThread's run method:
    if (!clientCnxnSocket.isConnected()) {
        // don't re-establish connection if we are closing
        if (closing) {
            break;
        }
        startConnect();
        clientCnxnSocket.updateLastSendAndHeard();
    }

    if (state.isConnected()) {
        // determine whether we need to send an AuthFailed event.
        if (zooKeeperSaslClient != null) {
            boolean sendAuthEvent = false;
            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                try {
                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                } catch (SaslException e) {
                    LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                }
            }
            KeeperState authState = zooKeeperSaslClient.getKeeperState();
            if (authState != null) {
                if (authState == KeeperState.AuthFailed) {
                    // An authentication error occurred during authentication with the Zookeeper Server.
                    state = States.AUTH_FAILED;
                    sendAuthEvent = true;
                } else {
                    if (authState == KeeperState.SaslAuthenticated) {
                        sendAuthEvent = true;
                    }
                }
            }

            if (sendAuthEvent == true) {
                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        authState, null));
            }
        }
        to = readTimeout - clientCnxnSocket.getIdleRecv();
    } else {
        to = connectTimeout - clientCnxnSocket.getIdleRecv();
    }

    if (to <= 0) {
        String warnInfo;
        warnInfo = "Client session timed out, have not heard from server in "
                + clientCnxnSocket.getIdleRecv()
                + "ms"
                + " for sessionid 0x"
                + Long.toHexString(sessionId);
        LOG.warn(warnInfo);
        throw new SessionTimeoutException(warnInfo);
    }
    if (state.isConnected()) {
        // 1000 (1 second) is to prevent race condition missing to send the second ping
        // also make sure not to send too many pings when readTimeout is small
        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend()
                - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
        // send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
            sendPing();
            clientCnxnSocket.updateLastSend();
        } else {
            if (timeToNextPing < to) {
                to = timeToNextPing;
            }
        }
    }

    // If we are in read-only mode, seek for read/write server
    if (state == States.CONNECTEDREADONLY) {
        long now = Time.currentElapsedTime();
        int idlePingRwServer = (int) (now - lastPingRwServer);
        if (idlePingRwServer >= pingRwTimeout) {
            lastPingRwServer = now;
            idlePingRwServer = 0;
            pingRwTimeout =
                    Math.min(2 * pingRwTimeout, maxPingRwTimeout);
            pingRwServer();
        }
        to = Math.min(to, pingRwTimeout - idlePingRwServer);
    }

    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);

ClientCnxnSocketNetty implements the abstract methods of ClientCnxnSocket. It is responsible for connecting to the server, reading and writing network traffic, and acting as the layer between the raw network data and the higher-level packet layer. Its lifecycle is as follows:
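    loop:
    - try:
    - - !isConnected()
    - - - connect()
    - - doTransport()
    - catch:
    - - cleanup()
    close()

This description shows the workflow of ClientCnxnSocket. It first checks whether it is connected; if not, it calls connect(), otherwise it reuses the existing connection. It then calls doTransport() to communicate, and if an exception occurs along the way it calls cleanup(). Finally, the connection is closed. The core of the flow is therefore the doTransport() method: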
    @Override
    void doTransport(int waitTimeOut,
                     List<Packet> pendingQueue,
                     ClientCnxn cnxn)
            throws IOException, InterruptedException {
        try {
            if (!firstConnect.await(waitTimeOut, TimeUnit.MILLISECONDS)) {
                return;
            }
            Packet head = null;
            if (needSasl.get()) {
                if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
                    return;
                }
            } else {
                if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
                    return;
                }
            }
            // check if being waken up on closing.
            if (!sendThread.getZkState().isAlive()) {
                // adding back the packet to notify of failure in conLossPacket().
                addBack(head);
                return;
            }
            // channel disconnection happened
            if (disconnected.get()) {
                addBack(head);
                throw new EndOfStreamException("channel for sessionid 0x"
                        + Long.toHexString(sessionId)
                        + " is lost");
            }
            if (head != null) {
                doWrite(pendingQueue, head, cnxn);
            }
        } finally {
            updateNow();
        }
    }

The method boils down to two paths: the failure path, addBack(head), and the normal path, doWrite(pendingQueue, head, cnxn). Let's set the failure path aside and follow the normal one.
First, a Packet is fetched:

    Packet head = null;
    if (needSasl.get()) {
        if (!waitSasl.tryAcquire(waitTimeOut, TimeUnit.MILLISECONDS)) {
            return;
        }
    } else {
        if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) {
            return;
        }
    }

Here, protected LinkedBlockingDeque<Packet> outgoingQueue is a linked blocking deque that holds the requests waiting to be sent.
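The timed poll and the put-back step can be tried in isolation. In the minimal sketch below, OutgoingQueueSketch, submit and takeNext are hypothetical names and plain strings stand in for Packet objects. It also assumes that addBack returns the packet to the head of the deque; the body of addBack is not shown in this article.

```java
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the outgoing queue; strings play the role of packets.
class OutgoingQueueSketch {
    private final LinkedBlockingDeque<String> outgoingQueue = new LinkedBlockingDeque<>();

    // Called by application threads to enqueue a request.
    void submit(String request) {
        outgoingQueue.add(request);
    }

    // Waits up to waitMillis for the next request; returns null on timeout.
    String takeNext(long waitMillis) {
        try {
            return outgoingQueue.poll(waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        }
    }

    // Assumption: a packet that could not be sent goes back to the front,
    // so it is the first one seen again and the original order is kept.
    void addBack(String head) {
        if (head != null) {
            outgoingQueue.addFirst(head);
        }
    }

    public static void main(String[] args) {
        OutgoingQueueSketch queue = new OutgoingQueueSketch();
        queue.submit("create /a");
        queue.submit("getData /a");
        String head = queue.takeNext(10);
        queue.addBack(head); // pretend the channel was down
        System.out.println(queue.takeNext(10)); // prints create /a
    }
}
```

A deque rather than a plain queue is what makes the put-back cheap: the sender can push at the head while application threads keep appending at the tail.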
Then the doWrite method is executed:
    /**
     * doWrite handles writing the packets from outgoingQueue via network to server.
     */
    private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
        updateNow();
        while (true) {
            if (p != WakeupPacket.getInstance()) {
                if ((p.requestHeader != null) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.ping) &&
                        (p.requestHeader.getType() != ZooDefs.OpCode.auth)) {
                    p.requestHeader.setXid(cnxn.getXid());
                    synchronized (pendingQueue) {
                        pendingQueue.add(p);
                    }
                }
                sendPkt(p);
            }
            if (outgoingQueue.isEmpty()) {
                break;
            }
            p = outgoingQueue.remove();
        }
    }

The doWrite method writes the packets in outgoingQueue to the server over the network. The actual sending is done by the sendPkt(p) call in the code above:
    private void sendPkt(Packet p) {
        // Assuming the packet will be sent out successfully. Because if it fails,
        // the channel will close and clean up queues.
        p.createBB();
        updateLastSend();
        sentCount++;
        channel.write(ChannelBuffers.wrappedBuffer(p.bb));
    }

1. The structure of a Packet is as follows:
    /**
     * This class allows us to pass the headers and the relevant records around.
     */
    static class Packet {
        RequestHeader requestHeader;
        ReplyHeader replyHeader;
        Record request;
        Record response;
        ByteBuffer bb;
        /** Client's view of the path (may differ due to chroot) **/
        String clientPath;
        /** Servers's view of the path (may differ due to chroot) **/
        String serverPath;
        boolean finished;
        AsyncCallback cb;
        Object ctx;
        WatchRegistration watchRegistration;
        public boolean readOnly;
        WatchDeregistration watchDeregistration;

        /** Convenience ctor */
        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration) {
            this(requestHeader, replyHeader, request, response,
                 watchRegistration, false);
        }

        Packet(RequestHeader requestHeader, ReplyHeader replyHeader,
               Record request, Record response,
               WatchRegistration watchRegistration, boolean readOnly) {
            this.requestHeader = requestHeader;
            this.replyHeader = replyHeader;
            this.request = request;
            this.response = response;
            this.readOnly = readOnly;
            this.watchRegistration = watchRegistration;
        }

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();

            sb.append("clientPath:" + clientPath);
            sb.append(" serverPath:" + serverPath);
            sb.append(" finished:" + finished);

            sb.append(" header:: " + requestHeader);
            sb.append(" replyHeader:: " + replyHeader);
            sb.append(" request:: " + request);
            sb.append(" response:: " + response);

            // jute toString is horrible, remove unnecessary newlines
            return sb.toString().replaceAll("\r*\n+", " ");
        }
    }

The createBB method shows that, when a packet is serialized for the wire, ZooKeeper serializes only two of its fields: requestHeader and request (plus the readOnly flag in the case of a ConnectRequest). Only these end up in the underlying byte array that is sent over the network; nothing related to watchRegistration is transmitted.
2. The time of the last send is updated by updateLastSend:

    void updateLastSend() {
        this.lastSend = now;
    }

3. The byte buffer is sent to the server over the Netty channel:
    channel.write(ChannelBuffers.wrappedBuffer(p.bb));

Here bb is a ByteBuffer. It is initialized in Packet, inside createBB:

    this.bb = ByteBuffer.wrap(baos.toByteArray());
    this.bb.putInt(this.bb.capacity() - 4);
    this.bb.rewind();
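The length-prefix trick used by createBB (write a placeholder, serialize the body, then go back and overwrite the first four bytes with the body length) can be reproduced with the standard library alone. In the sketch below, LengthPrefixSketch and frame are hypothetical names, DataOutputStream is swapped in for jute's BinaryOutputArchive, and the xid and type fields are an illustrative stand-in for the serialized header rather than the exact wire layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

// Hypothetical illustration of the "fill the length in later" framing.
class LengthPrefixSketch {
    static ByteBuffer frame(int xid, int type, byte[] payload) {
        try {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos);
            out.writeInt(-1);      // placeholder for the length, filled in below
            out.writeInt(xid);     // illustrative header field
            out.writeInt(type);    // illustrative header field
            out.write(payload);    // illustrative request body
            out.flush();

            ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
            // The length excludes the 4 bytes of the length field itself.
            bb.putInt(bb.capacity() - 4);
            // putInt advanced the position to 4; rewind so the whole frame is sent.
            bb.rewind();
            return bb;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteBuffer bb = frame(1, 4, new byte[] {10, 20, 30});
        System.out.println(bb.capacity()); // prints 15
        System.out.println(bb.getInt(0));  // prints 11
    }
}
```

Both DataOutputStream and a freshly wrapped ByteBuffer use big-endian byte order, which is why the overwritten prefix and the rest of the frame agree.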
Summary:
The connection between a ZooKeeper client and the server is handled mainly by ClientCnxnSocket, which has two concrete implementations, ClientCnxnSocketNetty and ClientCnxnSocketNIO. Its workflow is:
First, check whether the socket is connected. If not, call connect(); if it is, move on to the next step.
Then call doTransport() to communicate. If an exception occurs along the way, call cleanup().
Finally, close the connection.
This flow can be seen in SendThread's run method.
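The workflow summarized above can be sketched as a small loop. Everything here is hypothetical and for illustration only: LifecycleSketch, SketchSocket and RecordingSocket are made-up names, the loop is bounded by an iteration count instead of a closing flag, and the fake socket fails its first transport on purpose so that the cleanup path is exercised.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the connect / doTransport / cleanup / close lifecycle.
class LifecycleSketch {
    interface SketchSocket {
        boolean isConnected();
        void connect() throws IOException;
        void doTransport() throws IOException;
        void cleanup();
        void close();
    }

    // Runs a fixed number of loop rounds, then closes the socket.
    static void run(SketchSocket socket, int maxIterations) {
        for (int i = 0; i < maxIterations; i++) {
            try {
                if (!socket.isConnected()) {
                    socket.connect();
                }
                socket.doTransport();
            } catch (IOException e) {
                // Any I/O failure drops the connection; the next round reconnects.
                socket.cleanup();
            }
        }
        socket.close();
    }

    // Fake socket that records every call and fails its first transport.
    static class RecordingSocket implements SketchSocket {
        final List<String> calls = new ArrayList<>();
        private boolean connected;
        private boolean failedOnce;

        public boolean isConnected() { return connected; }

        public void connect() {
            connected = true;
            calls.add("connect");
        }

        public void doTransport() throws IOException {
            if (!failedOnce) {
                failedOnce = true;
                calls.add("doTransport:fail");
                throw new IOException("simulated channel loss");
            }
            calls.add("doTransport");
        }

        public void cleanup() {
            connected = false;
            calls.add("cleanup");
        }

        public void close() { calls.add("close"); }
    }

    public static void main(String[] args) {
        RecordingSocket socket = new RecordingSocket();
        run(socket, 3);
        System.out.println(socket.calls);
    }
}
```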
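Also, one of ZooKeeper's guarantees is sequential consistency: updates are applied in the order in which the client sent its requests. In SendThread and the socket code we can see the timestamps being refreshed repeatedly (updateLastSendAndHeard(), updateLastSend(), updateNow()). These timestamps drive the timeout and heartbeat decisions. The ordering itself comes from requests leaving outgoingQueue in FIFO order, with each one stamped with an xid in doWrite and appended to pendingQueue.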
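The ordering side of this can be sketched with a counter and a FIFO queue. XidOrderingSketch, send, onReply and pendingCount are hypothetical names. The send side mirrors what doWrite does (assign an xid, add to the pending queue); the reply side is not shown in this article, so checking each reply against the oldest pending request is an assumption made for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of xid stamping and in-order reply matching.
class XidOrderingSketch {
    private int nextXid = 1;
    private final Deque<Integer> pendingQueue = new ArrayDeque<>();

    // Stamp a request with the next xid and remember it as awaiting a reply.
    synchronized int send() {
        int xid = nextXid++;
        pendingQueue.addLast(xid);
        return xid;
    }

    // Assumption: replies must arrive in the order the requests were sent,
    // so each reply is compared with the oldest pending xid.
    synchronized void onReply(int replyXid) {
        Integer expected = pendingQueue.pollFirst();
        if (expected == null) {
            throw new IllegalStateException("Nothing in the queue, but got " + replyXid);
        }
        if (expected != replyXid) {
            throw new IllegalStateException(
                    "Xid out of order. Got " + replyXid + " expected " + expected);
        }
    }

    synchronized int pendingCount() {
        return pendingQueue.size();
    }

    public static void main(String[] args) {
        XidOrderingSketch cnxn = new XidOrderingSketch();
        int first = cnxn.send();
        int second = cnxn.send();
        cnxn.onReply(first);
        cnxn.onReply(second);
        System.out.println(cnxn.pendingCount()); // prints 0
    }
}
```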
Reposted from: https://www.cnblogs.com/davidwang456/p/5000927.html