Solr source code analysis: tracing data import through DataImporter
When the information to be searched lives in a database, Solr cannot query that database directly. The data first has to be indexed on the search server through Solr's import components, and clients then search against that index.
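Before walking through the server-side request path, here is a minimal client-side sketch of kicking off an import with SolrJ. It assumes a DataImportHandler registered under the path /dataimport in solrconfig.xml and a core named collection1 (both hypothetical); the command, clean and commit parameters are the ones parsed in handleRequestBody() later in this walkthrough.

    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.client.solrj.request.QueryRequest;
    import org.apache.solr.common.params.ModifiableSolrParams;
    import org.apache.solr.common.util.NamedList;

    public class TriggerFullImport {
      public static void main(String[] args) throws Exception {
        // Hypothetical core URL; older SolrJ versions construct the client with
        // `new HttpSolrClient(url)` instead of the builder.
        SolrClient client = new HttpSolrClient.Builder("http://localhost:8983/solr/collection1").build();
        try {
          ModifiableSolrParams params = new ModifiableSolrParams();
          params.set("command", "full-import"); // or "delta-import", "status", "abort"
          params.set("clean", "true");          // clear the index before importing
          params.set("commit", "true");         // commit when the import finishes
          QueryRequest request = new QueryRequest(params);
          request.setPath("/dataimport");       // path of the handler registered in solrconfig.xml
          NamedList<Object> response = client.request(request);
          System.out.println(response);         // status, importResponse, statusMessages
        } finally {
          client.close();
        }
      }
    }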
1. SolrDispatchFilter
The job of SolrDispatchFilter is to map the request URL to a handler defined in solrconfig.xml; for example, a request to /dataimport is routed to the handler registered under that name.
The actions it can dispatch are:
    enum Action {PASSTHROUGH, FORWARD, RETURN, RETRY, ADMIN, REMOTEQUERY, PROCESS}

PASSTHROUGH: pass the request through the webapp to Restlet.
FORWARD: forward the rewritten URL (without the path prefix and the core/collection name) to Restlet.
RETURN: return control to the caller; no further processing is needed. This usually happens when an error has been set and returned.
RETRY: retry the request. This is set when no working core has been found.
Note: the cores referred to here are looked up through the CoreContainer.
SolrDispatchFilter indirectly implements javax.servlet.Filter; the work is done in doFilter():
    public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain, boolean retry) throws IOException, ServletException {
      if (!(request instanceof HttpServletRequest)) return;

      AtomicReference<ServletRequest> wrappedRequest = new AtomicReference<>();
      if (!authenticateRequest(request, response, wrappedRequest)) { // the response and status code have already been sent
        return;
      }
      if (wrappedRequest.get() != null) {
        request = wrappedRequest.get();
      }
      if (cores.getAuthenticationPlugin() != null) {
        log.debug("User principal: {}", ((HttpServletRequest) request).getUserPrincipal());
      }

      // No need to even create the HttpSolrCall object if this path is excluded.
      if (excludePatterns != null) {
        String servletPath = ((HttpServletRequest) request).getServletPath();
        for (Pattern p : excludePatterns) {
          Matcher matcher = p.matcher(servletPath);
          if (matcher.lookingAt()) {
            chain.doFilter(request, response);
            return;
          }
        }
      }

      HttpSolrCall call = getHttpSolrCall((HttpServletRequest) request, (HttpServletResponse) response, retry);
      try {
        Action result = call.call();
        switch (result) {
          case PASSTHROUGH:
            chain.doFilter(request, response);
            break;
          case RETRY:
            doFilter(request, response, chain, true);
            break;
          case FORWARD:
            request.getRequestDispatcher(call.getPath()).forward(request, response);
            break;
        }
      } finally {
        call.destroy();
      }
    }

SolrDispatchFilter delegates the actual processing to HttpSolrCall's call() method.
2. Handling the request in HttpSolrCall
The HttpSolrCall constructor:
    public HttpSolrCall(SolrDispatchFilter solrDispatchFilter, CoreContainer cores,
                        HttpServletRequest request, HttpServletResponse response, boolean retry) {
      this.solrDispatchFilter = solrDispatchFilter;
      this.cores = cores;
      this.req = request;
      this.response = response;
      this.retry = retry;
      this.requestType = RequestType.UNKNOWN;
      queryParams = SolrRequestParsers.parseQueryString(req.getQueryString());
    }

The complete request handling happens in the call() method:
    /**
     * This method processes the request.
     */
    public Action call() throws IOException {
      MDCLoggingContext.reset();
      MDCLoggingContext.setNode(cores);

      if (cores == null) {
        sendError(503, "Server is shutting down or failed to initialize");
        return RETURN;
      }

      if (solrDispatchFilter.abortErrorMessage != null) {
        sendError(500, solrDispatchFilter.abortErrorMessage);
        return RETURN;
      }

      try {
        init();

        /* Authorize the request if
           1. Authorization is enabled, and
           2. The requested resource is not a known static file
         */
        if (cores.getAuthorizationPlugin() != null) {
          AuthorizationContext context = getAuthCtx();
          log.info(context.toString());
          AuthorizationResponse authResponse = cores.getAuthorizationPlugin().authorize(context);
          if (!(authResponse.statusCode == HttpStatus.SC_ACCEPTED) && !(authResponse.statusCode == HttpStatus.SC_OK)) {
            sendError(authResponse.statusCode,
                "Unauthorized request, Response code: " + authResponse.statusCode);
            return RETURN;
          }
        }

        HttpServletResponse resp = response;
        switch (action) {
          case ADMIN:
            handleAdminRequest();
            return RETURN;
          case REMOTEQUERY:
            remoteQuery(coreUrl + path, resp);
            return RETURN;
          case PROCESS:
            final Method reqMethod = Method.getMethod(req.getMethod());
            HttpCacheHeaderUtil.setCacheControlHeader(config, resp, reqMethod);
            // unless we have been explicitly told not to, do cache validation
            // if we fail cache validation, execute the query
            if (config.getHttpCachingConfig().isNever304() ||
                !HttpCacheHeaderUtil.doCacheHeaderValidation(solrReq, req, reqMethod, resp)) {
              SolrQueryResponse solrRsp = new SolrQueryResponse();
              /* even for HEAD requests, we need to execute the handler to
               * ensure we don't get an error (and to make sure the correct
               * QueryResponseWriter is selected and we get the correct
               * Content-Type)
               */
              SolrRequestInfo.setRequestInfo(new SolrRequestInfo(solrReq, solrRsp));
              execute(solrRsp);
              HttpCacheHeaderUtil.checkHttpCachingVeto(solrRsp, resp, reqMethod);
              Iterator<Map.Entry<String, String>> headers = solrRsp.httpHeaders();
              while (headers.hasNext()) {
                Map.Entry<String, String> entry = headers.next();
                resp.addHeader(entry.getKey(), entry.getValue());
              }
              QueryResponseWriter responseWriter = core.getQueryResponseWriter(solrReq);
              if (invalidStates != null) solrReq.getContext().put(CloudSolrClient.STATE_VERSION, invalidStates);
              writeResponse(solrRsp, responseWriter, reqMethod);
            }
            return RETURN;
          default: return action;
        }
      } catch (Throwable ex) {
        sendError(ex);
        // walk the entire cause chain to search for an Error
        Throwable t = ex;
        while (t != null) {
          if (t instanceof Error) {
            if (t != ex) {
              SolrDispatchFilter.log.error("An Error was wrapped in another exception - please report complete stacktrace on SOLR-6161", ex);
            }
            throw (Error) t;
          }
          t = t.getCause();
        }
        return RETURN;
      } finally {
        MDCLoggingContext.clear();
      }
    }

For the PROCESS action, execute(solrRsp) hands the request to the SolrRequestHandler registered for the requested path.

3. Looking up the handler
RequestHandlerBase looks up the handler registered under a given name:
    /**
     * Get the request handler registered to a given name.
     *
     * This function is thread safe.
     */
    public static SolrRequestHandler getRequestHandler(String handlerName, PluginBag<SolrRequestHandler> reqHandlers) {
      if (handlerName == null) return null;
      SolrRequestHandler handler = reqHandlers.get(handlerName);
      int idx = 0;
      if (handler == null) {
        for (; ; ) {
          idx = handlerName.indexOf('/', idx + 1);
          if (idx > 0) {
            String firstPart = handlerName.substring(0, idx);
            handler = reqHandlers.get(firstPart);
            if (handler == null) continue;
            if (handler instanceof NestedRequestHandler) {
              return ((NestedRequestHandler) handler).getSubHandler(handlerName.substring(idx));
            }
          } else {
            break;
          }
        }
      }
      return handler;
    }
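To make the resolution order concrete, here is a simplified, self-contained re-implementation of the prefix walk over a plain Map with hypothetical handler names; the real code uses a PluginBag and additionally asks a NestedRequestHandler for the sub-handler of the remaining path.

    import java.util.HashMap;
    import java.util.Map;

    public class HandlerLookupSketch {
      // Exact name first, then successively longer '/'-delimited prefixes.
      static String lookup(String name, Map<String, String> handlers) {
        if (name == null) return null;
        String handler = handlers.get(name);
        int idx = 0;
        while (handler == null) {
          idx = name.indexOf('/', idx + 1);
          if (idx <= 0) break;
          handler = handlers.get(name.substring(0, idx)); // try the next prefix
        }
        return handler;
      }

      public static void main(String[] args) {
        Map<String, String> handlers = new HashMap<>();
        handlers.put("/dataimport", "DataImportHandler");
        handlers.put("/select", "SearchHandler");
        System.out.println(lookup("/dataimport", handlers));          // exact match
        System.out.println(lookup("/dataimport/anything", handlers)); // resolved via the "/dataimport" prefix
      }
    }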
4. Handling the request: handleRequest()

RequestHandlerBase's handleRequest() method drives the common request processing:
    public void handleRequest(SolrQueryRequest req, SolrQueryResponse rsp) {
      numRequests.incrementAndGet();
      TimerContext timer = requestTimes.time();
      try {
        if (pluginInfo != null && pluginInfo.attributes.containsKey(USEPARAM))
          req.getContext().put(USEPARAM, pluginInfo.attributes.get(USEPARAM));
        SolrPluginUtils.setDefaults(this, req, defaults, appends, invariants);
        req.getContext().remove(USEPARAM);
        rsp.setHttpCaching(httpCaching);
        handleRequestBody(req, rsp);
        // count timeouts
        NamedList header = rsp.getResponseHeader();
        if (header != null) {
          Object partialResults = header.get("partialResults");
          boolean timedOut = partialResults == null ? false : (Boolean) partialResults;
          if (timedOut) {
            numTimeouts.incrementAndGet();
            rsp.setHttpCaching(false);
          }
        }
      } catch (Exception e) {
        if (e instanceof SolrException) {
          SolrException se = (SolrException) e;
          if (se.code() == SolrException.ErrorCode.CONFLICT.code) {
            // TODO: should we allow this to be counted as an error (numErrors++)?
          } else {
            SolrException.log(SolrCore.log, e);
          }
        } else {
          SolrException.log(SolrCore.log, e);
          if (e instanceof SyntaxError) {
            e = new SolrException(SolrException.ErrorCode.BAD_REQUEST, e);
          }
        }
        rsp.setException(e);
        numErrors.incrementAndGet();
      } finally {
        timer.stop();
      }
    }
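The SolrPluginUtils.setDefaults() call above layers the handler's configured defaults, appends and invariants around the request parameters (invariants always win). A minimal sketch of the defaults part using SolrParams.wrapDefaults, with hypothetical parameter values:

    import org.apache.solr.common.params.ModifiableSolrParams;
    import org.apache.solr.common.params.SolrParams;

    public class ParamLayeringSketch {
      public static void main(String[] args) {
        ModifiableSolrParams request = new ModifiableSolrParams();
        request.set("command", "full-import");

        ModifiableSolrParams defaults = new ModifiableSolrParams();
        defaults.set("clean", "true");
        defaults.set("command", "status");

        // Request parameters win; defaults only fill in what is missing.
        SolrParams merged = SolrParams.wrapDefaults(request, defaults);
        System.out.println(merged.get("command")); // full-import (from the request)
        System.out.println(merged.get("clean"));   // true (filled in from the defaults)
      }
    }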
5. The concrete work is done in each handler's handleRequestBody() method; DataImportHandler is the example here:

    @Override
    @SuppressWarnings("unchecked")
    public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
      rsp.setHttpCaching(false);

      //TODO: figure out why just the first one is OK...
      ContentStream contentStream = null;
      Iterable<ContentStream> streams = req.getContentStreams();
      if (streams != null) {
        for (ContentStream stream : streams) {
          contentStream = stream;
          break;
        }
      }
      SolrParams params = req.getParams();
      NamedList defaultParams = (NamedList) initArgs.get("defaults");
      RequestInfo requestParams = new RequestInfo(req, getParamsMap(params), contentStream);
      String command = requestParams.getCommand();

      if (DataImporter.SHOW_CONF_CMD.equals(command)) {
        String dataConfigFile = params.get("config");
        String dataConfig = params.get("dataConfig");
        if (dataConfigFile != null) {
          dataConfig = SolrWriter.getResourceAsString(req.getCore().getResourceLoader().openResource(dataConfigFile));
        }
        if (dataConfig == null) {
          rsp.add("status", DataImporter.MSG.NO_CONFIG_FOUND);
        } else {
          // Modify incoming request params to add wt=raw
          ModifiableSolrParams rawParams = new ModifiableSolrParams(req.getParams());
          rawParams.set(CommonParams.WT, "raw");
          req.setParams(rawParams);
          ContentStreamBase content = new ContentStreamBase.StringStream(dataConfig);
          rsp.add(RawResponseWriter.CONTENT, content);
        }
        return;
      }

      rsp.add("initArgs", initArgs);
      String message = "";

      if (command != null) {
        rsp.add("command", command);
      }
      // If importer is still null
      if (importer == null) {
        rsp.add("status", DataImporter.MSG.NO_INIT);
        return;
      }

      if (command != null && DataImporter.ABORT_CMD.equals(command)) {
        importer.runCmd(requestParams, null);
      } else if (importer.isBusy()) {
        message = DataImporter.MSG.CMD_RUNNING;
      } else if (command != null) {
        if (DataImporter.FULL_IMPORT_CMD.equals(command)
            || DataImporter.DELTA_IMPORT_CMD.equals(command) ||
            IMPORT_CMD.equals(command)) {
          importer.maybeReloadConfiguration(requestParams, defaultParams);
          UpdateRequestProcessorChain processorChain =
              req.getCore().getUpdateProcessorChain(params);
          UpdateRequestProcessor processor = processorChain.createProcessor(req, rsp);
          SolrResourceLoader loader = req.getCore().getResourceLoader();
          DIHWriter sw = getSolrWriter(processor, loader, requestParams, req);

          if (requestParams.isDebug()) {
            if (debugEnabled) {
              // Synchronous request for the debug mode
              importer.runCmd(requestParams, sw);
              rsp.add("mode", "debug");
              rsp.add("documents", requestParams.getDebugInfo().debugDocuments);
              if (requestParams.getDebugInfo().debugVerboseOutput != null) {
                rsp.add("verbose-output", requestParams.getDebugInfo().debugVerboseOutput);
              }
            } else {
              message = DataImporter.MSG.DEBUG_NOT_ENABLED;
            }
          } else {
            // Asynchronous request for normal mode
            if (requestParams.getContentStream() == null && !requestParams.isSyncMode()) {
              importer.runAsync(requestParams, sw);
            } else {
              importer.runCmd(requestParams, sw);
            }
          }
        } else if (DataImporter.RELOAD_CONF_CMD.equals(command)) {
          if (importer.maybeReloadConfiguration(requestParams, defaultParams)) {
            message = DataImporter.MSG.CONFIG_RELOADED;
          } else {
            message = DataImporter.MSG.CONFIG_NOT_RELOADED;
          }
        }
      }
      rsp.add("status", importer.isBusy() ? "busy" : "idle");
      rsp.add("importResponse", message);
      rsp.add("statusMessages", importer.getStatusMessages());
    }

6. Running the import
An import is either full or delta; DataImporter.runCmd() dispatches between the two:
    void runCmd(RequestInfo reqParams, DIHWriter sw) {
      String command = reqParams.getCommand();
      if (command.equals(ABORT_CMD)) {
        if (docBuilder != null) {
          docBuilder.abort();
        }
        return;
      }
      if (!importLock.tryLock()) {
        LOG.warn("Import command failed . another import is running");
        return;
      }
      try {
        if (FULL_IMPORT_CMD.equals(command) || IMPORT_CMD.equals(command)) {
          doFullImport(sw, reqParams);
        } else if (command.equals(DELTA_IMPORT_CMD)) {
          doDeltaImport(sw, reqParams);
        }
      } finally {
        importLock.unlock();
      }
    }

Take the full import as the example:
    public void doFullImport(DIHWriter writer, RequestInfo requestParams) {
      LOG.info("Starting Full Import");
      setStatus(Status.RUNNING_FULL_DUMP);
      try {
        DIHProperties dihPropWriter = createPropertyWriter();
        setIndexStartTime(dihPropWriter.getCurrentTimestamp());
        docBuilder = new DocBuilder(this, writer, dihPropWriter, requestParams);
        checkWritablePersistFile(writer, dihPropWriter);
        docBuilder.execute();
        if (!requestParams.isDebug())
          cumulativeStatistics.add(docBuilder.importStatistics);
      } catch (Exception e) {
        SolrException.log(LOG, "Full Import failed", e);
        docBuilder.handleError("Full Import failed", e);
      } finally {
        setStatus(Status.IDLE);
        DocBuilder.INSTANCE.set(null);
      }
    }

7. EntityProcessorWrapper and SqlEntityProcessor, the implementation that handles SQL
EntityProcessorWrapper.init() sets up the context and delegates to the wrapped entity processor:

    @Override
    public void init(Context context) {
      rowcache = null;
      this.context = context;
      resolver = (VariableResolver) context.getVariableResolver();
      if (entityName == null) {
        onError = resolver.replaceTokens(context.getEntityAttribute(ON_ERROR));
        if (onError == null) onError = ABORT;
        entityName = context.getEntityAttribute(ConfigNameConstants.NAME);
      }
      delegate.init(context);
    }

During initialization the wrapped delegate, here SqlEntityProcessor, is initialized in turn:
    public void init(Context context) {
      super.init(context);
      dataSource = context.getDataSource();
    }

ContextImpl supplies the data source:
    @Override
    public DataSource getDataSource() {
      if (ds != null) return ds;
      if (epw == null) { return null; }
      if (epw != null && epw.getDatasource() == null) {
        epw.setDatasource(dataImporter.getDataSourceInstance(epw.getEntity(), epw.getEntity().getDataSourceName(), this));
      }
      if (epw != null && epw.getDatasource() != null && docBuilder != null && docBuilder.verboseDebug &&
          Context.FULL_DUMP.equals(currentProcess())) {
        //debug is not yet implemented properly for deltas
        epw.setDatasource(docBuilder.getDebugLogger().wrapDs(epw.getDatasource()));
      }
      return epw.getDatasource();
    }

DataImporter resolves the data source configuration:
    public DataSource getDataSourceInstance(Entity key, String name, Context ctx) {
      Map<String, String> p = requestLevelDataSourceProps.get(name);
      if (p == null)
        p = config.getDataSources().get(name);
      if (p == null)
        p = requestLevelDataSourceProps.get(null); // for default data source
      if (p == null)
        p = config.getDataSources().get(null);
      if (p == null)
        throw new DataImportHandlerException(SEVERE,
            "No dataSource :" + name + " available for entity :" + key.getName());
      String type = p.get(TYPE);
      DataSource dataSrc = null;
      if (type == null) {
        dataSrc = new JdbcDataSource();
      } else {
        try {
          dataSrc = (DataSource) DocBuilder.loadClass(type, getCore()).newInstance();
        } catch (Exception e) {
          wrapAndThrow(SEVERE, e, "Invalid type for data source: " + type);
        }
      }
      try {
        Properties copyProps = new Properties();
        copyProps.putAll(p);
        Map<String, Object> map = ctx.getRequestParameters();
        if (map.containsKey("rows")) {
          int rows = Integer.parseInt((String) map.get("rows"));
          if (map.containsKey("start")) {
            rows += Integer.parseInt((String) map.get("start"));
          }
          copyProps.setProperty("maxRows", String.valueOf(rows));
        }
        dataSrc.init(ctx, copyProps);
      } catch (Exception e) {
        wrapAndThrow(SEVERE, e, "Failed to initialize DataSource: " + key.getDataSourceName());
      }
      return dataSrc;
    }
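As an illustration of what typically ends up in copyProps for the default JdbcDataSource, here is a sketch with hypothetical connection settings; the keys mirror the data source attributes of the DIH configuration, and maxRows is the rows + start value computed above.

    import java.util.Properties;

    public class DataSourcePropsSketch {
      public static void main(String[] args) {
        // Hypothetical values; in a real setup these come from the dataSource
        // definition in the DIH configuration plus request-level overrides.
        Properties copyProps = new Properties();
        copyProps.setProperty("driver", "com.mysql.jdbc.Driver");
        copyProps.setProperty("url", "jdbc:mysql://localhost:3306/mydb");
        copyProps.setProperty("user", "solr");
        copyProps.setProperty("password", "secret");
        copyProps.setProperty("batchSize", "500");                  // used as the JDBC fetch size
        copyProps.setProperty("maxRows", String.valueOf(100 + 20)); // rows + start, as computed above
        copyProps.list(System.out);
      }
    }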
8. Query results
JdbcDataSource's inner ResultSetIterator executes the SQL and exposes the rows as an iterator:

    public ResultSetIterator(String query) {
      try {
        Connection c = getConnection();
        stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
        stmt.setFetchSize(batchSize);
        stmt.setMaxRows(maxRows);
        LOG.debug("Executing SQL: " + query);
        long start = System.nanoTime();
        if (stmt.execute(query)) {
          resultSet = stmt.getResultSet();
        }
        LOG.trace("Time taken for sql :"
            + TimeUnit.MILLISECONDS.convert(System.nanoTime() - start, TimeUnit.NANOSECONDS));
        colNames = readFieldNames(resultSet.getMetaData());
      } catch (Exception e) {
        wrapAndThrow(SEVERE, e, "Unable to execute query: " + query);
      }
      if (resultSet == null) {
        rSetIterator = new ArrayList<Map<String, Object>>().iterator();
        return;
      }
      rSetIterator = new Iterator<Map<String, Object>>() {
        @Override
        public boolean hasNext() {
          return hasnext();
        }

        @Override
        public Map<String, Object> next() {
          return getARow();
        }

        @Override
        public void remove() { /* do nothing */ }
      };
    }
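For comparison, here is a standalone JDBC sketch of the same streaming pattern, with a hypothetical connection URL and query: a forward-only, read-only statement with an explicit fetch size, whose rows are turned into maps one at a time rather than being loaded into memory up front.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.HashMap;
    import java.util.Map;

    public class StreamingQuerySketch {
      public static void main(String[] args) throws SQLException {
        try (Connection c = DriverManager.getConnection("jdbc:mysql://localhost:3306/mydb", "solr", "secret");
             Statement stmt = c.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
          stmt.setFetchSize(500); // analogous to the DIH batchSize
          stmt.setMaxRows(0);     // 0 = unlimited, analogous to maxRows above
          try (ResultSet rs = stmt.executeQuery("SELECT id, title FROM docs")) {
            ResultSetMetaData md = rs.getMetaData();
            while (rs.next()) {
              Map<String, Object> row = new HashMap<>();
              for (int i = 1; i <= md.getColumnCount(); i++) {
                row.put(md.getColumnLabel(i), rs.getObject(i));
              }
              System.out.println(row); // DIH hands each row map to the entity processor
            }
          }
        }
      }
    }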
Solr supports building both full and delta indexes from a database. The code above traces the full-import path end to end; the delta import follows the same pattern and is not repeated here.
轉(zhuǎn)載于:https://www.cnblogs.com/davidwang456/p/4754628.html
總結(jié)
以上是生活随笔為你收集整理的solr源码分析之数据导入DataImporter追溯。的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: spark初识
- 下一篇: Five ways to maximiz