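SolrCloud Source Code Analysis: CloudSolrClient
CloudSolrClient is the class SolrJ provides for clients to interact with SolrCloud. An instance of this class communicates with ZooKeeper to determine the Solr endpoints of the SolrCloud collections, then uses LBHttpSolrClient to send requests.
A simple query using CloudSolrClient: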
    import java.io.IOException;

    import org.apache.solr.client.solrj.SolrQuery;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.client.solrj.impl.CloudSolrClient;
    import org.apache.solr.client.solrj.impl.XMLResponseParser;
    import org.apache.solr.client.solrj.response.QueryResponse;
    import org.apache.solr.common.SolrDocumentList;

    public class SolrJCloudClientSearch {

        public static void main(String[] args) throws SolrServerException, IOException {
            // Address of the ZooKeeper instance that holds the SolrCloud cluster state
            String zkHost = "127.0.0.1:9983";
            CloudSolrClient server = new CloudSolrClient(zkHost);
            server.setParser(new XMLResponseParser());

            // Match-all query against the "Test" collection
            SolrQuery parameters = new SolrQuery();
            parameters.set("q", "*:*");
            parameters.set("qt", "/select");
            parameters.set("collection", "Test");

            QueryResponse response = server.query(parameters);
            SolrDocumentList list = response.getResults();
            System.out.println(list.size());
        }
    }
Query flow
1. Calling the query method:
    /**
     * Performs a query to the Solr server
     *
     * @param params  an object holding all key/value parameters to send along the request
     *
     * @return a {@link org.apache.solr.client.solrj.response.QueryResponse} containing the response
     *         from the server
     *
     * @throws IOException If there is a low-level I/O error.
     * @throws SolrServerException if there is an error on the server
     */
    public QueryResponse query(SolrParams params) throws SolrServerException, IOException {
        return query(null, params);
    }
This delegates to the two-argument query method:
    /**
     * Performs a query to the Solr server
     *
     * @param collection the Solr collection to query
     * @param params  an object holding all key/value parameters to send along the request
     *
     * @return a {@link org.apache.solr.client.solrj.response.QueryResponse} containing the response
     *         from the server
     *
     * @throws IOException If there is a low-level I/O error.
     * @throws SolrServerException if there is an error on the server
     */
    public QueryResponse query(String collection, SolrParams params) throws SolrServerException, IOException {
        return new QueryRequest(params).process(this, collection);
    }
QueryRequest inherits process from SolrRequest:
    /**
     * Send this request to a {@link SolrClient} and return the response
     *
     * @param client the SolrClient to communicate with
     * @param collection the collection to execute the request against
     *
     * @return the response
     *
     * @throws SolrServerException if there is an error on the Solr server
     * @throws IOException if there is a communication error
     */
    public final T process(SolrClient client, String collection) throws SolrServerException, IOException {
        long startTime = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        T res = createResponse(client);
        res.setResponse(client.request(this, collection));
        long endTime = TimeUnit.MILLISECONDS.convert(System.nanoTime(), TimeUnit.NANOSECONDS);
        res.setElapsedTime(endTime - startTime);
        return res;
    }
process calls the request method of the client, which here is CloudSolrClient:
    @Override
    public NamedList<Object> request(SolrRequest request, String collection) throws SolrServerException, IOException {
        SolrParams reqParams = request.getParams();

        if (collection == null)
            collection = (reqParams != null) ? reqParams.get("collection", getDefaultCollection()) : getDefaultCollection();
        return requestWithRetryOnStaleState(request, 0, collection);
    }
request only resolves the target collection; the complete client-side call is carried out by requestWithRetryOnStaleState.
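The fallback order used by request (explicit argument, then the "collection" request parameter, then the client's default collection) can be sketched in isolation. This is a minimal illustration, not SolrJ code: the class CollectionResolver and its plain Map of parameters are hypothetical stand-ins for SolrParams and getDefaultCollection().

```java
import java.util.Map;

/** Hypothetical helper that mirrors the collection fallback order in request(). */
final class CollectionResolver {

    private CollectionResolver() {
    }

    /**
     * @param explicit          collection passed by the caller, may be null
     * @param params            request parameters (stand-in for SolrParams), may be null
     * @param defaultCollection client-wide default, may be null
     */
    static String resolve(String explicit, Map<String, String> params, String defaultCollection) {
        if (explicit != null) {
            return explicit;                       // the explicit argument always wins
        }
        if (params == null) {
            return defaultCollection;              // no parameters at all: use the default
        }
        String fromParams = params.get("collection");
        return fromParams != null ? fromParams : defaultCollection;
    }
}
```

In the query example at the top of this article, no collection argument is passed, so the value "Test" set through parameters.set("collection", "Test") is the one that takes effect.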
    /**
     * As this class doesn't watch external collections on the client side,
     * there's a chance that the request will fail due to cached stale state,
     * which means the state must be refreshed from ZK and retried.
     */
    protected NamedList<Object> requestWithRetryOnStaleState(SolrRequest request, int retryCount, String collection)
            throws SolrServerException, IOException {

        connect(); // important to call this before you start working with the ZkStateReader

        // build up a _stateVer_ param to pass to the server containing all of the
        // external collection state versions involved in this request, which allows
        // the server to notify us that our cached state for one or more of the external
        // collections is stale and needs to be refreshed ... this code has no impact on internal collections
        String stateVerParam = null;
        List<DocCollection> requestedCollections = null;
        boolean isAdmin = ADMIN_PATHS.contains(request.getPath());
        if (collection != null && !isAdmin) { // don't do _stateVer_ checking for admin requests
            Set<String> requestedCollectionNames = getCollectionNames(getZkStateReader().getClusterState(), collection);

            StringBuilder stateVerParamBuilder = null;
            for (String requestedCollection : requestedCollectionNames) {
                // track the version of state we're using on the client side using the _stateVer_ param
                DocCollection coll = getDocCollection(getZkStateReader().getClusterState(), requestedCollection, null);
                int collVer = coll.getZNodeVersion();
                if (coll.getStateFormat() > 1) {
                    if (requestedCollections == null) requestedCollections = new ArrayList<>(requestedCollectionNames.size());
                    requestedCollections.add(coll);

                    if (stateVerParamBuilder == null) {
                        stateVerParamBuilder = new StringBuilder();
                    } else {
                        stateVerParamBuilder.append("|"); // hopefully pipe is not an allowed char in a collection name
                    }

                    stateVerParamBuilder.append(coll.getName()).append(":").append(collVer);
                }
            }

            if (stateVerParamBuilder != null) {
                stateVerParam = stateVerParamBuilder.toString();
            }
        }

        if (request.getParams() instanceof ModifiableSolrParams) {
            ModifiableSolrParams params = (ModifiableSolrParams) request.getParams();
            if (stateVerParam != null) {
                params.set(STATE_VERSION, stateVerParam);
            } else {
                params.remove(STATE_VERSION);
            }
        } // else: ??? how to set this ???

        NamedList<Object> resp = null;
        try {
            resp = sendRequest(request, collection);
            // to avoid an O(n) operation we always add STATE_VERSION to the last and try to read it from there
            Object o = resp.get(STATE_VERSION, resp.size() - 1);
            if (o != null && o instanceof Map) {
                // remove this because no one else needs this and tests would fail if they are comparing responses
                resp.remove(resp.size() - 1);
                Map invalidStates = (Map) o;
                for (Object invalidEntries : invalidStates.entrySet()) {
                    Map.Entry e = (Map.Entry) invalidEntries;
                    getDocCollection(getZkStateReader().getClusterState(), (String) e.getKey(), (Integer) e.getValue());
                }
            }
        } catch (Exception exc) {

            Throwable rootCause = SolrException.getRootCause(exc);
            // don't do retry support for admin requests or if the request doesn't have a collection specified
            if (collection == null || isAdmin) {
                if (exc instanceof SolrServerException) {
                    throw (SolrServerException) exc;
                } else if (exc instanceof IOException) {
                    throw (IOException) exc;
                } else if (exc instanceof RuntimeException) {
                    throw (RuntimeException) exc;
                } else {
                    throw new SolrServerException(rootCause);
                }
            }

            int errorCode = (rootCause instanceof SolrException) ?
                    ((SolrException) rootCause).code() : SolrException.ErrorCode.UNKNOWN.code;

            log.error("Request to collection {} failed due to (" + errorCode +
                    ") {}, retry? " + retryCount, collection, rootCause.toString());

            boolean wasCommError =
                    (rootCause instanceof ConnectException ||
                     rootCause instanceof ConnectTimeoutException ||
                     rootCause instanceof NoHttpResponseException ||
                     rootCause instanceof SocketException);

            boolean stateWasStale = false;
            if (retryCount < MAX_STALE_RETRIES &&
                    requestedCollections != null &&
                    !requestedCollections.isEmpty() &&
                    SolrException.ErrorCode.getErrorCode(errorCode) == SolrException.ErrorCode.INVALID_STATE) {
                // cached state for one or more external collections was stale
                // re-issue request using updated state
                stateWasStale = true;

                // just re-read state for all of them, which is a little heavy handed but hopefully a rare occurrence
                for (DocCollection ext : requestedCollections) {
                    collectionStateCache.remove(ext.getName());
                }
            }

            // if we experienced a communication error, it's worth checking the state
            // with ZK just to make sure the node we're trying to hit is still part of the collection
            if (retryCount < MAX_STALE_RETRIES &&
                    !stateWasStale &&
                    requestedCollections != null &&
                    !requestedCollections.isEmpty() &&
                    wasCommError) {
                for (DocCollection ext : requestedCollections) {
                    DocCollection latestStateFromZk = getDocCollection(zkStateReader.getClusterState(), ext.getName(), null);
                    if (latestStateFromZk.getZNodeVersion() != ext.getZNodeVersion()) {
                        // looks like we couldn't reach the server because the state was stale == retry
                        stateWasStale = true;
                        // we just pulled state from ZK, so update the cache so that the retry uses it
                        collectionStateCache.put(ext.getName(), new ExpiringCachedDocCollection(latestStateFromZk));
                    }
                }
            }

            if (requestedCollections != null) {
                requestedCollections.clear(); // done with this
            }

            // if the state was stale, then we retry the request once with new state pulled from Zk
            if (stateWasStale) {
                log.warn("Re-trying request to collection(s) " + collection + " after stale state error from server.");
                resp = requestWithRetryOnStaleState(request, retryCount + 1, collection);
            } else {
                if (exc instanceof SolrException) {
                    throw exc;
                }
                if (exc instanceof SolrServerException) {
                    throw (SolrServerException) exc;
                } else if (exc instanceof IOException) {
                    throw (IOException) exc;
                } else {
                    throw new SolrServerException(rootCause);
                }
            }
        }

        return resp;
    }
The code above looks long, but most of it is exception handling. The main path consists of just two calls, connect() and sendRequest(), which are examined in turn.
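The _stateVer_ parameter assembled at the start of requestWithRetryOnStaleState is a pipe-separated list of name:version pairs. A minimal sketch of just that formatting step follows; the class StateVerSketch and its Map input are hypothetical (the real code reads names and znode versions from DocCollection objects and only includes collections whose state format is greater than 1).

```java
import java.util.Map;

/** Hypothetical helper that formats a _stateVer_ value as "name:version|name:version". */
final class StateVerSketch {

    private StateVerSketch() {
    }

    /**
     * @param versions collection name mapped to the cached znode version,
     *                 assumed to contain only collections with stateFormat > 1
     * @return the parameter value, or null when there is nothing to send
     */
    static String build(Map<String, Integer> versions) {
        StringBuilder sb = null;
        for (Map.Entry<String, Integer> e : versions.entrySet()) {
            if (sb == null) {
                sb = new StringBuilder();
            } else {
                sb.append('|');                    // same separator as in the listing above
            }
            sb.append(e.getKey()).append(':').append(e.getValue());
        }
        return sb == null ? null : sb.toString();
    }
}
```

A null result corresponds to the branch in the listing that removes STATE_VERSION from the request parameters instead of setting it.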
1. connect()
    /**
     * Connect to the zookeeper ensemble.
     * This is an optional method that may be used to force a connect before any other requests are sent.
     *
     */
    public void connect() {
        if (zkStateReader == null) {
            synchronized (this) {
                if (zkStateReader == null) {
                    ZkStateReader zk = null;
                    try {
                        zk = new ZkStateReader(zkHost, zkClientTimeout, zkConnectTimeout);
                        zk.createClusterStateWatchersAndUpdate();
                        zkStateReader = zk;
                    } catch (InterruptedException e) {
                        zk.close();
                        Thread.currentThread().interrupt();
                        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
                    } catch (KeeperException e) {
                        zk.close();
                        throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
                    } catch (Exception e) {
                        if (zk != null) zk.close();
                        // do not wrap because clients may be relying on the underlying exception being thrown
                        throw e;
                    }
                }
            }
        }
    }
connect() relies on ZkStateReader.createClusterStateWatchersAndUpdate() to load the cluster state and register the ZooKeeper watchers.
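The two nested null checks in connect() follow the double-checked locking idiom: the first check avoids taking the lock once the reader exists, the second prevents two threads from both creating it. The sketch below shows the idiom on its own. LazyConnection and its Supplier-based factory are hypothetical names for illustration, and the volatile field is what the idiom requires in general under the Java memory model; how the field is declared in CloudSolrClient is not shown in the listing above.

```java
import java.util.function.Supplier;

/** Hypothetical holder that creates an expensive resource at most once, on first use. */
final class LazyConnection<T> {

    private final Supplier<T> factory;   // assumption: builds a fully initialized resource
    private volatile T resource;         // volatile makes the unlocked first check safe

    LazyConnection(Supplier<T> factory) {
        this.factory = factory;
    }

    T connect() {
        T local = resource;              // first check, without the lock
        if (local == null) {
            synchronized (this) {
                local = resource;        // second check, under the lock
                if (local == null) {
                    local = factory.get();
                    resource = local;    // publish only after initialization finished
                }
            }
        }
        return local;
    }
}
```

Because the field is assigned only after the factory returns, a failed initialization leaves it null and the next call simply tries again, which matches how connect() assigns zkStateReader only after createClusterStateWatchersAndUpdate() succeeds.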
ZkStateReader.createClusterStateWatchersAndUpdate():
    public synchronized void createClusterStateWatchersAndUpdate() throws KeeperException,
            InterruptedException {
        // We need to fetch the current cluster state and the set of live nodes

        log.info("Updating cluster state from ZooKeeper... ");

        // Sanity check ZK structure.
        if (!zkClient.exists(CLUSTER_STATE, true)) {
            throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
                    "Cannot connect to cluster at " + zkClient.getZkServerAddress() + ": cluster not found/not ready");
        }

        // on reconnect of SolrZkClient force refresh and re-add watches.
        refreshLegacyClusterState(new LegacyClusterStateWatcher());
        refreshStateFormat2Collections();
        refreshCollectionList(new CollectionsChildWatcher());
        refreshLiveNodes(new LiveNodeWatcher());

        synchronized (ZkStateReader.this.getUpdateLock()) {
            constructState();

            zkClient.exists(ALIASES,
                    new Watcher() {

                        @Override
                        public void process(WatchedEvent event) {
                            // session events are not change events,
                            // and do not remove the watcher
                            if (EventType.None.equals(event.getType())) {
                                return;
                            }
                            try {
                                synchronized (ZkStateReader.this.getUpdateLock()) {
                                    log.info("Updating aliases... ");

                                    // remake watch
                                    final Watcher thisWatch = this;
                                    Stat stat = new Stat();
                                    byte[] data = zkClient.getData(ALIASES, thisWatch, stat, true);

                                    Aliases aliases = ClusterState.load(data);

                                    ZkStateReader.this.aliases = aliases;
                                }
                            } catch (KeeperException e) {
                                if (e.code() == KeeperException.Code.SESSIONEXPIRED
                                        || e.code() == KeeperException.Code.CONNECTIONLOSS) {
                                    log.warn("ZooKeeper watch triggered, but Solr cannot talk to ZK");
                                    return;
                                }
                                log.error("", e);
                                throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "", e);
                            } catch (InterruptedException e) {
                                // Restore the interrupted status
                                Thread.currentThread().interrupt();
                                log.warn("", e);
                                return;
                            }
                        }

                    }, true);
        }
        updateAliases();

        if (securityNodeListener != null) {
            addSecuritynodeWatcher(SOLR_SECURITY_CONF_PATH, new Callable<Pair<byte[], Stat>>() {
                @Override
                public void call(Pair<byte[], Stat> pair) {
                    ConfigData cd = new ConfigData();
                    cd.data = pair.getKey() == null || pair.getKey().length == 0 ? EMPTY_MAP : Utils.getDeepCopy((Map) fromJSON(pair.getKey()), 4, false);
                    cd.version = pair.getValue() == null ? -1 : pair.getValue().getVersion();
                    securityData = cd;
                    securityNodeListener.run();
                }
            });
            securityData = getSecurityProps(true);
        }
    }
2. sendRequest()
The method is as follows:
    protected NamedList<Object> sendRequest(SolrRequest request, String collection)
            throws SolrServerException, IOException {
        connect();

        ClusterState clusterState = zkStateReader.getClusterState();

        boolean sendToLeaders = false;
        List<String> replicas = null;

        if (request instanceof IsUpdateRequest) {
            if (request instanceof UpdateRequest) {
                NamedList<Object> response = directUpdate((AbstractUpdateRequest) request, collection, clusterState);
                if (response != null) {
                    return response;
                }
            }
            sendToLeaders = true;
            replicas = new ArrayList<>();
        }

        SolrParams reqParams = request.getParams();
        if (reqParams == null) {
            reqParams = new ModifiableSolrParams();
        }
        List<String> theUrlList = new ArrayList<>();
        if (ADMIN_PATHS.contains(request.getPath())) {
            Set<String> liveNodes = clusterState.getLiveNodes();
            for (String liveNode : liveNodes) {
                theUrlList.add(zkStateReader.getBaseUrlForNodeName(liveNode));
            }
        } else {

            if (collection == null) {
                throw new SolrServerException(
                        "No collection param specified on request and no default collection has been set.");
            }

            Set<String> collectionNames = getCollectionNames(clusterState, collection);
            if (collectionNames.size() == 0) {
                throw new SolrException(ErrorCode.BAD_REQUEST,
                        "Could not find collection: " + collection);
            }

            String shardKeys = reqParams.get(ShardParams._ROUTE_);

            // TODO: not a big deal because of the caching, but we could avoid looking
            // at every shard
            // when getting leaders if we tweaked some things

            // Retrieve slices from the cloud state and, for each collection
            // specified,
            // add it to the Map of slices.
            Map<String,Slice> slices = new HashMap<>();
            for (String collectionName : collectionNames) {
                DocCollection col = getDocCollection(clusterState, collectionName, null);
                Collection<Slice> routeSlices = col.getRouter().getSearchSlices(shardKeys, reqParams, col);
                ClientUtils.addSlices(slices, collectionName, routeSlices, true);
            }
            Set<String> liveNodes = clusterState.getLiveNodes();

            List<String> leaderUrlList = null;
            List<String> urlList = null;
            List<String> replicasList = null;

            // build a map of unique nodes
            // TODO: allow filtering by group, role, etc
            Map<String,ZkNodeProps> nodes = new HashMap<>();
            List<String> urlList2 = new ArrayList<>();
            for (Slice slice : slices.values()) {
                for (ZkNodeProps nodeProps : slice.getReplicasMap().values()) {
                    ZkCoreNodeProps coreNodeProps = new ZkCoreNodeProps(nodeProps);
                    String node = coreNodeProps.getNodeName();
                    if (!liveNodes.contains(coreNodeProps.getNodeName())
                            || Replica.State.getState(coreNodeProps.getState()) != Replica.State.ACTIVE) continue;
                    if (nodes.put(node, nodeProps) == null) {
                        if (!sendToLeaders || coreNodeProps.isLeader()) {
                            String url;
                            if (reqParams.get(UpdateParams.COLLECTION) == null) {
                                url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), collection);
                            } else {
                                url = coreNodeProps.getCoreUrl();
                            }
                            urlList2.add(url);
                        } else {
                            String url;
                            if (reqParams.get(UpdateParams.COLLECTION) == null) {
                                url = ZkCoreNodeProps.getCoreUrl(nodeProps.getStr(ZkStateReader.BASE_URL_PROP), collection);
                            } else {
                                url = coreNodeProps.getCoreUrl();
                            }
                            replicas.add(url);
                        }
                    }
                }
            }

            if (sendToLeaders) {
                leaderUrlList = urlList2;
                replicasList = replicas;
            } else {
                urlList = urlList2;
            }

            if (sendToLeaders) {
                theUrlList = new ArrayList<>(leaderUrlList.size());
                theUrlList.addAll(leaderUrlList);
            } else {
                theUrlList = new ArrayList<>(urlList.size());
                theUrlList.addAll(urlList);
            }
            if (theUrlList.isEmpty()) {
                for (String s : collectionNames) {
                    if (s != null) collectionStateCache.remove(s);
                }
                throw new SolrException(SolrException.ErrorCode.INVALID_STATE, "Could not find a healthy node to handle the request.");
            }

            Collections.shuffle(theUrlList, rand);
            if (sendToLeaders) {
                ArrayList<String> theReplicas = new ArrayList<>(replicasList.size());
                theReplicas.addAll(replicasList);
                Collections.shuffle(theReplicas, rand);
                theUrlList.addAll(theReplicas);
            }

        }

        LBHttpSolrClient.Req req = new LBHttpSolrClient.Req(request, theUrlList);
        LBHttpSolrClient.Rsp rsp = lbClient.request(req);
        return rsp.getResponse();
    }
The code above determines the URL list from the request type and its parameters, wraps the request and that list in an LBHttpSolrClient.Req, and obtains the response from lbClient. Note that LBHttpSolrClient.request uses a for loop to try the servers in the list one at a time.
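The selection logic in sendRequest can be condensed into a small, self-contained sketch: keep only replicas that are ACTIVE and sit on a live node, take one URL per node, shuffle, and for updates put leaders first with the remaining replicas as fallback. UrlListSketch and ReplicaInfo are hypothetical, simplified types, not SolrJ classes, and the sketch leaves out admin paths, routing by _route_ and the directUpdate shortcut.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;

final class UrlListSketch {

    /** Hypothetical, simplified view of one replica. */
    static final class ReplicaInfo {
        final String url;
        final String node;
        final boolean leader;
        final boolean active;

        ReplicaInfo(String url, String node, boolean leader, boolean active) {
            this.url = url;
            this.node = node;
            this.leader = leader;
            this.active = active;
        }
    }

    private UrlListSketch() {
    }

    static List<String> buildUrlList(List<ReplicaInfo> replicas, Set<String> liveNodes,
                                     boolean sendToLeaders, Random rand) {
        List<String> primary = new ArrayList<>();   // leaders for updates, every replica for queries
        List<String> fallback = new ArrayList<>();  // non-leaders, used only for updates
        Set<String> seenNodes = new HashSet<>();
        for (ReplicaInfo r : replicas) {
            if (!liveNodes.contains(r.node) || !r.active) {
                continue;                           // skip dead nodes and non-active replicas
            }
            if (!seenNodes.add(r.node)) {
                continue;                           // one URL per node, like nodes.put(...) == null
            }
            if (!sendToLeaders || r.leader) {
                primary.add(r.url);
            } else {
                fallback.add(r.url);
            }
        }
        if (primary.isEmpty()) {
            throw new IllegalStateException("Could not find a healthy node to handle the request.");
        }
        Collections.shuffle(primary, rand);         // spread load across equivalent servers
        if (sendToLeaders) {
            Collections.shuffle(fallback, rand);
            primary.addAll(fallback);               // leaders first, other replicas afterwards
        }
        return primary;
    }
}
```

The shuffle is what gives a simple form of client-side load balancing: the load balancer tries servers in list order, so a random order spreads requests across replicas.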
LBHttpSolrClient.request:
    /**
     * Tries to query a live server from the list provided in Req. Servers in the dead pool are skipped.
     * If a request fails due to an IOException, the server is moved to the dead pool for a certain period of
     * time, or until a test request on that server succeeds.
     *
     * Servers are queried in the exact order given (except servers currently in the dead pool are skipped).
     * If no live servers from the provided list remain to be tried, a number of previously skipped dead servers will be tried.
     * Req.getNumDeadServersToTry() controls how many dead servers will be tried.
     *
     * If no live servers are found a SolrServerException is thrown.
     *
     * @param req contains both the request as well as the list of servers to query
     *
     * @return the result of the request
     *
     * @throws IOException If there is a low-level I/O error.
     */
    public Rsp request(Req req) throws SolrServerException, IOException {
        Rsp rsp = new Rsp();
        Exception ex = null;
        boolean isUpdate = req.request instanceof IsUpdateRequest;
        List<ServerWrapper> skipped = null;

        long timeAllowedNano = getTimeAllowedInNanos(req.getRequest());
        long timeOutTime = System.nanoTime() + timeAllowedNano;
        for (String serverStr : req.getServers()) {
            if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
                break;
            }

            serverStr = normalize(serverStr);
            // if the server is currently a zombie, just skip to the next one
            ServerWrapper wrapper = zombieServers.get(serverStr);
            if (wrapper != null) {
                // System.out.println("ZOMBIE SERVER QUERIED: " + serverStr);
                final int numDeadServersToTry = req.getNumDeadServersToTry();
                if (numDeadServersToTry > 0) {
                    if (skipped == null) {
                        skipped = new ArrayList<>(numDeadServersToTry);
                        skipped.add(wrapper);
                    } else if (skipped.size() < numDeadServersToTry) {
                        skipped.add(wrapper);
                    }
                }
                continue;
            }
            rsp.server = serverStr;
            try {
                MDC.put("LBHttpSolrClient.url", serverStr);
                HttpSolrClient client = makeSolrClient(serverStr);

                ex = doRequest(client, req, rsp, isUpdate, false, null);
                if (ex == null) {
                    return rsp; // SUCCESS
                }
            } finally {
                MDC.remove("LBHttpSolrClient.url");
            }
        }

        // try the servers we previously skipped
        if (skipped != null) {
            for (ServerWrapper wrapper : skipped) {
                if (isTimeExceeded(timeAllowedNano, timeOutTime)) {
                    break;
                }

                ex = doRequest(wrapper.client, req, rsp, isUpdate, true, wrapper.getKey());
                if (ex == null) {
                    return rsp; // SUCCESS
                }
            }
        }

        if (ex == null) {
            throw new SolrServerException("No live SolrServers available to handle this request");
        } else {
            throw new SolrServerException("No live SolrServers available to handle this request:" + zombieServers.keySet(), ex);
        }
    }
The part that does the real work is the pair of calls makeSolrClient(serverStr) and doRequest(...): create an HttpSolrClient and send the request through it.
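The failover behaviour described in the Javadoc of LBHttpSolrClient.request can be reduced to a short sketch: try live servers in order, remember failing ones as "zombies", and, as a last resort, retry a limited number of the zombies that were skipped. FailoverSketch and its Transport interface are hypothetical; the time limit, the background liveness checks and the update-specific rules of the real class are left out, and the sketch throws an unchecked exception where the real method throws SolrServerException.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class FailoverSketch {

    /** Hypothetical transport hook standing in for the per-server HTTP call. */
    interface Transport {
        String send(String serverUrl) throws IOException;
    }

    private final Set<String> zombies = ConcurrentHashMap.newKeySet();
    private final Transport transport;

    FailoverSketch(Transport transport) {
        this.transport = transport;
    }

    String request(List<String> servers, int numDeadServersToTry) {
        IOException last = null;
        List<String> skipped = new ArrayList<>();
        for (String server : servers) {
            if (zombies.contains(server)) {
                if (skipped.size() < numDeadServersToTry) {
                    skipped.add(server);           // remember a few zombies for a last attempt
                }
                continue;
            }
            try {
                return transport.send(server);     // first success wins
            } catch (IOException e) {
                zombies.add(server);               // move the failing server to the dead pool
                last = e;
            }
        }
        for (String server : skipped) {            // nothing live worked: retry skipped zombies
            try {
                String response = transport.send(server);
                zombies.remove(server);            // it answered, so it is alive again
                return response;
            } catch (IOException e) {
                last = e;
            }
        }
        throw new IllegalStateException("No live servers available to handle this request", last);
    }

    boolean isZombie(String server) {
        return zombies.contains(server);
    }
}
```

A caller would pass the shuffled URL list from the previous step, so that the first healthy server in a random order handles the request.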
doRequest:
    protected Exception doRequest(HttpSolrClient client, Req req, Rsp rsp, boolean isUpdate,
            boolean isZombie, String zombieKey) throws SolrServerException, IOException {
        Exception ex = null;
        try {
            rsp.rsp = client.request(req.getRequest(), (String) null);
            if (isZombie) {
                zombieServers.remove(zombieKey);
            }
        } catch (SolrException e) {
            // we retry on 404 or 403 or 503 or 500
            // unless it's an update - then we only retry on connect exception
            if (!isUpdate && RETRY_CODES.contains(e.code())) {
                ex = (!isZombie) ? addZombie(client, e) : e;
            } else {
                // Server is alive but the request was likely malformed or invalid
                if (isZombie) {
                    zombieServers.remove(zombieKey);
                }
                throw e;
            }
        } catch (SocketException e) {
            if (!isUpdate || e instanceof ConnectException) {
                ex = (!isZombie) ? addZombie(client, e) : e;
            } else {
                throw e;
            }
        } catch (SocketTimeoutException e) {
            if (!isUpdate) {
                ex = (!isZombie) ? addZombie(client, e) : e;
            } else {
                throw e;
            }
        } catch (SolrServerException e) {
            Throwable rootCause = e.getRootCause();
            if (!isUpdate && rootCause instanceof IOException) {
                ex = (!isZombie) ? addZombie(client, e) : e;
            } else if (isUpdate && rootCause instanceof ConnectException) {
                ex = (!isZombie) ? addZombie(client, e) : e;
            } else {
                throw e;
            }
        } catch (Exception e) {
            throw new SolrServerException(e);
        }

        return ex;
    }
Here too only one line does the real work, rsp.rsp = client.request(req.getRequest(), (String) null); everything else is exception handling. The request itself is sent by the request method of HttpSolrClient.
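The catch blocks of doRequest amount to a decision: should the load balancer move on to the next server, or should the error go straight back to the caller? A rough sketch of that decision as a pure function is given below. RetryDecisionSketch and HttpStatusException are hypothetical (the latter stands in for SolrException), the status codes are the ones named in the source comment, and the SolrServerException root-cause branch is omitted for brevity.

```java
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

final class RetryDecisionSketch {

    // Codes listed in the doRequest comment as retryable for non-update requests.
    static final Set<Integer> RETRY_CODES = new HashSet<>(Arrays.asList(404, 403, 503, 500));

    /** Hypothetical stand-in for SolrException that carries only the HTTP status code. */
    static final class HttpStatusException extends RuntimeException {
        final int code;

        HttpStatusException(int code) {
            super("HTTP " + code);
            this.code = code;
        }
    }

    private RetryDecisionSketch() {
    }

    /** Returns true when the next server should be tried, false when the error should propagate. */
    static boolean shouldTryNextServer(Exception e, boolean isUpdate) {
        if (e instanceof HttpStatusException) {
            // Queries retry on selected status codes; updates never retry on a server answer.
            return !isUpdate && RETRY_CODES.contains(((HttpStatusException) e).code);
        }
        if (e instanceof SocketException) {
            // An update is retried only if the connection was never established.
            return !isUpdate || e instanceof ConnectException;
        }
        if (e instanceof SocketTimeoutException) {
            return !isUpdate;
        }
        return false;
    }
}
```

The asymmetry is deliberate in the original code: a query can safely be repeated on another server, whereas an update that may already have reached a server is retried only when the connection itself failed.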
HttpSolrClient.request:
    public NamedList<Object> request(final SolrRequest request, final ResponseParser processor, String collection)
            throws SolrServerException, IOException {
        HttpRequestBase method = createMethod(request, collection);
        setBasicAuthHeader(request, method);
        return executeMethod(method, processor);
    }
Response flow
The response is assembled in the process method of SolrRequest shown earlier. Its line
    T res = createResponse(client);
first calls the method that creates the QueryResponse:
    @Override
    protected QueryResponse createResponse(SolrClient client) {
        return new QueryResponse(client);
    }
It then sets the content of the response:
    res.setResponse(client.request(this, collection));
This is the next line of the same process method: the NamedList returned by client.request is stored in the QueryResponse, after which the elapsed time is recorded and the response is returned.
Summary:
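How SolrCloud searches an index:
1. A user query can be sent to any Solr server that hosts the collection; Solr's internal logic routes it to one replica.
2. That replica starts a distributed query: based on the number of shards in the index, it turns the query into one sub-query per shard and directs each sub-query to any one replica of the corresponding shard.
3. Each sub-query returns its results.
4. The original replica merges the sub-query results and returns the final result to the user.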
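The four steps above are a scatter-gather pattern, which can be illustrated with a small sketch: ask every shard for its best hits, then merge them by score and keep the overall top rows. ScatterGatherSketch, Hit and the function-per-shard model are hypothetical simplifications; the sketch queries shards sequentially and merges in a single pass, which is only an approximation of what Solr does on the server side.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

final class ScatterGatherSketch {

    /** Hypothetical search hit: a document id with its relevance score. */
    static final class Hit {
        final String id;
        final double score;

        Hit(String id, double score) {
            this.id = id;
            this.score = score;
        }
    }

    private ScatterGatherSketch() {
    }

    /**
     * @param shards one function per shard; each stands for a sub-query sent to
     *               any one replica of that shard and returns that shard's top hits
     * @param rows   number of hits the user asked for
     */
    static List<Hit> search(List<Function<Integer, List<Hit>>> shards, int rows) {
        List<Hit> merged = new ArrayList<>();
        for (Function<Integer, List<Hit>> shard : shards) {
            merged.addAll(shard.apply(rows));      // steps 2 and 3: one sub-query per shard
        }
        // Step 4: merge by descending score and keep the overall top "rows".
        merged.sort(Comparator.comparingDouble((Hit h) -> h.score).reversed());
        return new ArrayList<>(merged.subList(0, Math.min(rows, merged.size())));
    }
}
```

Each shard has to return up to rows hits of its own, because the overall top rows might all come from a single shard.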
Reposted from: https://www.cnblogs.com/davidwang456/p/4971298.html
