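Heritrix 3.1.0 Source Code Analysis (Part 8)
This article continues with the queue containers that store CrawlURI curi objects. The most important classes are BdbWorkQueue and BdbMultipleWorkQueues.
BdbWorkQueue extends the abstract class WorkQueue, whose most important methods are: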
long enqueue(final WorkQueueFrontier frontier,CrawlURI curi)
CrawlURI peek(final WorkQueueFrontier frontier)
void dequeue(final WorkQueueFrontier frontier, CrawlURI expected)
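These add a CrawlURI curi, fetch a CrawlURI curi, and mark a CrawlURI curi as finished, respectively. The concrete logic is implemented in the subclass (here, BdbWorkQueue).
BdbWorkQueue completes the logic of its abstract parent. The concrete methods behind the three methods above are, respectively: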
void insertItem(final WorkQueueFrontier frontier,final CrawlURI curi, boolean overwriteIfPresent)
CrawlURI peekItem(final WorkQueueFrontier frontier)
void deleteItem(final WorkQueueFrontier frontier,final CrawlURI peekItem)
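The split between the two sets of methods reads like a template-method arrangement: the parent exposes enqueue/peek/dequeue and delegates the storage work to insertItem/peekItem/deleteItem. The sketch below illustrates that shape only. It is not the Heritrix code: SketchWorkQueue and InMemoryWorkQueue are hypothetical names, the frontier parameter is dropped, the bookkeeping in the parent (an item count and a remembered peeked item) is an assumption, and an in-memory deque stands in for the BDB-backed storage.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical, simplified stand-in for WorkQueue: public methods keep the
// bookkeeping, abstract hooks do the actual storage work.
abstract class SketchWorkQueue<T> {
    private long count = 0;   // number of items currently held (assumed bookkeeping)
    private T peeked = null;  // item handed out by peek() and not yet dequeued

    long enqueue(T item) {
        insertItem(item);
        return ++count;
    }

    T peek() {
        if (peeked == null && count > 0) {
            peeked = peekItem();
        }
        return peeked;
    }

    void dequeue(T expected) {
        if (peeked == null || !peeked.equals(expected)) {
            throw new IllegalStateException("dequeue of an item that was not peeked");
        }
        deleteItem(peeked);
        peeked = null;
        count--;
    }

    long getCount() {
        return count;
    }

    // Storage hooks, implemented by the subclass.
    protected abstract void insertItem(T item);
    protected abstract T peekItem();
    protected abstract void deleteItem(T item);
}

// In-memory subclass standing in for BdbWorkQueue.
class InMemoryWorkQueue extends SketchWorkQueue<String> {
    private final Deque<String> items = new ArrayDeque<>();

    @Override protected void insertItem(String item) { items.addLast(item); }
    @Override protected String peekItem() { return items.peekFirst(); }
    @Override protected void deleteItem(String item) { items.remove(item); }
}
```

With this shape, callers only ever touch enqueue, peek and dequeue; swapping the deque for a database changes the three hooks and nothing else.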
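Before analyzing the methods of BdbWorkQueue, let us first look at the state of a BdbWorkQueue object within the running system.
First, where a BdbWorkQueue object is instantiated: inside the WorkQueue getQueueFor(final String classKey) method of BdbFrontier.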

    /**
     * Return the work queue for the given classKey, or null
     * if no such queue exists.
     * 
     * @param classKey key to look for
     * @return the found WorkQueue
     */
    protected WorkQueue getQueueFor(final String classKey) {
        WorkQueue wq = allQueues.getOrUse(
                classKey,
                new Supplier<WorkQueue>() {
                    public BdbWorkQueue get() {
                        String qKey = new String(classKey); // ensure private minimal key
                        BdbWorkQueue q = new BdbWorkQueue(qKey, BdbFrontier.this);
                        q.setTotalBudget(getQueueTotalBudget()); //-1
                        System.out.println(getQueuePrecedencePolicy().getClass().getName());
                        getQueuePrecedencePolicy().queueCreated(q);
                        return q;
                    }});
        return wq;
    }

When the BdbWorkQueue object is instantiated, it receives the classKey and the BdbFrontier object itself. Its properties are then set (these properties are used for work-queue scheduling and will be analyzed in a later article).
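The getOrUse call with a Supplier amounts to "look up the queue for this classKey, and build it on first use". A minimal sketch of that idea with the standard library follows. QueueRegistrySketch and NamedQueue are hypothetical names, ConcurrentHashMap.computeIfAbsent is swapped in for Heritrix's allQueues cache, and the budget value -1 is taken from the comment in the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

class QueueRegistrySketch {
    // Hypothetical minimal queue type; only the fields used in this sketch.
    static final class NamedQueue {
        final String classKey;
        long totalBudget;

        NamedQueue(String classKey) {
            this.classKey = classKey;
        }
    }

    private final ConcurrentMap<String, NamedQueue> allQueues = new ConcurrentHashMap<>();
    final AtomicInteger created = new AtomicInteger(); // counts how often the factory ran

    NamedQueue getQueueFor(String classKey) {
        // The factory lambda runs only when no queue is registered for classKey.
        return allQueues.computeIfAbsent(classKey, key -> {
            NamedQueue q = new NamedQueue(key);
            q.totalBudget = -1; // same value as noted in the listing above
            created.incrementAndGet();
            return q;
        });
    }
}
```

Two calls with the same classKey return the same object, and the factory runs once per key.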
Now the BdbWorkQueue constructor:

    /**
     * Create a virtual queue inside the given BdbMultipleWorkQueues 
     * 
     * @param classKey
     */
    public BdbWorkQueue(String classKey, BdbFrontier frontier) {
        super(classKey);
        this.origin = BdbMultipleWorkQueues.calculateOriginKey(classKey);
        if (LOGGER.isLoggable(Level.FINE)) {
            LOGGER.fine(getPrefixClassKey(this.origin) + " " + classKey);
        }
        // add the queue-front 'cap' entry; see...
        // http://sourceforge.net/tracker/index.php?func=detail&aid=1262665&group_id=73833&atid=539102
        frontier.getWorkQueues().addCap(origin);
    }

The constructor initializes the member variables String classKey and byte[] origin (derived from the classKey), and adds origin as a key to the Database pendingUrisDB wrapped by BdbMultipleWorkQueues. (The BdbMultipleWorkQueues object is the BdbFrontier member BdbMultipleWorkQueues pendingUris; the actual database name is pending.)
Next, the concrete implementation methods of BdbWorkQueue mentioned above.
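To picture why a 'cap' entry is useful, here is a small sketch in which a sorted map stands in for the pending database. It rests on assumptions that the listing does not confirm: that the origin key is the UTF-8 bytes of the classKey followed by one zero byte, that item keys extend the origin key with 8 ordering bytes, and that keys are ordered as unsigned bytes. OriginKeySketch and its methods are hypothetical names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.TreeMap;

class OriginKeySketch {
    // Assumption: origin key = UTF-8 bytes of classKey followed by one zero byte.
    static byte[] originKey(String classKey) {
        byte[] ck = classKey.getBytes(StandardCharsets.UTF_8);
        return Arrays.copyOf(ck, ck.length + 1); // copyOf pads with a trailing 0
    }

    // Hypothetical item key: origin key plus 8 big-endian ordering bytes.
    static byte[] itemKey(String classKey, long ordering) {
        byte[] origin = originKey(classKey);
        byte[] key = Arrays.copyOf(origin, origin.length + 8);
        for (int i = 0; i < 8; i++) {
            key[origin.length + i] = (byte) (ordering >>> (56 - 8 * i));
        }
        return key;
    }

    // Sorted store standing in for the "pending" database (requires Java 9+).
    static TreeMap<byte[], byte[]> newStore() {
        Comparator<byte[]> unsignedOrder = Arrays::compareUnsigned;
        return new TreeMap<>(unsignedOrder);
    }

    // The cap is an entry at the origin key with a zero-length value.
    static void addCap(TreeMap<byte[], byte[]> store, String classKey) {
        store.put(originKey(classKey), new byte[0]);
    }
}
```

Because the origin key is a prefix of every item key of the same queue, the cap sorts immediately before that queue's items, which gives each virtual queue a fixed starting position inside the single shared database.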
void insertItem(final WorkQueueFrontier frontier,final CrawlURI curi, boolean overwriteIfPresent)

    protected void insertItem(final WorkQueueFrontier frontier,
            final CrawlURI curi, boolean overwriteIfPresent) throws IOException {
        try {
            final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
                .getWorkQueues();
            queues.put(curi, overwriteIfPresent);
            if (LOGGER.isLoggable(Level.FINE)) {
                LOGGER.fine("Inserted into " + getPrefixClassKey(this.origin) +
                    " (count " + Long.toString(getCount())+ "): " +
                        curi.toString());
            }
        } catch (DatabaseException e) {
            throw new IOException(e);
        }
    }

CrawlURI peekItem(final WorkQueueFrontier frontier)

    protected CrawlURI peekItem(final WorkQueueFrontier frontier)
    throws IOException {
        final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
            .getWorkQueues();
        DatabaseEntry key = new DatabaseEntry(origin);
        CrawlURI curi = null;
        int tries = 1;
        while(true) {
            try {
                curi = queues.get(key);
            } catch (DatabaseException e) {
                LOGGER.log(Level.SEVERE,"peekItem failure; retrying",e);
            }

            // ensure CrawlURI, if any, came from acceptable range: 
            if(!ArchiveUtils.startsWith(key.getData(),origin)) {
                LOGGER.severe(
                    "inconsistency: "+classKey+"("+
                    getPrefixClassKey(origin)+") with " + getCount() + " items gave "
                    + curi +"("+getPrefixClassKey(key.getData()));
                // clear curi to allow retry
                curi = null; 
                // reset key to original origin for retry
                key.setData(origin);
            }

            if (curi!=null) {
                // success
                break;
            }

            if (tries>3) {
                LOGGER.severe("no item where expected in queue "+classKey);
                break;
            }
            tries++;
            LOGGER.severe("Trying get #" + Integer.toString(tries)
                    + " in queue " + classKey + " with " + getCount()
                    + " items using key "
                    + getPrefixClassKey(key.getData()));
        }

        return curi;
    }

void deleteItem(final WorkQueueFrontier frontier,final CrawlURI peekItem)

    protected void deleteItem(final WorkQueueFrontier frontier,
            final CrawlURI peekItem) throws IOException {
        try {
            final BdbMultipleWorkQueues queues = ((BdbFrontier) frontier)
                .getWorkQueues();
            queues.delete(peekItem);
        } catch (DatabaseException e) {
            throw new IOException(e);
        }
    }

Next we analyze the relevant methods of BdbMultipleWorkQueues. As before, we first look at the state of this object within the running system.
BdbMultipleWorkQueues is instantiated in the BdbFrontier source as follows:

    /**
     * Create the single object (within which is one BDB database)
     * inside which all the other queues live. 
     * 
     * @return the created BdbMultipleWorkQueues
     * @throws DatabaseException
     */
    protected BdbMultipleWorkQueues createMultipleWorkQueues()
    throws DatabaseException {
        Database db;
        boolean recycle = (recoveryCheckpoint != null);

        BdbModule.BdbConfig dbConfig = new BdbModule.BdbConfig();
        dbConfig.setAllowCreate(!recycle);
        // Make database deferred write: URLs that are added then removed 
        // before a page-out is required need never cause disk IO.
        db = bdb.openDatabase("pending", dbConfig, recycle);

        return new BdbMultipleWorkQueues(db, bdb.getClassCatalog());
    }

The constructor is given the Database db and a StoredClassCatalog object. (I noticed later that this StoredClassCatalog object is not actually used inside the BdbMultipleWorkQueues constructor: the EntryBinding<CrawlURI> that converts to and from BDB entries is not built from the StoredClassCatalog, but is the EntryBinding<CrawlURI> crawlUriBinding object that appears further on.)
Now its constructor:

    /**
     * Create the multi queue in the given environment. 
     * 
     * @param env bdb environment to use
     * @param classCatalog Class catalog to use.
     * @param recycle True if we are to reuse db content if any.
     * @throws DatabaseException
     */
    public BdbMultipleWorkQueues(Database db,
        StoredClassCatalog classCatalog)
    throws DatabaseException {
        this.pendingUrisDB = db;
        crawlUriBinding =
            new KryoBinding<CrawlURI>(CrawlURI.class);
    //            new RecyclingSerialBinding<CrawlURI>(classCatalog, CrawlURI.class);
    //            new BenchmarkingBinding<CrawlURI>(new EntryBinding[] {
    //                new KryoBinding<CrawlURI>(CrawlURI.class,true),
    //                new KryoBinding<CrawlURI>(CrawlURI.class,false),
    //                new RecyclingSerialBinding<CrawlURI>(classCatalog, CrawlURI.class),
    //            });
    }

This initializes the member variables Database pendingUrisDB and EntryBinding<CrawlURI> crawlUriBinding.
Now its methods.
void put(CrawlURI curi, boolean overwriteIfPresent)

    /**
     * Put the given CrawlURI in at the appropriate place. 
     * 
     * @param curi
     * @throws DatabaseException
     */
    public void put(CrawlURI curi, boolean overwriteIfPresent) 
    throws DatabaseException {
        DatabaseEntry insertKey = (DatabaseEntry)curi.getHolderKey();
        if (insertKey == null) {
            insertKey = calculateInsertKey(curi);
            curi.setHolderKey(insertKey);
        }
        DatabaseEntry value = new DatabaseEntry();
        crawlUriBinding.objectToEntry(curi, value);
        // Output tally on avg. size if level is FINE or greater.
        if (LOGGER.isLoggable(Level.FINE)) {
            tallyAverageEntrySize(curi, value);
        }
        OperationStatus status;
        if(overwriteIfPresent) {
            status = pendingUrisDB.put(null, insertKey, value);
        } else {
            status = pendingUrisDB.putNoOverwrite(null, insertKey, value);
        }
        if (status!=OperationStatus.SUCCESS) {
            LOGGER.log(Level.SEVERE,"URI enqueueing failed; "+status+ " "+curi, new RuntimeException());
        }
    }

The key thing to understand here is how DatabaseEntry insertKey is computed:

    /**
     * Calculate the insertKey that places a CrawlURI in the
     * desired spot. First bytes are always classKey (usu. host)
     * based -- ensuring grouping by host -- terminated by a zero
     * byte. Then 8 bytes of data ensuring desired ordering 
     * within that 'queue' are used. The first byte of these 8 is
     * priority -- allowing 'immediate' and 'soon' items to 
     * sort above regular. Next 1 byte is 'precedence'. Last 6 bytes 
     * are ordinal serial number, ensuring earlier-discovered 
     * URIs sort before later. 
     * 
     * NOTE: Dangers here are:
     * (1) priorities or precedences over 2^7 (signed byte comparison)
     * (2) ordinals over 2^48
     * 
     * Package access & static for testing purposes. 
     * 
     * @param curi
     * @return a DatabaseEntry key for the CrawlURI
     */
    static DatabaseEntry calculateInsertKey(CrawlURI curi) {
        byte[] classKeyBytes = null;
        int len = 0;
        classKeyBytes = curi.getClassKey().getBytes(Charsets.UTF_8);
        len = classKeyBytes.length;
        byte[] keyData = new byte[len+9];
        System.arraycopy(classKeyBytes,0,keyData,0,len);
        keyData[len]=0;
        long ordinalPlus = curi.getOrdinal() & 0x0000FFFFFFFFFFFFL;
        ordinalPlus = 
            ((long)curi.getSchedulingDirective() << 56) | ordinalPlus;
        long precedence = Math.min(curi.getPrecedence(), 127);
        ordinalPlus = 
            (((precedence) & 0xFFL) << 48) | ordinalPlus;
        ArchiveUtils.longIntoByteArray(ordinalPlus, keyData, len+1);
        return new DatabaseEntry(keyData);
    }

CrawlURI get(DatabaseEntry headKey): the DatabaseEntry headKey is passed in by the BdbWorkQueue object (in peekItem above it is constructed as new DatabaseEntry(origin)); how it is obtained is left for a later article.
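Before turning to the code of get(DatabaseEntry headKey), the key layout described in the calculateInsertKey Javadoc can be made concrete with a standalone sketch. It takes plain values instead of a CrawlURI, and it assumes that the 8 ordering bytes are written most-significant byte first (big-endian), which is how I read ArchiveUtils.longIntoByteArray. InsertKeySketch and insertKey are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class InsertKeySketch {
    /**
     * Layout: [classKey bytes][0x00][directive][precedence][6-byte ordinal].
     */
    static byte[] insertKey(String classKey, int directive, int precedence, long ordinal) {
        byte[] ck = classKey.getBytes(StandardCharsets.UTF_8);

        long packed = ordinal & 0x0000FFFFFFFFFFFFL;                 // low 48 bits: ordinal
        packed |= (((long) Math.min(precedence, 127)) & 0xFFL) << 48; // next byte: precedence, capped at 127
        packed |= ((long) directive) << 56;                          // top byte: scheduling directive

        return ByteBuffer.allocate(ck.length + 9)
                .put(ck)
                .put((byte) 0)     // terminator between class key and ordering bytes
                .putLong(packed)   // ByteBuffer writes big-endian by default
                .array();
    }
}
```

For example, insertKey("a", 1, 3, 5L) yields ten bytes: 0x61, 0x00, 0x01, 0x03, then five zero bytes, then 0x05. Keys of the same class key therefore sort by directive first, then precedence, then discovery order, with numerically smaller values sorting earlier.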

    /**
     * Get the next nearest item after the given key. Relies on 
     * external discipline -- we'll look at the queues count of how many
     * items it has -- to avoid asking for something from a
     * range where there are no associated items --
     * otherwise could get first item of next 'queue' by mistake. 
     * 
     * <p>TODO: hold within a queue's range
     * 
     * @param headKey Key prefix that demarks the beginning of the range
     * in <code>pendingUrisDB</code> we're interested in.
     * @return CrawlURI.
     * @throws DatabaseException
     */
    public CrawlURI get(DatabaseEntry headKey)
    throws DatabaseException {
        DatabaseEntry result = new DatabaseEntry();

        // From Linda Lee of sleepycat:
        // "You want to check the status returned from Cursor.getSearchKeyRange
        // to make sure that you have OperationStatus.SUCCESS. In that case,
        // you have found a valid data record, and result.getData()
        // (called by internally by the binding code, in this case) will be
        // non-null. The other possible status return is
        // OperationStatus.NOTFOUND, in which case no data record matched
        // the criteria. "
        OperationStatus status = getNextNearestItem(headKey, result);
        CrawlURI retVal = null;
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("See '1219854 NPE je-2.0 "
                    + "entryToObject...'. OperationStatus "
                    + " was not SUCCESS: "
                    + status
                    + ", headKey "
                    + BdbWorkQueue.getPrefixClassKey(headKey.getData()));
            return null;
        }
        try {
            retVal = (CrawlURI)crawlUriBinding.entryToObject(result);
        } catch (ClassCastException cce) {
            Object obj = crawlUriBinding.entryToObject(result);
            LOGGER.log(Level.SEVERE,
                    "see [#HER-1283]: deserialized " + obj.getClass() 
                    + " has ClassLoader " 
                    + obj.getClass().getClassLoader().getClass(),
                    cce);
            return null; 
        } catch (RuntimeExceptionWrapper rw) {
            LOGGER.log(
                Level.SEVERE,
                "expected object missing in queue " +
                BdbWorkQueue.getPrefixClassKey(headKey.getData()),
                rw);
            return null; 
        }
        retVal.setHolderKey(headKey);
        return retVal;
    }

This in turn calls OperationStatus getNextNearestItem(DatabaseEntry headKey,DatabaseEntry result):

    protected OperationStatus getNextNearestItem(DatabaseEntry headKey,
            DatabaseEntry result) throws DatabaseException {
        Cursor cursor = null;
        OperationStatus status;
        try {
            cursor = this.pendingUrisDB.openCursor(null, null);

            // get cap; headKey at this point should always point to 
            // a queue-beginning cap entry (zero-length value)
            status = cursor.getSearchKey(headKey, result, null);
            if (status != OperationStatus.SUCCESS) {
                LOGGER.severe("bdb queue cap missing: " 
                        + status.toString() + " " + new String(headKey.getData()));
                return status;
            }
            if (result.getData().length > 0) {
                LOGGER.severe("bdb queue has nonzero size: " 
                        + result.getData().length);
                return OperationStatus.KEYEXIST;
            }
            // get next item (real first item of queue)
            status = cursor.getNext(headKey,result,null);
        } finally { 
            if(cursor!=null) {
                cursor.close();
            }
        }
        return status;
    }

void delete(CrawlURI item) (deletes by the holderKey key)
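Before the delete code, the cap-then-next lookup in getNextNearestItem can be pictured with a sorted map standing in for the database cursor. This is only an analogy under stated assumptions: TreeMap.get plays the role of an exact-match cursor search, TreeMap.higherEntry plays the role of moving to the next record, keys are compared as unsigned bytes, and the empty string marks a cap. NextNearestSketch and its methods are hypothetical names. The sketch also shows why peekItem checks the returned key against origin: asking an empty queue for its next item lands in the following queue.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Map;
import java.util.TreeMap;

class NextNearestSketch {
    // Unsigned lexicographic byte order (requires Java 9+).
    static final Comparator<byte[]> UNSIGNED = Arrays::compareUnsigned;

    // Class key bytes, a zero terminator, then optional ordering bytes.
    static byte[] key(String classKey, int... orderingBytes) {
        byte[] ck = classKey.getBytes(StandardCharsets.UTF_8);
        byte[] k = Arrays.copyOf(ck, ck.length + 1 + orderingBytes.length);
        for (int i = 0; i < orderingBytes.length; i++) {
            k[ck.length + 1 + i] = (byte) orderingBytes[i];
        }
        return k;
    }

    // Returns the entry right after the cap, or null if the cap is missing or not empty.
    static Map.Entry<byte[], String> nextAfterCap(TreeMap<byte[], String> db, byte[] origin) {
        String cap = db.get(origin);       // exact match on the cap key
        if (cap == null || !cap.isEmpty()) {
            return null;                   // cap missing, or entry is not a cap
        }
        return db.higherEntry(origin);     // next key in sort order, whatever queue it is in
    }

    // Prefix test used to confirm an entry belongs to the queue that asked for it.
    static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(Arrays.copyOf(key, prefix.length), prefix);
    }
}
```

If queue a.com holds only its cap and queue b.com follows it, nextAfterCap for a.com returns an entry whose key does not start with a.com's origin key, which is exactly the inconsistency the range check in peekItem guards against.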

    /**
     * Delete the given CrawlURI from persistent store. Requires
     * the key under which it was stored be available. 
     * 
     * @param item
     * @throws DatabaseException
     */
    public void delete(CrawlURI item) throws DatabaseException {
        OperationStatus status;
        DatabaseEntry de = (DatabaseEntry)item.getHolderKey();
        status = pendingUrisDB.delete(null, de);
        if (status != OperationStatus.SUCCESS) {
            LOGGER.severe("expected item not present: "
                    + item
                    + "("
                    + (new BigInteger(((DatabaseEntry) item.getHolderKey())
                            .getData())).toString(16) + ")");
        }
    }

void forAllPendingDo(Closure c) iterates over the records and calls back into Closure c (a closure).

    /**
     * Iterate over the records.
     * Utility method to perform action for all pending CrawlURI instances.
     * @param c Closure action to perform
     * @throws DatabaseException
     */
    protected void forAllPendingDo(Closure c) throws DatabaseException {
        DatabaseEntry key = new DatabaseEntry();
        DatabaseEntry value = new DatabaseEntry();
        Cursor cursor = pendingUrisDB.openCursor(null, null);
        while (cursor.getNext(key, value, null) == OperationStatus.SUCCESS) {
            if (value.getData().length == 0) {
                continue;
            }
            CrawlURI item = (CrawlURI) crawlUriBinding.entryToObject(value);
            c.execute(item);
        }
        cursor.close(); 
    }

CompositeData getFrom(String m, int maxMatches, Pattern pattern, boolean verbose) is needed for JMX management.

    /**
     * @param m marker or null to start with first entry
     * @param maxMatches
     * @return list of matches starting from marker position
     * @throws DatabaseException
     */
    public CompositeData getFrom(
            String m, 
            int maxMatches, 
            Pattern pattern, 
            boolean verbose) 
    throws DatabaseException {
        int matches = 0;
        int tries = 0;
        ArrayList<String> results = new ArrayList<String>(maxMatches);

        DatabaseEntry key;
        if (m == null) {
            key = getFirstKey();
        } else {
            byte[] marker = m.getBytes(); // = FrontierJMXTypes.fromString(m);
            key = new DatabaseEntry(marker);
        }

        DatabaseEntry value = new DatabaseEntry();

        Cursor cursor = null;
        OperationStatus result = null;
        try {
            cursor = pendingUrisDB.openCursor(null,null);
            result = cursor.getSearchKey(key, value, null);

            while(matches < maxMatches && result == OperationStatus.SUCCESS) {
                if(value.getData().length>0) {
                    CrawlURI curi = (CrawlURI) crawlUriBinding.entryToObject(value);
                    if(pattern.matcher(curi.toString()).matches()) {
                        if (verbose) {
                            results.add("[" + curi.getClassKey() + "] " 
                                    + curi.shortReportLine());
                        } else {
                            results.add(curi.toString());
                        }
                        matches++;
                    }
                    tries++;
                }
                result = cursor.getNext(key,value,null);
            }
        } finally {
            if (cursor !=null) {
                cursor.close();
            }
        }

        if(result != OperationStatus.SUCCESS) {
            // end of scan
            m = null;
        } else {
            m = new String(key.getData()); // = FrontierJMXTypes.toString(key.getData());
        }

        String[] arr = results.toArray(new String[results.size()]);
        CompositeData cd;
        try {
            cd = new CompositeDataSupport(
                    /*FrontierJMXTypes.URI_LIST_DATA*/ null,
                    new String[] { "list", "marker" },
                    new Object[] { arr, m });
        } catch (OpenDataException e) {
            throw new IllegalStateException(e);
        }
        return cd;
    }

Finally, it is worth understanding the EntryBinding<CrawlURI> crawlUriBinding object. Its class, KryoBinding, is generic and implements JE's EntryBinding<K> interface. It converts the system's own types (in Heritrix, the CrawlURI class) to and from the BDB database's entry type. Its source is as follows:

    /**
     * Binding for use with BerkeleyDB-JE that uses Kryo serialization rather
     * than BDB's (custom version of) Java serialization.
     * 
     * @contributor gojomo
     */
    public class KryoBinding<K> implements EntryBinding<K> {

        protected Class<K> baseClass;
        protected AutoKryo kryo = new AutoKryo(); 
        protected ThreadLocal<WeakReference<ObjectBuffer>> threadBuffer = new ThreadLocal<WeakReference<ObjectBuffer>>() {
            @Override
            protected WeakReference<ObjectBuffer> initialValue() {
                return new WeakReference<ObjectBuffer>(new ObjectBuffer(kryo,16*1024,Integer.MAX_VALUE));
            }
        };

        /**
         * Constructor. Save parameters locally, as superclass 
         * fields are private. 
         * 
         * @param classCatalog is the catalog to hold shared class information
         *
         * @param baseClass is the base class for serialized objects stored using
         * this binding
         */
        @SuppressWarnings("unchecked")
        public KryoBinding(Class baseClass) {
            this.baseClass = baseClass;
            kryo.autoregister(baseClass);
            // TODO: reevaluate if explicit registration should be required
            kryo.setRegistrationOptional(true);
        }

        public Kryo getKryo() {
            return kryo;
        }

        private ObjectBuffer getBuffer() {
            WeakReference<ObjectBuffer> ref = threadBuffer.get();
            ObjectBuffer ob = ref.get();
            if (ob == null) {
                ob = new ObjectBuffer(kryo,16*1024,Integer.MAX_VALUE);
                threadBuffer.set(new WeakReference<ObjectBuffer>(ob));
            }
            return ob; 
        }

        /**
         * Copies superclass simply to allow different source for FastOoutputStream.
         * 
         * @see com.sleepycat.bind.serial.SerialBinding#entryToObject
         */
        public void objectToEntry(K object, DatabaseEntry entry) {
            entry.setData(getBuffer().writeObjectData(object));
        }

        @Override
        public K entryToObject(DatabaseEntry entry) {
            return getBuffer().readObjectData(entry.getData(), baseClass);
        }
    }
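Two ideas in KryoBinding are worth isolating: a binding is just a pair of object-to-bytes and bytes-to-object conversions, and the scratch buffer is kept per thread behind a WeakReference so it can be reclaimed under memory pressure and rebuilt on demand. The sketch below shows both with the standard library only. It is not the Heritrix class: SerialBindingSketch is a hypothetical name, Java serialization is swapped in for Kryo, and plain byte arrays stand in for DatabaseEntry.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.lang.ref.WeakReference;

class SerialBindingSketch<K extends Serializable> {
    private final Class<K> baseClass;

    // Per-thread scratch buffer, held weakly so the GC may reclaim it.
    private final ThreadLocal<WeakReference<ByteArrayOutputStream>> threadBuffer =
            ThreadLocal.withInitial(
                    () -> new WeakReference<>(new ByteArrayOutputStream(16 * 1024)));

    SerialBindingSketch(Class<K> baseClass) {
        this.baseClass = baseClass;
    }

    private ByteArrayOutputStream getBuffer() {
        ByteArrayOutputStream buf = threadBuffer.get().get();
        if (buf == null) {
            // The buffer was collected: allocate a new one and register it again.
            buf = new ByteArrayOutputStream(16 * 1024);
            threadBuffer.set(new WeakReference<>(buf));
        }
        buf.reset(); // drop bytes left over from the previous use
        return buf;
    }

    // Counterpart of objectToEntry: serialize into the reusable buffer.
    byte[] objectToBytes(K object) {
        ByteArrayOutputStream buf = getBuffer();
        try (ObjectOutputStream out = new ObjectOutputStream(buf)) {
            out.writeObject(object);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }

    // Counterpart of entryToObject: rebuild the object from stored bytes.
    K bytesToObject(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return baseClass.cast(in.readObject());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

A round trip such as bytesToObject(objectToBytes(value)) returns an object equal to the original, which is the contract the queue code relies on when it stores and later reloads a CrawlURI.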
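This Heritrix 3.1.0 source code analysis series is my own original work.
When reposting, please credit the source: 博客园 (cnblogs), 刺猬的溫馴
Article link: http://www.cnblogs.com/chenying99/archive/2013/04/17/3025420.html
Reposted from: https://www.cnblogs.com/chenying99/archive/2013/04/19/3025420.html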