ZooKeeper source code analysis, part 5: how the server (cluster leader) processes requests
The leader is implemented by LeaderZooKeeperServer, which indirectly extends the standard ZooKeeperServer. It defines the path a request follows once it reaches the leader:
PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor
The details are in the code:
```java
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader());
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false,
            getZooKeeperServerListener());
    commitProcessor.start();
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    prepRequestProcessor = new PrepRequestProcessor(this, proposalProcessor);
    prepRequestProcessor.start();
    firstProcessor = new LeaderRequestProcessor(this, prepRequestProcessor);

    setupContainerManager();
}
```
Let's go through what each of these RPs (request processors) does, step by step. Note that firstProcessor, the actual entry point, is a LeaderRequestProcessor that forwards to PrepRequestProcessor. PrepRequestProcessor and FinalRequestProcessor were already analyzed in the previous article:
ZooKeeper source code analysis, part 4: how the server (standalone) processes requests
So let's move on to the remaining RPs.
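The wiring in setupRequestProcessors() is a chain of responsibility built back to front: each processor receives its successor in the constructor, so the last one has to be created first. The minimal sketch below mimics only that wiring pattern. The Processor interface and Stage class are hypothetical stand-ins, not ZooKeeper types. The real chain uses RequestProcessor, background threads and queues, and the ProposalRequestProcessor side branch to SyncRequestProcessor/AckRequestProcessor, and all of these are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

final class LeaderPipelineSketch {
    // Hypothetical stand-in for ZooKeeper's RequestProcessor interface.
    interface Processor {
        void process(String request, List<String> trace);
    }

    // Hypothetical pass-through stage: records its name, then forwards downstream.
    static final class Stage implements Processor {
        private final String name;
        private final Processor next; // null marks the end of the chain

        Stage(String name, Processor next) {
            this.name = name;
            this.next = next;
        }

        @Override
        public void process(String request, List<String> trace) {
            trace.add(name);
            if (next != null) {
                next.process(request, trace);
            }
        }
    }

    // Built back to front, in the same order as setupRequestProcessors().
    static Processor build() {
        Processor fin = new Stage("FinalRequestProcessor", null);
        Processor toBeApplied = new Stage("Leader.ToBeAppliedRequestProcessor", fin);
        Processor commit = new Stage("CommitProcessor", toBeApplied);
        Processor proposal = new Stage("ProposalRequestProcessor", commit);
        Processor prep = new Stage("PrepRequestProcessor", proposal);
        return new Stage("LeaderRequestProcessor", prep); // plays the role of firstProcessor
    }

    // Names of the stages a request visits, joined with " -> ".
    static String trace(String request) {
        List<String> visited = new ArrayList<>();
        build().process(request, visited);
        return String.join(" -> ", visited);
    }

    public static void main(String[] args) {
        System.out.println(trace("create /a"));
    }
}
```

Building from the tail means no processor ever has to be patched with a successor after construction. This is also why the real code can start CommitProcessor before ProposalRequestProcessor exists.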
1. ProposalRequestProcessor
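This RP passes every request on to the next processor (CommitProcessor). For a transactional request (one that has a header), it also asks the leader to propose the request to the followers. It then hands the request to SyncRequestProcessor, which logs it and passes it on to AckRequestProcessor. A LearnerSyncRequest is handed to Leader.processSync() instead. The code: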
```java
public void processRequest(Request request) throws RequestProcessorException {
    // LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
    // request.type + " id = " + request.sessionId);
    // request.addRQRec(">prop");

    /* In the following IF-THEN-ELSE block, we process syncs on the leader.
     * If the sync is coming from a follower, then the follower
     * handler adds it to syncHandler. Otherwise, if it is a client of
     * the leader that issued the sync command, then syncHandler won't
     * contain the handler. In this case, we add it to syncHandler, and
     * call processRequest on the next processor.
     */
    if (request instanceof LearnerSyncRequest) {
        zks.getLeader().processSync((LearnerSyncRequest) request);
    } else {
        nextProcessor.processRequest(request);
        if (request.getHdr() != null) {
            // We need to sync and get consensus on any transactions
            try {
                zks.getLeader().propose(request);
            } catch (XidRolloverException e) {
                throw new RequestProcessorException(e.getMessage(), e);
            }
            syncProcessor.processRequest(request);
        }
    }
}
```
SyncRequestProcessor was analyzed in the previous article, so I won't repeat it here. Let's see what AckRequestProcessor does.
AckRequestProcessor simply forwards the request it receives to the leader as an ACK, sent under the leader's own server id. The code:
```java
/**
 * Forward the request as an ACK to the leader
 */
public void processRequest(Request request) {
    QuorumPeer self = leader.self;
    if (self != null)
        leader.processAck(self.getId(), request.zxid, null);
    else
        LOG.error("Null QuorumPeer");
}
```
The leader handles the ACK as follows:
```java
/**
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 *
 * @param zxid, the zxid of the proposal sent out
 * @param sid, the id of the server that sent the ack
 * @param followerAddr
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    if (!allowedToCommit) return; // last op committed was a leader change - from now on
                                  // the new leader should commit
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    if ((zxid & 0xffffffffL) == 0) {
        /*
         * We no longer process NEWLEADER ack with this method. However,
         * the learner sends an ack back to the leader after it gets
         * UPTODATE, so we just ignore the message.
         */
        return;
    }

    if (outstandingProposals.size() == 0) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    if (lastCommitted >= zxid) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}",
                    Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}",
                Long.toHexString(zxid), followerAddr);
        return;
    }

    p.addAck(sid);
    /*if (LOG.isDebugEnabled()) {
        LOG.debug("Count for zxid: 0x{} is {}",
                Long.toHexString(zxid), p.ackSet.size());
    }*/

    boolean hasCommitted = tryToCommit(p, zxid, followerAddr);

    // If p is a reconfiguration, multiple other operations may be ready to be committed,
    // since operations wait for different sets of acks.
    // Currently we only permit one outstanding reconfiguration at a time
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen and then we need to check whether some pending
    // ops may already have enough acks and can be committed, which is what this code does.

    if (hasCommitted && p.request != null && p.request.getHdr().getType() == OpCode.reconfig) {
        long curZxid = zxid;
        while (allowedToCommit && hasCommitted && p != null) {
            curZxid++;
            p = outstandingProposals.get(curZxid);
            if (p != null) hasCommitted = tryToCommit(p, curZxid, null);
        }
    }
}
```
The actual work is done by tryToCommit(), which eventually hands the request on to CommitProcessor:
```java
/**
 * @return True if committed, otherwise false.
 * @param a proposal p
 **/
synchronized public boolean tryToCommit(Proposal p, long zxid, SocketAddress followerAddr) {
    // make sure that ops are committed in order. With reconfigurations it is now possible
    // that different operations wait for different sets of acks, and we still want to enforce
    // that they are committed in order. Currently we only permit one outstanding reconfiguration
    // such that the reconfiguration and subsequent outstanding ops proposed while the reconfig is
    // pending all wait for a quorum of old and new config, so its not possible to get enough acks
    // for an operation without getting enough acks for preceding ops. But in the future if multiple
    // concurrent reconfigs are allowed, this can happen.
    if (outstandingProposals.containsKey(zxid - 1)) return false;

    // getting a quorum from all necessary configurations
    if (!p.hasAllQuorums()) {
        return false;
    }

    // commit proposals in order
    if (zxid != lastCommitted + 1) {
        LOG.warn("Commiting zxid 0x" + Long.toHexString(zxid)
                + " from " + followerAddr + " not first!");
        LOG.warn("First is " + (lastCommitted + 1));
    }

    // in order to be committed, a proposal must be accepted by a quorum
    outstandingProposals.remove(zxid);

    if (p.request != null) {
        toBeApplied.add(p);
    }

    if (p.request == null) {
        LOG.warn("Going to commmit null: " + p);
    } else if (p.request.getHdr().getType() == OpCode.reconfig) {
        LOG.debug("Committing a reconfiguration! " + outstandingProposals.size());

        //if this server is voter in new config with the same quorum address,
        //then it will remain the leader
        //otherwise an up-to-date follower will be designated as leader. This saves
        //leader election time, unless the designated leader fails
        Long designatedLeader = getDesignatedLeader(p, zxid);
        //LOG.warn("designated leader is: " + designatedLeader);

        QuorumVerifier newQV = p.qvAcksetPairs.get(p.qvAcksetPairs.size() - 1).getQuorumVerifier();

        self.processReconfig(newQV, designatedLeader, zk.getZxid(), true);

        if (designatedLeader != self.getId()) {
            allowedToCommit = false;
        }

        // we're sending the designated leader, and if the leader is changing the followers are
        // responsible for closing the connection - this way we are sure that at least a majority of them
        // receive the commit message.
        commitAndActivate(zxid, designatedLeader);
        informAndActivate(p, designatedLeader);
        //turnOffFollowers();
    } else {
        commit(zxid);
        inform(p);
    }
    zk.commitProcessor.commit(p.request);
    if (pendingSyncs.containsKey(zxid)) {
        for (LearnerSyncRequest r : pendingSyncs.remove(zxid)) {
            sendSync(r);
        }
    }
    return true;
}
```
In the normal (non-reconfig) branch, the first step is commit(), which sends a COMMIT packet to all members of the quorum:
```java
/**
 * Create a commit packet and send it to all the members of the quorum
 *
 * @param zxid
 */
public void commit(long zxid) {
    synchronized (this) {
        lastCommitted = zxid;
    }
    QuorumPacket qp = new QuorumPacket(Leader.COMMIT, zxid, null, null);
    sendPacket(qp);
}
```
sendPacket() queues the packet for every follower:
```java
/**
 * send a packet to all the followers ready to follow
 *
 * @param qp
 *                the packet to be sent
 */
void sendPacket(QuorumPacket qp) {
    synchronized (forwardingFollowers) {
        for (LearnerHandler f : forwardingFollowers) {
            f.queuePacket(qp);
        }
    }
}
```
The second step, inform(), notifies the Observers:
```java
/**
 * Create an inform packet and send it to all observers.
 * @param zxid
 * @param proposal
 */
public void inform(Proposal proposal) {
    QuorumPacket qp = new QuorumPacket(Leader.INFORM, proposal.request.zxid,
                                        proposal.packet.getData(), null);
    sendObserverPacket(qp);
}
```
The packet is sent to the observers by:
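```java
/**
 * send a packet to all observers
 */
void sendObserverPacket(QuorumPacket qp) {
    for (LearnerHandler f : getObservingLearners()) {
        f.queuePacket(qp);
    }
}
```
The third step is zk.commitProcessor.commit(p.request), which hands the committed request to CommitProcessor by placing it on its committedRequests queue.
2. CommitProcessor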
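CommitProcessor is multithreaded. Its threads communicate through queues, atomics, and wait/notifyAll synchronization. CommitProcessor acts as a gateway that admits requests into the rest of the processing pipeline. At any given moment it allows many read requests but only a single write request to be in flight, which keeps write requests in transaction order.
There is 1 main commit-processing thread. It watches the request queues and dispatches requests to the worker threads based on sessionId, so the read and write requests of a given session are always assigned to the same thread and are therefore guaranteed to run in order.
There are 0 to N worker threads, which run the rest of the request-processing pipeline on the requests. If 0 worker threads are configured, the main commit thread runs the pipeline directly.
Typical (default) thread counts: on a 32-core machine, 1 commit-processing thread and 32 worker threads.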
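Dispatch by sessionId comes down to one modulo operation. It has to be safe for negative ids, because Java's % keeps the sign of the dividend. The same expression appears in WorkerService.schedule(), quoted later in this article. The sketch below isolates that mapping. The workerFor method and the sample ids are hypothetical, and real session ids are 64-bit values generated by the server.

```java
final class SessionAffinitySketch {
    // Same expression as WorkerService.schedule(): map a possibly negative
    // session id onto [0, size-1]. A bare id % size could be negative.
    static int workerFor(long sessionId, int size) {
        return ((int) (sessionId % size) + size) % size;
    }

    public static void main(String[] args) {
        int size = 32; // e.g. 32 worker threads
        long[] sessionIds = {7L, -7L, 7L, 0x100000000L};
        for (long id : sessionIds) {
            // A repeated id always lands on the same worker, which keeps a session's requests ordered.
            System.out.println(id + " -> worker " + workerFor(id, size));
        }
    }
}
```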
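Constraints on the multithreading:
(1) Requests within a session must be processed in order.
(2) Write requests must be processed in zxid order.
(3) There must be no race between a write in one session and a read in another session that would set a watch the write is supposed to trigger.
The current implementation satisfies the third constraint simply by never processing read requests in parallel with a write request.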
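The read/write rule can be modeled with a single thread. Reads flow straight through, and the first write becomes "next pending" and holds back everything queued behind it until its commit arrives. This is a minimal sketch under stated assumptions. The Req class, submit, commit and dispatched are hypothetical names. Whether a request is a write is given as a flag, whereas the real code decides via needCommit(). Worker threads, the processing-commit/processing-request checks, and commits for requests that arrived through other servers are all left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

final class CommitGateSketch {
    // Hypothetical request type; "write" stands in for needCommit(request).
    static final class Req {
        final String id;
        final boolean write;

        Req(String id, boolean write) {
            this.id = id;
            this.write = write;
        }
    }

    private final Deque<Req> queuedRequests = new ArrayDeque<>();
    private Req nextPending;                                   // write waiting for its commit
    private final List<String> dispatched = new ArrayList<>(); // order handed downstream

    void submit(Req r) {
        queuedRequests.add(r);
        drain();
    }

    // Called when the quorum commit for a write arrives.
    void commit(String id) {
        if (nextPending != null && nextPending.id.equals(id)) {
            dispatched.add(nextPending.id);
            nextPending = null;
            drain(); // requests held back behind the write may now proceed
        }
    }

    // Like the run() loop: pass reads through, stop at the first write.
    private void drain() {
        Req r;
        while (nextPending == null && (r = queuedRequests.poll()) != null) {
            if (r.write) {
                nextPending = r;
            } else {
                dispatched.add(r.id);
            }
        }
    }

    List<String> dispatched() {
        return dispatched;
    }

    public static void main(String[] args) {
        CommitGateSketch gate = new CommitGateSketch();
        gate.submit(new Req("r1", false));
        gate.submit(new Req("w1", true));
        gate.submit(new Req("r2", false));     // held back: w1 is still pending
        System.out.println(gate.dispatched()); // [r1]
        gate.commit("w1");
        System.out.println(gate.dispatched()); // [r1, w1, r2]
    }
}
```

Holding back r2 even though it is a read is what rules out constraint (3): no read can observe or set a watch in the middle of a pending write.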
```java
@Override
public void run() {
    Request request;
    try {
        while (!stopped) {
            synchronized (this) {
                while (
                    !stopped &&
                    ((queuedRequests.isEmpty() || isWaitingForCommit() || isProcessingCommit()) &&
                     (committedRequests.isEmpty() || isProcessingRequest()))) {
                    wait();
                }
            }

            /*
             * Processing queuedRequests: Process the next requests until we
             * find one for which we need to wait for a commit. We cannot
             * process a read request while we are processing write request.
             */
            while (!stopped && !isWaitingForCommit() &&
                   !isProcessingCommit() &&
                   (request = queuedRequests.poll()) != null) {
                if (needCommit(request)) {
                    nextPending.set(request);
                } else {
                    sendToNextProcessor(request);
                }
            }

            /*
             * Processing committedRequests: check and see if the commit
             * came in for the pending request. We can only commit a
             * request when there is no other request being processed.
             */
            processCommitted();
        }
    } catch (Throwable e) {
        handleException(this.getName(), e);
    }
    LOG.info("CommitProcessor exited loop!");
}
```
The main logic, processCommitted(), is:
```java
/*
 * Separated this method from the main run loop
 * for test purposes (ZOOKEEPER-1863)
 */
protected void processCommitted() {
    Request request;

    if (!stopped && !isProcessingRequest() &&
            (committedRequests.peek() != null)) {

        /*
         * ZOOKEEPER-1863: continue only if there is no new request
         * waiting in queuedRequests or it is waiting for a
         * commit.
         */
        if (!isWaitingForCommit() && !queuedRequests.isEmpty()) {
            return;
        }
        request = committedRequests.poll();

        /*
         * We match with nextPending so that we can move to the
         * next request when it is committed. We also want to
         * use nextPending because it has the cnxn member set
         * properly.
         */
        Request pending = nextPending.get();
        if (pending != null &&
                pending.sessionId == request.sessionId &&
                pending.cxid == request.cxid) {
            // we want to send our version of the request.
            // the pointer to the connection in the request
            pending.setHdr(request.getHdr());
            pending.setTxn(request.getTxn());
            pending.zxid = request.zxid;
            // Set currentlyCommitting so we will block until this
            // completes. Cleared by CommitWorkRequest after
            // nextProcessor returns.
            currentlyCommitting.set(pending);
            nextPending.set(null);
            sendToNextProcessor(pending);
        } else {
            // this request came from someone else so just
            // send the commit packet
            currentlyCommitting.set(request);
            sendToNextProcessor(request);
        }
    }
}
```
Handing a request over to the worker threads:
```java
/**
 * Schedule final request processing; if a worker thread pool is not being
 * used, processing is done directly by this thread.
 */
private void sendToNextProcessor(Request request) {
    numRequestsProcessing.incrementAndGet();
    workerPool.schedule(new CommitWorkRequest(request), request.sessionId);
}
```
The real work is done by WorkerService.schedule():
```java
/**
 * Schedule work to be done by the thread assigned to this id. Thread
 * assignment is a single mod operation on the number of threads. If a
 * worker thread pool is not being used, work is done directly by
 * this thread.
 */
public void schedule(WorkRequest workRequest, long id) {
    if (stopped) {
        workRequest.cleanup();
        return;
    }

    ScheduledWorkRequest scheduledWorkRequest =
        new ScheduledWorkRequest(workRequest);

    // If we have a worker thread pool, use that; otherwise, do the work
    // directly.
    int size = workers.size();
    if (size > 0) {
        try {
            // make sure to map negative ids as well to [0, size-1]
            int workerNum = ((int) (id % size) + size) % size;
            ExecutorService worker = workers.get(workerNum);
            worker.execute(scheduledWorkRequest);
        } catch (RejectedExecutionException e) {
            LOG.warn("ExecutorService rejected execution", e);
            workRequest.cleanup();
        }
    } else {
        // When there is no worker thread pool, do the work directly
        // and wait for its completion
        scheduledWorkRequest.start();
        try {
            scheduledWorkRequest.join();
        } catch (InterruptedException e) {
            LOG.warn("Unexpected exception", e);
            Thread.currentThread().interrupt();
        }
    }
}
```
The run method of the request-handling thread:
```java
@Override
public void run() {
    try {
        // Check if stopped while request was on queue
        if (stopped) {
            workRequest.cleanup();
            return;
        }
        workRequest.doWork();
    } catch (Exception e) {
        LOG.warn("Unexpected exception", e);
        workRequest.cleanup();
    }
}
```
It calls doWork() on CommitProcessor's CommitWorkRequest:
```java
public void doWork() throws RequestProcessorException {
    try {
        nextProcessor.processRequest(request);
    } finally {
        // If this request is the commit request that was blocking
        // the processor, clear.
        currentlyCommitting.compareAndSet(request, null);

        /*
         * Decrement outstanding request count. The processor may be
         * blocked at the moment because it is waiting for the pipeline
         * to drain. In that case, wake it up if there are pending
         * requests.
         */
        if (numRequestsProcessing.decrementAndGet() == 0) {
            if (!queuedRequests.isEmpty() ||
                !committedRequests.isEmpty()) {
                wakeup();
            }
        }
    }
}
```
This passes the request to the next RP, Leader.ToBeAppliedRequestProcessor.
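The finally block of doWork() relies on two atomics. A compareAndSet clears currentlyCommitting only if the finishing request is the one that set it. A shared counter wakes the main thread only once the whole pipeline has drained. The sketch below isolates that bookkeeping in a single thread. The field names mirror CommitProcessor, but the dispatch and finish helpers, the wakeups counter, and the moreWorkQueued flag (which stands in for the two queue checks) are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class InFlightSketch {
    // Same roles as CommitProcessor's fields of the same names.
    final AtomicReference<Object> currentlyCommitting = new AtomicReference<>();
    final AtomicInteger numRequestsProcessing = new AtomicInteger();
    // Hypothetical: counts how often the main thread would be woken up.
    int wakeups;

    // Hypothetical helper: what processCommitted()/sendToNextProcessor() do before scheduling.
    void dispatch(Object request, boolean isCommit) {
        if (isCommit) {
            currentlyCommitting.set(request);
        }
        numRequestsProcessing.incrementAndGet();
    }

    // Hypothetical helper mirroring doWork()'s finally block.
    void finish(Object request, boolean moreWorkQueued) {
        // Only the request that set currentlyCommitting may clear it.
        currentlyCommitting.compareAndSet(request, null);
        // The decrement always happens; a wakeup only when the pipeline is empty.
        if (numRequestsProcessing.decrementAndGet() == 0 && moreWorkQueued) {
            wakeups++;
        }
    }

    public static void main(String[] args) {
        InFlightSketch s = new InFlightSketch();
        Object read = "read";
        Object write = "write";
        s.dispatch(read, false);
        s.dispatch(write, true);
        s.finish(read, true);   // commit not cleared, pipeline not yet drained
        s.finish(write, true);  // clears the commit and triggers one wakeup
        System.out.println(s.currentlyCommitting.get() + " " + s.wakeups); // null 1
    }
}
```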
3. Leader.ToBeAppliedRequestProcessor
Leader.ToBeAppliedRequestProcessor simply maintains the toBeApplied list.
```java
/**
 * This request processor simply maintains the toBeApplied list. For
 * this to work next must be a FinalRequestProcessor and
 * FinalRequestProcessor.processRequest MUST process the request
 * synchronously!
 *
 * @param next
 *                a reference to the FinalRequestProcessor
 */
ToBeAppliedRequestProcessor(RequestProcessor next, Leader leader) {
    if (!(next instanceof FinalRequestProcessor)) {
        throw new RuntimeException(ToBeAppliedRequestProcessor.class
                .getName()
                + " must be connected to "
                + FinalRequestProcessor.class.getName()
                + " not "
                + next.getClass().getName());
    }
    this.leader = leader;
    this.next = next;
}

/*
 * (non-Javadoc)
 *
 * @see org.apache.zookeeper.server.RequestProcessor#processRequest(org.apache.zookeeper.server.Request)
 */
public void processRequest(Request request) throws RequestProcessorException {
    next.processRequest(request);

    // The only requests that should be on toBeApplied are write
    // requests, for which we will have a hdr. We can't simply use
    // request.zxid here because that is set on read requests to equal
    // the zxid of the last write op.
    if (request.getHdr() != null) {
        long zxid = request.getHdr().getZxid();
        Iterator<Proposal> iter = leader.toBeApplied.iterator();
        if (iter.hasNext()) {
            Proposal p = iter.next();
            if (p.request != null && p.request.zxid == zxid) {
                iter.remove();
                return;
            }
        }
        LOG.error("Committed request not found on toBeApplied: "
                  + request);
    }
}
```
4. FinalRequestProcessor
This was covered in the previous article, so I won't repeat it here.
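Summary: from the analysis above, the leader passes a request through these processors in order: PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> Leader.ToBeAppliedRequestProcessor -> FinalRequestProcessor.
PrepRequestProcessor receives and wraps the request first, and fills in the transaction data according to the request type. ProposalRequestProcessor is mainly responsible for notifying the followers and observers: proposals go to the followers, and once a quorum has ACKed, the leader sends COMMIT to the followers and INFORM to the observers. CommitProcessor processes requests with multiple threads. Leader.ToBeAppliedRequestProcessor simply maintains the toBeApplied list.
FinalRequestProcessor is the last processor in the chain: it sends the response message and triggers the watcher handling.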
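The leader's write path described above (propose, collect ACKs, commit strictly in zxid order, then clear toBeApplied once FinalRequestProcessor has run) can be condensed into a single-threaded sketch. It keeps the guards from processAck() and the in-order rule from tryToCommit(). It also shows the zxid layout that the (zxid & 0xffffffffL) == 0 check relies on: the epoch sits in the high 32 bits and a per-epoch counter in the low 32 bits. This is a simplified sketch under stated assumptions. It assumes a plain majority quorum, whereas the real check is Proposal.hasAllQuorums() through a QuorumVerifier. There is no networking, reconfiguration, or observer handling. The class name and the applied() helper are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class LeaderWritePathSketch {
    private final int votingMembers;                 // voters, leader included
    private final long epoch;
    private long counter;                            // low 32 bits of the last zxid
    private long lastCommitted;
    private final Map<Long, Set<Long>> outstanding = new HashMap<>(); // zxid -> sids that ACKed
    final List<Long> committed = new ArrayList<>();
    final Deque<Long> toBeApplied = new ArrayDeque<>();

    LeaderWritePathSketch(int votingMembers, long epoch) {
        this.votingMembers = votingMembers;
        this.epoch = epoch;
        this.lastCommitted = zxid(epoch, 0);
    }

    // High 32 bits: epoch. Low 32 bits: counter within the epoch.
    static long zxid(long epoch, long counter) {
        return (epoch << 32) | (counter & 0xffffffffL);
    }

    // Like Leader.propose(): assign the next zxid and start collecting ACKs.
    long propose() {
        long z = zxid(epoch, ++counter);
        outstanding.put(z, new HashSet<>());
        return z;
    }

    // Simplified processAck(): same early returns, then record the ACK.
    void processAck(long sid, long zxid) {
        if ((zxid & 0xffffffffL) == 0) return; // counter 0: not an ACK for a proposal
        if (lastCommitted >= zxid) return;     // already committed
        Set<Long> acks = outstanding.get(zxid);
        if (acks == null) return;              // unknown ("future") proposal
        acks.add(sid);
        tryToCommit(zxid);
    }

    // Simplified tryToCommit(): predecessor first, then a majority of voters.
    boolean tryToCommit(long zxid) {
        if (outstanding.containsKey(zxid - 1)) return false;
        if (outstanding.get(zxid).size() <= votingMembers / 2) return false;
        outstanding.remove(zxid);
        lastCommitted = zxid;
        committed.add(zxid);   // stands in for commit(zxid) + inform(p)
        toBeApplied.add(zxid);
        return true;
    }

    // Like ToBeAppliedRequestProcessor: after FinalRequestProcessor ran, drop the head.
    boolean applied(long zxid) {
        Long head = toBeApplied.peek();
        if (head != null && head == zxid) {
            toBeApplied.poll();
            return true;
        }
        return false; // the real code logs "Committed request not found on toBeApplied"
    }

    public static void main(String[] args) {
        LeaderWritePathSketch leader = new LeaderWritePathSketch(3, 1);
        long z1 = leader.propose();
        long z2 = leader.propose();
        leader.processAck(1, z1);              // the leader ACKs its own proposals
        leader.processAck(1, z2);
        leader.processAck(2, z2);              // majority for z2, but z1 is still outstanding
        System.out.println(leader.committed);  // []
        leader.processAck(2, z1);              // z1 commits
        leader.processAck(3, z2);              // the next ACK lets z2 commit, in order
        System.out.println(leader.committed.size()); // 2
        System.out.println(leader.applied(z1) + " " + leader.applied(z2)); // true true
    }
}
```

As in the real tryToCommit(), a proposal that reaches a quorum before its predecessor is not committed early. It goes through on a later ACK, after the predecessor has been removed from the outstanding map.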
Reposted from: https://www.cnblogs.com/davidwang456/p/5004599.html