Heritrix 3.1.0 Source Code Analysis (Part 6)
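This post analyzes the states and methods of the BdbFrontier object.
The BdbFrontier class extends WorkQueueFrontier, and WorkQueueFrontier extends AbstractFrontier.
BdbFrontier's void start() method is shown below (it is defined in the parent class WorkQueueFrontier):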
org.archive.crawler.frontier.BdbFrontier
        org.archive.crawler.frontier.WorkQueueFrontier
    public void start() {
        if(isRunning()) {
            return; 
        }
        uriUniqFilter.setDestination(this);
        super.start();
        try {
            initInternalQueues();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

This calls the void start() method of the parent class AbstractFrontier:
    public void start() {
        if(isRunning()) {
            return; 
        }
        if (getRecoveryLogEnabled()) try {
            initJournal(loggerModule.getPath().getFile().getAbsolutePath());
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        pause();
        startManagerThread();
    }

If the recovery log is enabled, the journal is initialized first. The method then sets the current object (BdbFrontier) to State.PAUSE via pause() and calls void startManagerThread():
    /**
     * Start the dedicated thread with an independent view of the frontier's
     * state. 
     */
    protected void startManagerThread() {
        managerThread = new Thread(this+".managerThread") {
            public void run() {
                AbstractFrontier.this.managementTasks();
            }
        };
        managerThread.setPriority(Thread.NORM_PRIORITY+1); 
        managerThread.start();
    }

The run() method of the Thread object managerThread calls void managementTasks():
    /**
     * Main loop of frontier's managerThread. Only exits when State.FINISH 
     * is requested (perhaps automatically at URI exhaustion) and reached. 
     * 
     * General strategy is to try to fill outbound queue, then process an
     * item from inbound queue, and repeat. A HOLD (to be implemented) or 
     * PAUSE puts frontier into a stable state that won't be changed
     * asynchronously by worker thread activity. 
     */
    protected void managementTasks() {
        assert Thread.currentThread() == managerThread;
        try {
            loop: while (true) {
                try {
                    State reachedState = null; 
                    switch (targetState) {
                    case EMPTY:
                        reachedState = State.EMPTY; 
                    case RUN:
                        // enable outbound takes if previously locked
                        while(outboundLock.isWriteLockedByCurrentThread()) {
                            outboundLock.writeLock().unlock();
                        }
                        if(reachedState==null) {
                            reachedState = State.RUN; 
                        }
                        reachedState(reachedState);
                        Thread.sleep(1000);
                        if(isEmpty()&&targetState==State.RUN) {
                            requestState(State.EMPTY); 
                        } else if (!isEmpty()&&targetState==State.EMPTY) {
                            requestState(State.RUN); 
                        }
                        break;
                    case HOLD:
                        // TODO; for now treat same as PAUSE
                    case PAUSE:
                        // pausing
                        // prevent all outbound takes 
                        outboundLock.writeLock().lock();
                        // process all inbound
                        while (targetState == State.PAUSE) {
                            if (getInProcessCount()==0) {
                                reachedState(State.PAUSE);
                            }
                            Thread.sleep(1000);
                        }
                        break;
                    case FINISH:
                        // prevent all outbound takes 
                        outboundLock.writeLock().lock();
                        // process all inbound
                        while (getInProcessCount()>0) {
                            Thread.sleep(1000);
                        }
                        finalTasks(); 
                        // TODO: more cleanup? 
                        reachedState(State.FINISH);
                        break loop;
                    }
                } catch (RuntimeException e) {
                    // log, try to pause, continue
                    logger.log(Level.SEVERE,"",e);
                    if(targetState!=State.PAUSE && targetState!=State.FINISH) {
                        requestState(State.PAUSE);
                    }
                }
            }
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } 
        // try to leave in safely restartable state: 
        targetState = State.PAUSE;
        while(outboundLock.isWriteLockedByCurrentThread()) {
            outboundLock.writeLock().unlock();
        }
        //TODO: ensure all other structures are cleanly reset on restart 
        logger.log(Level.FINE,"ending frontier mgr thread");
    }

The method above loops continuously, setting the lock state of the member variable protected ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true) according to the BdbFrontier object's target state: PAUSE, HOLD, and FINISH acquire the write lock to block all outbound takes, while RUN and EMPTY release it.
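To make the role of outboundLock concrete, here is a minimal, self-contained sketch of the gating idea. It is not Heritrix code: the class OutboundGateSketch and its methods pauseOutbound, resumeOutbound, tryTake, and takeFromWorker are hypothetical names, and it assumes that worker threads acquire the read lock around each outbound take, which the source above does not show. Only the fair ReentrantReadWriteLock and the unlock-in-a-loop pattern are taken from the method quoted above.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical stand-in for the frontier's outbound gate; not Heritrix code. */
class OutboundGateSketch {
    // Fair lock, matching the field declaration quoted above.
    private final ReentrantReadWriteLock outboundLock = new ReentrantReadWriteLock(true);

    /** Manager side: PAUSE/HOLD/FINISH take the write lock to block outbound takes. */
    void pauseOutbound() {
        outboundLock.writeLock().lock();
    }

    /** Manager side: RUN/EMPTY release every hold the current thread has on the lock. */
    void resumeOutbound() {
        while (outboundLock.isWriteLockedByCurrentThread()) {
            outboundLock.writeLock().unlock();
        }
    }

    /** Worker side (assumed): a take proceeds only if the read lock can be obtained. */
    boolean tryTake(long timeoutMillis) throws InterruptedException {
        if (!outboundLock.readLock().tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
            return false; // gate is closed by the manager
        }
        try {
            return true; // a real frontier would hand out a URI here
        } finally {
            outboundLock.readLock().unlock();
        }
    }

    /** Runs tryTake on a separate worker thread and returns its result. */
    static boolean takeFromWorker(final OutboundGateSketch gate) {
        final boolean[] result = new boolean[1];
        Thread worker = new Thread(() -> {
            try {
                result[0] = gate.tryTake(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return result[0];
    }

    public static void main(String[] args) {
        OutboundGateSketch gate = new OutboundGateSketch();
        gate.pauseOutbound();
        gate.pauseOutbound(); // the write lock is reentrant, so holds can stack
        System.out.println("take while paused: " + takeFromWorker(gate)); // false
        gate.resumeOutbound();
        System.out.println("take after resume: " + takeFromWorker(gate)); // true
    }
}
```

Because the write lock is reentrant, the manager thread may hold it more than once after several passes through the PAUSE branch, which is presumably why the original code unlocks in a while loop rather than with a single unlock() call.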
Next, the void initInternalQueues() method initializes the queues used by the crawl job:
    /**
     * Initializes internal queues. May decide to keep all queues in memory based on
     * {@link QueueAssignmentPolicy#maximumNumberOfKeys}. Otherwise invokes
     * {@link #initAllQueues()} to actually set up the queues.
     * 
     * Subclasses should invoke this method with recycle set to "true" in 
     * a private readObject method, to restore queues after a checkpoint.
     * 
     * @param recycle
     * @throws IOException
     * @throws DatabaseException
     */
    protected void initInternalQueues() throws IOException, DatabaseException {
        this.initOtherQueues();
        if (workQueueDataOnDisk()
                && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() >= 0
                && preparer.getQueueAssignmentPolicy().maximumNumberOfKeys() <= MAX_QUEUES_TO_HOLD_ALLQUEUES_IN_MEMORY) {
            this.allQueues = new ObjectIdentityMemCache<WorkQueue>(701, .9f, 100);
        } else {
            this.initAllQueues();
        }
    }

It first calls the BdbFrontier object's void initOtherQueues() method, which is defined in the BdbFrontier class:
    @Override
    protected void initOtherQueues() throws DatabaseException {
        boolean recycle = (recoveryCheckpoint != null);
        // tiny risk of OutOfMemoryError: if giant number of snoozed
        // queues all wake-to-ready at once
        readyClassQueues = new LinkedBlockingQueue<String>();
        inactiveQueuesByPrecedence = new ConcurrentSkipListMap<Integer,Queue<String>>();
        retiredQueues = bdb.getStoredQueue("retiredQueues", String.class, recycle);
        // primary snoozed queues
        snoozedClassQueues = new DelayQueue<DelayedWorkQueue>();
        // just in case: overflow for extreme situations
        snoozedOverflow = bdb.getStoredMap("snoozedOverflow", Long.class, DelayedWorkQueue.class, true, false);
        this.futureUris = bdb.getStoredMap("futureUris", Long.class, CrawlURI.class, true, recoveryCheckpoint!=null);
        // initialize master map in which other queues live
        this.pendingUris = createMultipleWorkQueues();
    }

The method above initializes a series of queues. The role of each queue will be analyzed in later posts.
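As a rough illustration of how a DelayQueue of snoozed queues can feed a ready queue (the "wake-to-ready" step mentioned in the source comment), the sketch below uses only java.util.concurrent classes. The class SnoozeSketch, the element type SnoozedKey, and the methods snooze and wakeExpired are hypothetical; the real DelayedWorkQueue and the code that wakes snoozed queues are not shown in this post, so treat this as an assumption about the general mechanism rather than the Heritrix implementation.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical sketch of snoozed and ready queues; not Heritrix code. */
class SnoozeSketch {
    /** Hypothetical stand-in for DelayedWorkQueue: a queue key plus a wake time. */
    static final class SnoozedKey implements Delayed {
        final String classKey;
        final long wakeAtMillis;

        SnoozedKey(String classKey, long wakeAtMillis) {
            this.classKey = classKey;
            this.wakeAtMillis = wakeAtMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            // Remaining time until wake-up; zero or negative means "due".
            return unit.convert(wakeAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS),
                    other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    // Same collection types as the fields initialized in initOtherQueues().
    final LinkedBlockingQueue<String> readyClassQueues = new LinkedBlockingQueue<String>();
    final DelayQueue<SnoozedKey> snoozedClassQueues = new DelayQueue<SnoozedKey>();

    /** Parks a queue key until delayMillis have elapsed. */
    void snooze(String classKey, long delayMillis) {
        snoozedClassQueues.add(new SnoozedKey(classKey, System.currentTimeMillis() + delayMillis));
    }

    /** Moves every snoozed key whose delay has expired onto the ready queue. */
    int wakeExpired() {
        int woken = 0;
        SnoozedKey expired;
        // DelayQueue.poll() returns null until the head element's delay has run out.
        while ((expired = snoozedClassQueues.poll()) != null) {
            readyClassQueues.add(expired.classKey);
            woken++;
        }
        return woken;
    }

    public static void main(String[] args) {
        SnoozeSketch sketch = new SnoozeSketch();
        sketch.snooze("example.org", 0);      // due immediately
        sketch.snooze("example.com", 60000);  // due in one minute
        System.out.println("woken: " + sketch.wakeExpired());
        System.out.println("ready: " + sketch.readyClassQueues);
    }
}
```

The DelayQueue keeps snoozed entries ordered by wake time and refuses to release them early, so the waking loop only needs to poll until it gets null.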
The void initAllQueues() method initializes the member variable ObjectIdentityCache<WorkQueue> allQueues = null; it is also defined in the BdbFrontier class:
    @Override
    protected void initAllQueues() throws DatabaseException {
        boolean isRecovery = (recoveryCheckpoint != null);
        this.allQueues = bdb.getObjectCache("allqueues", isRecovery, WorkQueue.class, BdbWorkQueue.class);
        if(isRecovery) {
            // restore simple instance fields 
            JSONObject json = recoveryCheckpoint.loadJson(beanName);
            try {
                nextOrdinal.set(json.getLong("nextOrdinal"));
                queuedUriCount.set(json.getLong("queuedUriCount"));
                futureUriCount.set(json.getLong("futureUriCount"));
                succeededFetchCount.set(json.getLong("succeededFetchCount"));
                failedFetchCount.set(json.getLong("failedFetchCount"));
                disregardedUriCount.set(json.getLong("disregardedUriCount"));
                totalProcessedBytes.set(json.getLong("totalProcessedBytes"));
                JSONArray inactivePrecedences = json.getJSONArray("inactivePrecedences"); 
                // restore all intended inactiveQueues
                for(int i = 0; i < inactivePrecedences.length(); i++) {
                    int precedence = inactivePrecedences.getInt(i);
                    inactiveQueuesByPrecedence.put(precedence,createInactiveQueueForPrecedence(precedence,true));
                }
            } catch (JSONException e) {
                throw new RuntimeException(e);
            } 
            // retired queues already restored with prior data in initOtherQueues
            // restore ready queues (those not already on inactive, retired)
            BufferedReader activeQueuesReader = null;
            try {
                activeQueuesReader = recoveryCheckpoint.loadReader(beanName,"active");
                String line; 
                while((line = activeQueuesReader.readLine())!=null) {
                    readyClassQueues.add(line); 
                }
            } catch (IOException ioe) {
                throw new RuntimeException(ioe); 
            } finally {
                IOUtils.closeQuietly(activeQueuesReader); 
            }
            // TODO: restore largestQueues topNset? 
        }
    }

The ObjectIdentityCache<WorkQueue> allQueues member manages the cache of BdbWorkQueue work queues. When a recovery checkpoint is present, the method also restores the frontier's counters, the inactive queues, and the ready queues from the checkpoint data.
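The ready-queue restoration step above reads one queue key per line from a checkpoint reader. The sketch below isolates that step using only the JDK. The class ReadyQueueRestoreSketch and the method restoreReady are hypothetical names, a StringReader stands in for recoveryCheckpoint.loadReader(...), and try-with-resources replaces the IOUtils.closeQuietly call from the original, so it illustrates the pattern and is not a copy of Heritrix behavior.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of restoring a ready queue from line-oriented data. */
class ReadyQueueRestoreSketch {
    /** Reads one queue key per line and enqueues the keys in file order. */
    static LinkedBlockingQueue<String> restoreReady(Reader source) {
        LinkedBlockingQueue<String> ready = new LinkedBlockingQueue<String>();
        // try-with-resources closes the reader even if readLine() fails.
        try (BufferedReader reader = new BufferedReader(source)) {
            String line;
            while ((line = reader.readLine()) != null) {
                ready.add(line);
            }
        } catch (IOException ioe) {
            throw new UncheckedIOException(ioe);
        }
        return ready;
    }

    public static void main(String[] args) {
        // Stand-in for the checkpoint's "active" file: one queue key per line.
        Reader checkpoint = new StringReader("example.org\nexample.com\n");
        System.out.println(restoreReady(checkpoint)); // [example.org, example.com]
    }
}
```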
---------------------------------------------------------------------------
This Heritrix 3.1.0 source code analysis series is my own original work.
When reposting, please credit the source: cnblogs (博客園), 刺猬的溫馴
Link to this post: http://www.cnblogs.com/chenying99/archive/2013/04/18/3027677.html