Enterprise Search Engine Development: The Connector (Part 27)
The ChangeQueue class implements the ChangeSource interface, which declares a method for pulling the next Change object:
```java
/**
 * A source of {@link Change} objects.
 *
 * @since 2.8
 */
public interface ChangeSource {
  /**
   * @return the next change, or {@code null} if there is no change available
   */
  public Change getNextChange();
}
```

A ChangeQueue instance initializes a blocking queue, private final BlockingQueue<Change> pendingChanges, which serves as the container for Change objects:
```java
/**
 * Initializes the blocking queue pendingChanges.
 * @param size
 * @param sleepInterval
 * @param introduceDelayAfterEachScan
 * @param activityLogger
 */
private ChangeQueue(int size, long sleepInterval,
    boolean introduceDelayAfterEachScan, CrawlActivityLogger activityLogger) {
  pendingChanges = new ArrayBlockingQueue<Change>(size);
  this.sleepInterval = sleepInterval;
  this.activityLogger = activityLogger;
  this.introduceDelayAfterEveryScan = introduceDelayAfterEachScan;
}
```

The parameter introduceDelayAfterEachScan (stored in the field introduceDelayAfterEveryScan) determines whether a delay is introduced after each complete pass over the data.
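pendingChanges is an ArrayBlockingQueue created with a fixed capacity (size), so the buffer is bounded. Once it is full, a producer calling put() blocks until the consumer takes something, while offer() just returns false. The sketch below only illustrates this JDK behavior. This post does not show whether ChangeQueue's CallBack uses put() or offer(), and the class and method names here are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class; not part of the connector code.
class BoundedBufferDemo {
  // Offers `count` items to a queue of the given capacity and
  // returns how many were accepted before the queue filled up.
  static int acceptedOffers(int capacity, int count) {
    BlockingQueue<String> pending = new ArrayBlockingQueue<String>(capacity);
    int accepted = 0;
    for (int i = 0; i < count; i++) {
      // offer() never blocks: it returns false when the queue is full.
      if (pending.offer("change-" + i)) {
        accepted++;
      }
    }
    return accepted;
  }
}
```

With a capacity of 2, offering five items accepts only two. A put() in the same situation would block the producing thread instead, which is how a bounded queue keeps a fast producer from running arbitrarily far ahead of its consumer.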
As mentioned in the previous post, its inner class CallBack adds the submitted data to the blocking queue BlockingQueue<Change> pendingChanges.
ChangeQueue's implementation of the ChangeSource interface method retrieves Change objects from that blocking queue:
```java
/**
 * Gets an element from the blocking queue pendingChanges.
 * Gets the next available change from the ChangeQueue. Will wait up to
 * 1/4 second for a change to appear if none is immediately available.
 *
 * @return the next available change, or {@code null} if no changes are
 *         available
 */
public Change getNextChange() {
  try {
    return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
  } catch (InterruptedException ie) {
    return null;
  }
}
```

The ChangeQueue object therefore acts as a buffer for Change objects. As analyzed in the previous post, Change objects are added to it by the thread that runs the monitor object DocumentSnapshotRepositoryMonitor once it is started.
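The hand-off between the monitor thread (producer) and whoever calls getNextChange() (consumer) can be sketched with plain JDK classes. In this minimal sketch, Strings stand in for Change objects. The producer is joined before draining so that the result is deterministic, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the ChangeQueue hand-off; Strings stand in for Change.
class ChangeHandOffDemo {
  private final BlockingQueue<String> pendingChanges;

  ChangeHandOffDemo(int capacity) {
    pendingChanges = new ArrayBlockingQueue<String>(capacity);
  }

  // Same contract as ChangeQueue.getNextChange(): wait up to 250 ms,
  // then return null if nothing arrived.
  String getNextChange() {
    try {
      return pendingChanges.poll(250L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ie) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to callers
      return null;
    }
  }

  // A producer thread (standing in for the monitor) enqueues n changes;
  // the caller then drains the queue until getNextChange() times out.
  static List<String> produceThenDrain(final int n) {
    final ChangeHandOffDemo queue = new ChangeHandOffDemo(Math.max(1, n));
    Thread producer = new Thread(() -> {
      for (int i = 0; i < n; i++) {
        try {
          queue.pendingChanges.put("change-" + i);
        } catch (InterruptedException e) {
          return;
        }
      }
    });
    producer.start();
    try {
      producer.join(); // capacity >= n, so put() never blocks here
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    List<String> drained = new ArrayList<String>();
    String change;
    while ((change = queue.getNextChange()) != null) {
      drained.add(change);
    }
    return drained;
  }
}
```

On an empty queue, each getNextChange() call waits up to 250 ms before returning null, so the drain loop ends a quarter second after the last change. Unlike the quoted getNextChange(), this sketch re-sets the thread's interrupt flag before returning null. That is a common JDK idiom, not something the connector code does.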
Which object, then, calls ChangeQueue's getNextChange() method to take the Change data out?
Tracing the calls shows that the loadUpFromChangeSource method of the CheckpointAndChangeQueue class calls getNextChange(). That method wraps each Change object it obtains in a CheckpointAndChange object and adds it to the member field List<CheckpointAndChange> checkpointAndChangeList.
First, let's look at the relevant member fields and the constructor:
```java
private final AtomicInteger maximumQueueSize =
    new AtomicInteger(DEFAULT_MAXIMUM_QUEUE_SIZE);
private final List<CheckpointAndChange> checkpointAndChangeList;
private final ChangeSource changeSource;
private final DocumentHandleFactory internalDocumentHandleFactory;
private final DocumentHandleFactory clientDocumentHandleFactory;

private volatile DiffingConnectorCheckpoint lastCheckpoint;
private final File persistDir;  // place to persist enqueued values
private MonitorRestartState monitorPoints = new MonitorRestartState();

public CheckpointAndChangeQueue(ChangeSource changeSource, File persistDir,
    DocumentHandleFactory internalDocumentHandleFactory,
    DocumentHandleFactory clientDocumentHandleFactory) {
  this.changeSource = changeSource;
  this.checkpointAndChangeList = Collections.synchronizedList(
      new ArrayList<CheckpointAndChange>(maximumQueueSize.get()));
  this.persistDir = persistDir;
  this.internalDocumentHandleFactory = internalDocumentHandleFactory;
  this.clientDocumentHandleFactory = clientDocumentHandleFactory;
  ensurePersistDirExists();
}
```

Among other things, the constructor initializes the ChangeSource object changeSource (that is, the ChangeQueue object) and the list container List<CheckpointAndChange> checkpointAndChangeList, which is wrapped with Collections.synchronizedList.
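Collections.synchronizedList makes each individual call (add, size, remove) thread-safe. The JDK documentation notes, however, that code iterating over such a list must synchronize on the list manually. The minimal sketch below shows both points, with Strings standing in for CheckpointAndChange objects. The class and method names are hypothetical, and the sketch does not show how the connector itself iterates the list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical demo; Strings stand in for CheckpointAndChange objects.
class SyncListDemo {
  // Two threads append concurrently; each add() is atomic on a synchronized list,
  // so no additions are lost.
  static int concurrentAdds(final int perThread) {
    final List<String> list =
        Collections.synchronizedList(new ArrayList<String>());
    Runnable writer = () -> {
      for (int i = 0; i < perThread; i++) {
        list.add(Thread.currentThread().getName() + "-" + i);
      }
    };
    Thread t1 = new Thread(writer, "a");
    Thread t2 = new Thread(writer, "b");
    t1.start();
    t2.start();
    try {
      t1.join();
      t2.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return list.size();
  }

  // Iteration is not covered by the wrapper: hold the list's own lock while
  // copying it, so the snapshot is consistent.
  static List<String> snapshot(List<String> syncList) {
    synchronized (syncList) {
      return new ArrayList<String>(syncList);
    }
  }
}
```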
Now let's revisit the loadUpFromChangeSource method:
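```java
/**
 * Pulls Changes from the ChangeSource and adds them to checkpointAndChangeList.
 */
private void loadUpFromChangeSource() {
  int max = maximumQueueSize.get();
  if (checkpointAndChangeList.size() < max) {
    lastCheckpoint = lastCheckpoint.nextMajor();
  }
  while (checkpointAndChangeList.size() < max) {
    Change newChange = changeSource.getNextChange();
    if (newChange == null) {
      break;
    }
    lastCheckpoint = lastCheckpoint.next();
    checkpointAndChangeList.add(new CheckpointAndChange(lastCheckpoint, newChange));
  }
}
```

The method's main job is to take Change objects from changeSource, wrap each one in a CheckpointAndChange object, and add it to the container List<CheckpointAndChange> checkpointAndChangeList. It stops when the list reaches maximumQueueSize or when changeSource returns null. If the list has room, lastCheckpoint is first advanced once with nextMajor(). Each added change then receives the following checkpoint via next().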
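DiffingConnectorCheckpoint's internals are not shown in this post. The sketch below therefore assumes a hypothetical (major, minor) checkpoint, where nextMajor() increments the major number and resets minor, and next() increments minor. It reproduces only the control flow of loadUpFromChangeSource: one new major checkpoint per load pass (only if there is room), then one minor step per change. An ArrayDeque stands in for the ChangeSource because its poll() also returns null when empty.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

class LoadUpDemo {
  // Hypothetical stand-in for DiffingConnectorCheckpoint (not its real internals).
  static final class Checkpoint {
    final long major;
    final long minor;
    Checkpoint(long major, long minor) { this.major = major; this.minor = minor; }
    Checkpoint nextMajor() { return new Checkpoint(major + 1, 0); }
    Checkpoint next() { return new Checkpoint(major, minor + 1); }
    @Override public String toString() { return major + "." + minor; }
  }

  private Checkpoint lastCheckpoint = new Checkpoint(0, 0);
  // Entries are "checkpoint:change" strings standing in for CheckpointAndChange.
  final List<String> list = new ArrayList<String>();

  // Same control flow as loadUpFromChangeSource().
  void loadUp(Queue<String> source, int max) {
    if (list.size() < max) {
      lastCheckpoint = lastCheckpoint.nextMajor(); // one new major per pass
    }
    while (list.size() < max) {
      String change = source.poll(); // null when the source is empty
      if (change == null) {
        break;
      }
      lastCheckpoint = lastCheckpoint.next();
      list.add(lastCheckpoint + ":" + change);
    }
  }

  static List<String> demo() {
    LoadUpDemo queue = new LoadUpDemo();
    Queue<String> source = new ArrayDeque<String>(Arrays.asList("a", "b", "c"));
    queue.loadUp(source, 2); // list: [1.1:a, 1.2:b]
    queue.list.remove(0);    // pretend the first entry was consumed
    queue.loadUp(source, 2); // list: [1.2:b, 2.1:c]
    return queue.list;
  }
}
```

After the first entry is consumed, the second pass starts a new major checkpoint, so "c" is labeled 2.1 rather than 1.3. Under this assumed scheme, each load pass is distinguishable from the previous one.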
loadUpFromChangeSource is called from the resume method (resume itself is called from the constructor of the DiffingConnectorDocumentList class):
```java
/**
 * Gets the List<CheckpointAndChange> queue.
 * Returns an {@link Iterator} for currently available
 * {@link CheckpointAndChange} objects that occur after the passed in
 * checkpoint. The {@link String} form of a {@link DiffingConnectorCheckpoint}
 * passed in is produced by calling
 * {@link DiffingConnectorCheckpoint#toString()}. As a side effect, Objects
 * up to and including the object with the passed in checkpoint are removed
 * from this queue.
 *
 * @param checkpointString null means return all {@link CheckpointAndChange}
 *        objects and a non null value means to return
 *        {@link CheckpointAndChange} objects with checkpoints after the
 *        passed in value.
 * @throws IOException if error occurs while manipulating recovery state
 */
synchronized List<CheckpointAndChange> resume(String checkpointString)
    throws IOException {
  // Remove changes that have already been completed
  removeCompletedChanges(checkpointString);
  // Pull Changes from the ChangeSource and add them to checkpointAndChangeList
  loadUpFromChangeSource();
  // Update monitorPoints
  monitorPoints.updateOnGuaranteed(checkpointAndChangeList);
  try {
    // Persist checkpointAndChangeList to a queue file;
    // each call to resume produces one file
    writeRecoveryState();
  } finally {
    // TODO: Enhance with mechanism that remembers
    // information about recovery files to avoid re-reading.
    // Remove redundant queue files (those already consumed)
    removeExcessRecoveryState();
  }
  return getList();
}
```

After filling the List<CheckpointAndChange> checkpointAndChangeList container, resume persists its contents to a queue file in JSON format:
```java
/**
 * Persists the queue as JSON.
 * @throws IOException
 */
private void writeRecoveryState() throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  File recoveryFile = new RecoveryFile(persistDir);
  FileOutputStream outStream = new FileOutputStream(recoveryFile);
  Writer writer = new OutputStreamWriter(outStream, Charsets.UTF_8);
  try {
    try {
      writeJson(writer);
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException("Failed writing recovery file.", e);
    }
    writer.flush();
    outStream.getFD().sync();
  } finally {
    writer.close();
  }
}
```

The queue file's name contains the current system time, which is used to determine which of two files was created earlier:
```java
/**
 * A queue file whose name can be used to compare creation times.
 * A File that has some of the recovery logic.
 * Original recovery files' names contained a single nanosecond timestamp,
 * eg. recovery.10220010065599398 . These turned out to be flawed
 * because nanosecond times can go "back in time" between JVM restarts.
 * Updated recovery files' names contain a wall clock millis timestamp
 * followed by an underscore followed by a nanotimestamp, eg.
 * recovery.702522216012_10220010065599398 .
 */
static class RecoveryFile extends File {
  final static long NO_TIME_AVAIL = -1;
  long milliTimestamp = NO_TIME_AVAIL;
  long nanoTimestamp;

  long parseTime(String s) throws IOException {
    try {
      return Long.parseLong(s);
    } catch (NumberFormatException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  /**
   * Parses the timestamps contained in the file name.
   * @throws IOException
   */
  void parseOutTimes() throws IOException {
    try {
      String basename = getName();
      if (!basename.startsWith(RECOVERY_FILE_PREFIX)) {
        throw new LoggingIoException("Invalid recovery filename: "
            + getAbsolutePath());
      } else {
        String extension = basename.substring(RECOVERY_FILE_PREFIX.length());
        if (!extension.contains("_")) {  // Original name format.
          nanoTimestamp = parseTime(extension);
        } else {  // Updated name format.
          String timeParts[] = extension.split("_");
          if (2 != timeParts.length) {
            throw new LoggingIoException("Invalid recovery filename: "
                + getAbsolutePath());
          }
          milliTimestamp = parseTime(timeParts[0]);
          nanoTimestamp = parseTime(timeParts[1]);
        }
      }
    } catch (IndexOutOfBoundsException e) {
      throw new LoggingIoException("Invalid recovery filename: "
          + getAbsolutePath());
    }
  }

  RecoveryFile(File persistanceDir) throws IOException {
    super(persistanceDir, RECOVERY_FILE_PREFIX + System.currentTimeMillis()
        + "_" + System.nanoTime());
    parseOutTimes();
  }

  /**
   * Constructor used when the absolute path of an existing file is known.
   * @param absolutePath
   * @throws IOException
   */
  RecoveryFile(String absolutePath) throws IOException {
    super(absolutePath);
    parseOutTimes();
  }

  boolean isOlder(RecoveryFile other) {
    boolean weHaveMillis = milliTimestamp != NO_TIME_AVAIL;
    boolean otherHasMillis = other.milliTimestamp != NO_TIME_AVAIL;
    boolean bothHaveMillis = weHaveMillis && otherHasMillis;
    boolean neitherHasMillis = (!weHaveMillis) && (!otherHasMillis);
    if (bothHaveMillis) {
      if (this.milliTimestamp < other.milliTimestamp) {
        return true;
      } else if (this.milliTimestamp > other.milliTimestamp) {
        return false;
      } else {
        return this.nanoTimestamp < other.nanoTimestamp;
      }
    } else if (neitherHasMillis) {
      return this.nanoTimestamp < other.nanoTimestamp;
    } else if (weHaveMillis) {  // and other doesn't; we are newer.
      return false;
    } else {  // other has millis; other is newer.
      return true;
    }
  }

  /** A delete method that logs failures (deletes this file). */
  public void logOnFailDelete() {
    boolean deleted = super.delete();
    if (!deleted) {
      LOG.severe("Failed to delete: " + getAbsolutePath());
    }
  }

  // TODO(pjo): Move more recovery logic into this class.
}
```

Next, let's look at what its startup method, start, does:
```java
/**
 * Initialize to start processing from after the passed in checkpoint
 * or from the beginning if the passed in checkpoint is null. Part of
 * making DocumentSnapshotRepositoryMonitorManager go from "cold" to "warm".
 */
public synchronized void start(String checkpointString) throws IOException {
  LOG.info("Starting CheckpointAndChangeQueue from " + checkpointString);
  // Create the queue directory
  ensurePersistDirExists();
  checkpointAndChangeList.clear();
  lastCheckpoint = constructLastCheckpoint(checkpointString);
  if (null == checkpointString) {
    // Delete the queue files
    removeAllRecoveryState();
  } else {
    RecoveryFile current = removeExcessRecoveryState();
    // Load monitorPoints and the checkpointAndChangeList queue
    loadUpFromRecoveryState(current);
    // this.monitorPoints.points.entrySet();
  }
}
```

In short, when a checkpoint is passed in, start loads the list of CheckpointAndChange objects from the previously saved queue file into the List<CheckpointAndChange> checkpointAndChangeList container, along with the MonitorCheckpoint objects. When checkpointString is null, it instead deletes all queue files and starts from the beginning.
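start() loads the RecoveryFile returned by removeExcessRecoveryState(), whose body is not shown in this post. Presumably that method keeps the most recent file, which requires the ordering defined by RecoveryFile.isOlder(). The sketch below reimplements that ordering on plain file names and picks the newest one. The "recovery." prefix is inferred from the javadoc examples above, and the class and method names are hypothetical.

```java
import java.util.List;

class RecoveryNames {
  static final String PREFIX = "recovery."; // assumed value of RECOVERY_FILE_PREFIX
  static final long NO_TIME_AVAIL = -1;

  // Returns {millis, nanos}; millis is NO_TIME_AVAIL for old-style names.
  static long[] parse(String name) {
    if (!name.startsWith(PREFIX)) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    String ext = name.substring(PREFIX.length());
    if (!ext.contains("_")) {
      return new long[] {NO_TIME_AVAIL, Long.parseLong(ext)};
    }
    String[] parts = ext.split("_");
    if (parts.length != 2) {
      throw new IllegalArgumentException("Invalid recovery filename: " + name);
    }
    return new long[] {Long.parseLong(parts[0]), Long.parseLong(parts[1])};
  }

  // Same ordering rules as RecoveryFile.isOlder().
  static boolean isOlder(String a, String b) {
    long[] x = parse(a);
    long[] y = parse(b);
    boolean xHasMillis = x[0] != NO_TIME_AVAIL;
    boolean yHasMillis = y[0] != NO_TIME_AVAIL;
    if (xHasMillis && yHasMillis) {
      return x[0] != y[0] ? x[0] < y[0] : x[1] < y[1];
    }
    if (!xHasMillis && !yHasMillis) {
      return x[1] < y[1];
    }
    return !xHasMillis; // a name with a millis part always counts as newer
  }

  // Picks the newest name, i.e. the file a restart would most likely load.
  static String newest(List<String> names) {
    String best = null;
    for (String name : names) {
      if (best == null || isOlder(best, name)) {
        best = name;
      }
    }
    return best;
  }
}
```

The millisecond part is compared first, and nanoseconds only break ties. Because System.nanoTime() has an arbitrary origin that can differ between JVM runs, comparing nanoseconds alone is only reliable within a single JVM run.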
```java
/**
 * Loads the queue.
 * @param file
 * @throws IOException
 */
private void loadUpFromRecoveryState(RecoveryFile file) throws IOException {
  // TODO(pjo): Move this method into RecoveryFile.
  new LoadingQueueReader().readJson(file);
}
```

For this purpose, the CheckpointAndChangeQueue class defines inner classes that load the list of CheckpointAndChange objects from the JSON file into the List<CheckpointAndChange> checkpointAndChangeList container.
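These inner classes follow the Template Method pattern. A base class drives the parse and checks a trailing sentinel, and subclasses decide what to do with each record. The minimal stdlib-only sketch below shows that shape. It uses a hypothetical line-based format (monitor: and change: lines, terminated by END) instead of JSON and gson, and every name in it is illustrative. A file without the sentinel is presumably one whose writer stopped part-way, so it is rejected.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Template Method sketch over a hypothetical line format:
//   monitor:<state>   change:<data>   ...   END (sentinel)
abstract class LineQueueReader {
  static final String SENTINEL = "END"; // hypothetical sentinel value

  // The template method: fixed parsing and validation, variable handling.
  final void read(BufferedReader in) throws IOException {
    boolean sawSentinel = false;
    String line;
    while ((line = in.readLine()) != null) {
      if (line.equals(SENTINEL)) {
        sawSentinel = true;
        break;
      } else if (line.startsWith("monitor:")) {
        readMonitorPoint(line.substring("monitor:".length()));
      } else if (line.startsWith("change:")) {
        readChange(line.substring("change:".length()));
      } else {
        throw new IOException("Read invalid recovery file.");
      }
    }
    if (!sawSentinel) {
      throw new IOException("Missing sentinel; file may be truncated.");
    }
  }

  protected abstract void readMonitorPoint(String state) throws IOException;
  protected abstract void readChange(String change) throws IOException;
}

// Like ValidatingQueueReader: parses everything, keeps nothing.
class ValidatingLineReader extends LineQueueReader {
  protected void readMonitorPoint(String state) {}
  protected void readChange(String change) {}

  static boolean isValid(String text) {
    try {
      new ValidatingLineReader().read(new BufferedReader(new StringReader(text)));
      return true;
    } catch (IOException e) {
      return false;
    }
  }
}

// Like LoadingQueueReader: collects the parsed records.
class LoadingLineReader extends LineQueueReader {
  final List<String> monitorPoints = new ArrayList<String>();
  final List<String> changes = new ArrayList<String>();

  protected void readMonitorPoint(String state) { monitorPoints.add(state); }
  protected void readChange(String change) { changes.add(change); }

  static List<String> loadChanges(String text) {
    LoadingLineReader reader = new LoadingLineReader();
    try {
      reader.read(new BufferedReader(new StringReader(text)));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return reader.changes;
  }
}
```

Because validation lives in the base class, a validating pass and a loading pass always agree on what counts as a well-formed file. Only the handling of each record differs.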
First, the abstract queue reader class AbstractQueueReader:
```java
/**
 * Abstract class for loading the queue from a JSON file.
 * Reads JSON recovery files. Uses the Template Method pattern to
 * delegate what to do with the parsed objects to subclasses.
 *
 * Note: This class uses gson for streaming support.
 */
private abstract class AbstractQueueReader {
  public void readJson(File file) throws IOException {
    readJson(new BufferedReader(new InputStreamReader(
        new FileInputStream(file), Charsets.UTF_8)));
  }

  /**
   * Reads and parses the stream, calling the abstract methods to
   * take whatever action is required. The given stream will be
   * closed automatically.
   *
   * @param reader the stream to parse
   */
  @VisibleForTesting
  void readJson(Reader reader) throws IOException {
    JsonReader jsonReader = new JsonReader(reader);
    try {
      readJson(jsonReader);
    } finally {
      jsonReader.close();
    }
  }

  /**
   * Reads and parses the stream, calling the abstract methods to
   * take whatever action is required.
   */
  private void readJson(JsonReader reader) throws IOException {
    JsonParser parser = new JsonParser();
    reader.beginObject();
    while (reader.hasNext()) {
      String name = reader.nextName();
      if (name.equals(MONITOR_STATE_JSON_TAG)) {
        readMonitorPoints(parser.parse(reader));
      } else if (name.equals(QUEUE_JSON_TAG)) {
        reader.beginArray();
        while (reader.hasNext()) {
          readCheckpointAndChange(parser.parse(reader));
        }
        reader.endArray();
      } else {
        throw new IOException("Read invalid recovery file.");
      }
    }
    reader.endObject();

    reader.setLenient(true);
    String name = reader.nextString();
    if (!name.equals(SENTINAL)) {
      throw new IOException("Read invalid recovery file.");
    }
  }

  protected abstract void readMonitorPoints(JsonElement gson)
      throws IOException;

  protected abstract void readCheckpointAndChange(JsonElement gson)
      throws IOException;
}
```

The abstract methods are implemented by subclasses:
```java
/**
 * Checks the validity of a queue file.
 * Verifies that a JSON recovery file is valid JSON with a
 * trailing sentinel.
 */
private class ValidatingQueueReader extends AbstractQueueReader {
  protected void readMonitorPoints(JsonElement gson) throws IOException {
  }

  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
  }
}

/**
 * Implementation that loads the queue from a JSON file.
 * Loads the queue from a JSON recovery file.
 */
/*
 * TODO(jlacey): Change everything downstream to gson. For now, we
 * reserialize the individual gson objects and deserialize them
 * using org.json.
 */
@VisibleForTesting
class LoadingQueueReader extends AbstractQueueReader {
  /**
   * Loads the MonitorRestartState checkpoints
   * (HashMap<String, MonitorCheckpoint> points).
   */
  protected void readMonitorPoints(JsonElement gson) throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      monitorPoints = new MonitorRestartState(json);
      // monitorPoints.updateOnGuaranteed(checkpointAndChangeList)
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  /**
   * Loads checkpointAndChangeList.
   */
  protected void readCheckpointAndChange(JsonElement gson)
      throws IOException {
    try {
      JSONObject json = gsonToJson(gson);
      checkpointAndChangeList.add(new CheckpointAndChange(json,
          internalDocumentHandleFactory, clientDocumentHandleFactory));
    } catch (JSONException e) {
      throw IOExceptionHelper.newIOException(
          "Failed reading persisted JSON queue.", e);
    }
  }

  // TODO(jlacey): This could be much more efficient, especially
  // with LOBs, if we directly transformed the objects with a little
  // recursive parser. This code is only used when recovering failed
  // batches, so I don't know if that's worth the effort.
  private JSONObject gsonToJson(JsonElement gson) throws JSONException {
    return new JSONObject(gson.toString());
  }
}
```

---------------------------------------------------------------------------
This series, "Enterprise Search Engine Development: The Connector", is my original work.
When reposting, please credit the source: cnblogs (博客园), 刺猬的溫馴
My email: chenying998179@163#com (replace # with .)
Link to this article: http://www.cnblogs.com/chenying99/p/3789560.html