聊聊storm TridentBoltExecutor的finishBatch方法
生活随笔
收集整理的這篇文章主要介紹了
聊聊storm TridentBoltExecutor的finishBatch方法
小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,幫大家做個(gè)參考.
序
本文主要研究一下storm TridentBoltExecutor的finishBatch方法
MasterBatchCoordinator.nextTuple
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/MasterBatchCoordinator.java
public void nextTuple() {sync();}private void sync() {// note that sometimes the tuples active may be less than max_spout_pending, e.g.// max_spout_pending = 3// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2 (because tx 1 isn't committed yet),// and there won't be a batch for tx 4 because there's max_spout_pending tx activeTransactionStatus maybeCommit = _activeTx.get(_currTransaction);if(maybeCommit!=null && maybeCommit.status == AttemptStatus.PROCESSED) {maybeCommit.status = AttemptStatus.COMMITTING;_collector.emit(COMMIT_STREAM_ID, new Values(maybeCommit.attempt), maybeCommit.attempt);LOG.debug("Emitted on [stream = {}], [tx_status = {}], [{}]", COMMIT_STREAM_ID, maybeCommit, this);}if(_active) {if(_activeTx.size() < _maxTransactionActive) {Long curr = _currTransaction;for(int i=0; i<_maxTransactionActive; i++) {if(!_activeTx.containsKey(curr) && isReady(curr)) {// by using a monotonically increasing attempt id, downstream tasks// can be memory efficient by clearing out state for old attempts// as soon as they see a higher attempt id for a transactionInteger attemptId = _attemptIds.get(curr);if(attemptId==null) {attemptId = 0;} else {attemptId++;}_attemptIds.put(curr, attemptId);for(TransactionalState state: _states) {state.setData(CURRENT_ATTEMPTS, _attemptIds);}TransactionAttempt attempt = new TransactionAttempt(curr, attemptId);final TransactionStatus newTransactionStatus = new TransactionStatus(attempt);_activeTx.put(curr, newTransactionStatus);_collector.emit(BATCH_STREAM_ID, new Values(attempt), attempt);LOG.debug("Emitted on [stream = {}], [tx_attempt = {}], [tx_status = {}], [{}]", BATCH_STREAM_ID, attempt, newTransactionStatus, this);_throttler.markEvent();}curr = nextTransactionId(curr);}}}} 復(fù)制代碼- MasterBatchCoordinator是整個(gè)trident的真正的spout,它的nextTuple方法會(huì)向TridentSpoutCoordinator向MasterBatchCoordinator.BATCH_STREAM_ID($batch)發(fā)射tuple
TridentSpoutCoordinator.execute
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/spout/TridentSpoutCoordinator.java
public void execute(Tuple tuple, BasicOutputCollector collector) {TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);if(tuple.getSourceStreamId().equals(MasterBatchCoordinator.SUCCESS_STREAM_ID)) {_state.cleanupBefore(attempt.getTransactionId());_coord.success(attempt.getTransactionId());} else {long txid = attempt.getTransactionId();Object prevMeta = _state.getPreviousState(txid);Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));_state.overrideState(txid, meta);collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));}} 復(fù)制代碼- TridentSpoutCoordinator接收MasterBatchCoordinator在MasterBatchCoordinator.BATCH_STREAM_ID($batch)發(fā)過(guò)來(lái)的tuple,然后向包裝用戶spout的TridentBoltExecutor發(fā)送batch指令
TridentBoltExecutor(TridentSpoutExecutor)
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
public void execute(Tuple tuple) {if(TupleUtils.isTick(tuple)) {long now = System.currentTimeMillis();if(now - _lastRotate > _messageTimeoutMs) {_batches.rotate();_lastRotate = now;}return;}String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());if(batchGroup==null) {// this is so we can do things like have simple DRPC that doesn't need to use batch processing_coordCollector.setCurrBatch(null);_bolt.execute(null, tuple);_collector.ack(tuple);return;}IBatchID id = (IBatchID) tuple.getValue(0);//get transaction id//if it already exists and attempt id is greater than the attempt thereTrackedBatch tracked = (TrackedBatch) _batches.get(id.getId()); // if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) { // System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex() // + " (" + _batches.size() + ")" + // "\ntuple: " + tuple + // "\nwith tracked " + tracked + // "\nwith id " + id + // "\nwith group " + batchGroup // + "\n"); // // }//System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());// this code here ensures that only one attempt is ever tracked for a batch, so when// failures happen you don't get an explosion in memory usage in the tasksif(tracked!=null) {if(id.getAttemptId() > tracked.attemptId) {_batches.remove(id.getId());tracked = null;} else if(id.getAttemptId() < tracked.attemptId) {// no reason to try to execute a previous attempt than we've already seenreturn;}}if(tracked==null) {tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());_batches.put(id.getId(), tracked);}_coordCollector.setCurrBatch(tracked);//System.out.println("TRACKED: " + tracked + " " + tuple);TupleType t = getTupleType(tuple, tracked);if(t==TupleType.COMMIT) {tracked.receivedCommit = true;checkFinish(tracked, tuple, t);} else if(t==TupleType.COORD) {int count = tuple.getInteger(1);tracked.reportedTasks++;tracked.expectedTupleCount+=count;checkFinish(tracked, tuple, t);} else {tracked.receivedTuples++;boolean success = true;try {_bolt.execute(tracked.info, tuple);if(tracked.condition.expectedTaskReports==0) {success = finishBatch(tracked, tuple);}} catch(FailedException e) {failBatch(tracked, e);}if(success) {_collector.ack(tuple); } else {_collector.fail(tuple);}}_coordCollector.setCurrBatch(null);}private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {boolean success = true;try {_bolt.finishBatch(tracked.info);String stream = COORD_STREAM(tracked.info.batchGroup);for(Integer task: tracked.condition.targetTasks) {_collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));}if(tracked.delayedAck!=null) {_collector.ack(tracked.delayedAck);tracked.delayedAck = null;}} catch(FailedException e) {failBatch(tracked, e);success = false;}_batches.remove(tracked.info.batchId.getId());return success;} 復(fù)制代碼- TridentBoltExecutor.execute方法,首先會(huì)創(chuàng)建并初始化TrackedBatch(如果TrackedBatch不存在的話),之后接收到batch指令的時(shí)候,對(duì)tracked.receivedTuple累加,然后調(diào)用_bolt.execute(tracked.info, tuple)
- 對(duì)于spout來(lái)說(shuō),這里的_bolt是TridentSpoutExecutor,它的execute方法會(huì)往下游的TridentBoltExecutor發(fā)射一個(gè)batch的tuples;由于spout的expectedTaskReports==0,所以這里在調(diào)用完TridentSpoutExecutor發(fā)射batch的tuples時(shí),它就立馬調(diào)用finishBatch
- finishBatch操作,這里會(huì)通過(guò)COORD_STREAM往下游的TridentBoltExecutor發(fā)射[id,count]數(shù)據(jù),告知下游TridentBoltExecutor說(shuō)它一共發(fā)射了多少tuples
TridentBoltExecutor(SubtopologyBolt)
storm-core-1.2.2-sources.jar!/org/apache/storm/trident/topology/TridentBoltExecutor.java
@Overridepublic void execute(Tuple tuple) {if(TupleUtils.isTick(tuple)) {long now = System.currentTimeMillis();if(now - _lastRotate > _messageTimeoutMs) {_batches.rotate();_lastRotate = now;}return;}String batchGroup = _batchGroupIds.get(tuple.getSourceGlobalStreamId());if(batchGroup==null) {// this is so we can do things like have simple DRPC that doesn't need to use batch processing_coordCollector.setCurrBatch(null);_bolt.execute(null, tuple);_collector.ack(tuple);return;}IBatchID id = (IBatchID) tuple.getValue(0);//get transaction id//if it already exists and attempt id is greater than the attempt thereTrackedBatch tracked = (TrackedBatch) _batches.get(id.getId()); // if(_batches.size() > 10 && _context.getThisTaskIndex() == 0) { // System.out.println("Received in " + _context.getThisComponentId() + " " + _context.getThisTaskIndex() // + " (" + _batches.size() + ")" + // "\ntuple: " + tuple + // "\nwith tracked " + tracked + // "\nwith id " + id + // "\nwith group " + batchGroup // + "\n"); // // }//System.out.println("Num tracked: " + _batches.size() + " " + _context.getThisComponentId() + " " + _context.getThisTaskIndex());// this code here ensures that only one attempt is ever tracked for a batch, so when// failures happen you don't get an explosion in memory usage in the tasksif(tracked!=null) {if(id.getAttemptId() > tracked.attemptId) {_batches.remove(id.getId());tracked = null;} else if(id.getAttemptId() < tracked.attemptId) {// no reason to try to execute a previous attempt than we've already seenreturn;}}if(tracked==null) {tracked = new TrackedBatch(new BatchInfo(batchGroup, id, _bolt.initBatchState(batchGroup, id)), _coordConditions.get(batchGroup), id.getAttemptId());_batches.put(id.getId(), tracked);}_coordCollector.setCurrBatch(tracked);//System.out.println("TRACKED: " + tracked + " " + tuple);TupleType t = getTupleType(tuple, tracked);if(t==TupleType.COMMIT) {tracked.receivedCommit = true;checkFinish(tracked, tuple, t);} else if(t==TupleType.COORD) {int count = tuple.getInteger(1);tracked.reportedTasks++;tracked.expectedTupleCount+=count;checkFinish(tracked, tuple, t);} else {tracked.receivedTuples++;boolean success = true;try {_bolt.execute(tracked.info, tuple);if(tracked.condition.expectedTaskReports==0) {success = finishBatch(tracked, tuple);}} catch(FailedException e) {failBatch(tracked, e);}if(success) {_collector.ack(tuple); } else {_collector.fail(tuple);}}_coordCollector.setCurrBatch(null);}private void checkFinish(TrackedBatch tracked, Tuple tuple, TupleType type) {if(tracked.failed) {failBatch(tracked);_collector.fail(tuple);return;}CoordCondition cond = tracked.condition;boolean delayed = tracked.delayedAck==null &&(cond.commitStream!=null && type==TupleType.COMMIT|| cond.commitStream==null);if(delayed) {tracked.delayedAck = tuple;}boolean failed = false;if(tracked.receivedCommit && tracked.reportedTasks == cond.expectedTaskReports) {if(tracked.receivedTuples == tracked.expectedTupleCount) {finishBatch(tracked, tuple); } else {//TODO: add logging that not all tuples were receivedfailBatch(tracked);_collector.fail(tuple);failed = true;}}if(!delayed && !failed) {_collector.ack(tuple);}}private boolean finishBatch(TrackedBatch tracked, Tuple finishTuple) {boolean success = true;try {_bolt.finishBatch(tracked.info);String stream = COORD_STREAM(tracked.info.batchGroup);for(Integer task: tracked.condition.targetTasks) {_collector.emitDirect(task, stream, finishTuple, new Values(tracked.info.batchId, Utils.get(tracked.taskEmittedTuples, task, 0)));}if(tracked.delayedAck!=null) {_collector.ack(tracked.delayedAck);tracked.delayedAck = null;}} catch(FailedException e) {failBatch(tracked, e);success = false;}_batches.remove(tracked.info.batchId.getId());return success;} 復(fù)制代碼- TridentBoltExecutor(SubtopologyBolt)是spout下游的bolt,它的_bolt是SubtopologyBolt,而且它的tracked.condition.expectedTaskReports不為0,因而它是在接收到TupleType.COORD的tuple的時(shí)候,才進(jìn)行checkFinish操作(這里先忽略TupleType.COMMIT類型)
- 由于BoltExecutor是使用Utils.asyncLoop來(lái)挨個(gè)消費(fèi)receiveQueue的數(shù)據(jù)的,而且emitBatch的時(shí)候也是挨個(gè)接收batch的tuples,最后再接收到TridentBoltExecutor(TridentSpoutExecutor)在finishBatch的時(shí)候通過(guò)COORD_STREAM發(fā)過(guò)來(lái)的[id,count]的tuple(注意這里的COORD_STREAM是分發(fā)給每個(gè)task的,如果TridentBoltExecutor有多個(gè)parallel,則他們是按各自的task來(lái)接收的)
- 所以TridentBoltExecutor(SubtopologyBolt)先挨個(gè)處理每個(gè)tuple,處理完之后才輪到TupleType.COORD這個(gè)tuple,然后觸發(fā)checkFinish操作;在沒(méi)有commitStream的情況下,tracked.receivedCommit默認(rèn)為true,因而這里只要檢測(cè)收到的tuples與應(yīng)收的tuples數(shù)一致,就執(zhí)行_bolt.finishBatch操作完成一個(gè)batch,然后再往它的下游TridentBoltExecutor發(fā)射它應(yīng)收的[id,count]的tuple
小結(jié)
- 對(duì)于trident來(lái)說(shuō),真正的spout是MasterBatchCoordinator,它的nextTuple會(huì)觸發(fā)batch的發(fā)送,它將batch指令發(fā)送給TridentSpoutCoordinator,而TridentSpoutCoordinator將觸發(fā)TridentBoltExecutor(TridentSpoutExecutor)的execute方法,進(jìn)而觸發(fā)ITridentSpout的emitter的emitBatch,從而發(fā)送一個(gè)batch的數(shù)據(jù)
- TridentBoltExecutor(TridentSpoutExecutor)的expectedTaskReports==0,它在調(diào)用完TridentSpoutExecutor發(fā)射batch的tuples時(shí),就立馬調(diào)用finishBatch操作,通過(guò)COORD_STREAM往下游的TridentBoltExecutor發(fā)射[id,count]數(shù)據(jù),告知下游TridentBoltExecutor說(shuō)它一共發(fā)射了多少tuples
- spout的下游bolt為TridentBoltExecutor(SubtopologyBolt),它的tracked.condition.expectedTaskReports不為0,因而它是在接收到TupleType.COORD的tuple的時(shí)候,才進(jìn)行checkFinish操作(這里先忽略TupleType.COMMIT類型),由于spout是先執(zhí)行emitBatch操作再最后finishBatch發(fā)送[id,count]數(shù)據(jù),正常情況下按順序進(jìn)入到TridentBoltExecutor(SubtopologyBolt)的receiveQueue隊(duì)列,然后TridentBoltExecutor(SubtopologyBolt)挨個(gè)消費(fèi)tuple,調(diào)用SubtopologyBolt.execute,最后再處理[id,count]數(shù)據(jù),觸發(fā)checkFinish操作,只要檢測(cè)收到的tuples與應(yīng)收的tuples數(shù)一致,就執(zhí)行SubtopologyBolt.finishBatch操作完成這個(gè)batch,然后再往它的下游TridentBoltExecutor發(fā)射它應(yīng)收的[id,count]的tuple
doc
- Trident Tutorial
- 聊聊storm worker的executor與task
- 聊聊storm的AggregateProcessor的execute及finishBatch方法
總結(jié)
以上是生活随笔為你收集整理的聊聊storm TridentBoltExecutor的finishBatch方法的全部?jī)?nèi)容,希望文章能夠幫你解決所遇到的問(wèn)題。
- 上一篇: iOS轻量分组日志工具 Log4OC
- 下一篇: python处理u开头的字符串