Elasticsearch-Jest: Configuring an ES Cluster, a Source Code Walkthrough
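Table of Contents
- Jest GitHub repository
- Setting up the source environment
- Configuring an ES cluster with Jest
- How Jest's cluster configuration keeps the application highly available
- Initializing JestClient
- NodeChecker source analysis
- How a request is issued
- Problems encountered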
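Jest GitHub repository
Go to https://github.com/searchbox-io/Jest and clone the source.
Setting up the source environment
I checked out version 5.3.4. The latest version at the time of writing was 6.3.1, and the two are largely the same.
The test module is my own code for testing against the cluster; it does not exist on GitHub.
Configuring an ES cluster with Jest
A singleton Client class holds a JestClient field that has to be initialized.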
```java
package com.artisan.test;

import com.google.gson.GsonBuilder;
import io.searchbox.client.JestClient;
import io.searchbox.client.JestClientFactory;
import io.searchbox.client.config.HttpClientConfig;

import java.util.Arrays;
import java.util.concurrent.TimeUnit;

public class Client {

    // volatile guarantees memory visibility for double-checked locking
    private volatile static Client client = null;

    private static JestClient jestClient;

    /**
     * Private constructor
     */
    private Client() {
        initJestClient(); // initialize the JestClient
    }

    /**
     * Lazy initialization with double-checked locking
     * @return the singleton instance
     */
    public static Client getInstance() {
        if (client == null) {
            synchronized (Client.class) {
                if (client == null) {
                    client = new Client();
                }
            }
        }
        return client;
    }

    /**
     * Returns the shared JestClient
     * @return the JestClient
     */
    public static JestClient getJestClient() {
        return jestClient;
    }

    private void initJestClient() {
        // Initial (bootstrap) cluster nodes
        String[] serverUris = new String[]{"http://127.0.0.1:9200", "http://127.0.0.1:8200"};
        JestClientFactory factory = new JestClientFactory();
        // Configure the HttpClientConfig
        factory.setHttpClientConfig(new HttpClientConfig.Builder(Arrays.asList(serverUris))
                .discoveryEnabled(true) // node discovery: only send requests to live nodes, for high availability
                .discoveryFrequency(2000, TimeUnit.MILLISECONDS) // how often NodeChecker runs; the default is 10 s
                .gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create())
                .multiThreaded(true)
                .readTimeout(10000)
                .build());
        // Build the JestClient
        jestClient = factory.getObject();
    }
}
```

Test class:
```java
package com.artisan.test;

import io.searchbox.client.JestResult;
import io.searchbox.core.Get;

import java.io.IOException;

public class JestClientTest {

    /**
     * Constructor
     */
    public JestClientTest() {
        Client.getInstance(); // initialize the Client
    }

    private static void getDocumentMyStore(String id) {
        Get get = new Get.Builder("my_store", id).type("product").build();
        JestResult result;
        try {
            result = Client.getJestClient().execute(get);
            if (result != null) {
                System.out.println(id + ":" + result.getJsonObject());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        // main never constructs a JestClientTest, so initialize the Client here;
        // otherwise Client.getJestClient() returns null
        Client.getInstance();
        // Let NodeChecker run first and collect the live nodes: the main thread sleeps for 5 s
        Thread.sleep(5000);
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            Thread.sleep(2000);
            getDocumentMyStore("998");
        }
    }
}
```

How Jest's cluster configuration keeps the application highly available
So what exactly does the key setting, .discoveryEnabled(true), do?
Initializing JestClient
Let's look at JestClientFactory#getObject(). The comments outline the logic of the whole method:
```java
public JestClient getObject() {
    // Create the JestHttpClient
    JestHttpClient client = new JestHttpClient();
    if (httpClientConfig == null) {
        log.debug("There is no configuration to create http client. Going to create simple client with default values");
        httpClientConfig = new HttpClientConfig.Builder("http://localhost:9200").build();
    }
    client.setRequestCompressionEnabled(httpClientConfig.isRequestCompressionEnabled());
    // The initial ES cluster nodes
    client.setServers(httpClientConfig.getServerList());
    // Set up the HttpClient and the AsyncClient
    final HttpClientConnectionManager connectionManager = getConnectionManager();
    final NHttpClientConnectionManager asyncConnectionManager = getAsyncConnectionManager();
    client.setHttpClient(createHttpClient(connectionManager));
    client.setAsyncClient(createAsyncHttpClient(asyncConnectionManager));
    // Use the custom Gson instance, if one was configured
    Gson gson = httpClientConfig.getGson();
    if (gson == null) {
        log.info("Using default GSON instance");
    } else {
        log.info("Using custom GSON instance");
        client.setGson(gson);
    }
    // Create the NodeChecker and start node discovery
    // set discovery (should be set after setting the httpClient on jestClient)
    if (httpClientConfig.isDiscoveryEnabled()) {
        log.info("Node Discovery enabled...");
        if (!Strings.isNullOrEmpty(httpClientConfig.getDiscoveryFilter())) {
            log.info("Node Discovery filtering nodes on \"{}\"", httpClientConfig.getDiscoveryFilter());
        }
        NodeChecker nodeChecker = createNodeChecker(client, httpClientConfig);
        client.setNodeChecker(nodeChecker);
        nodeChecker.startAsync();
        nodeChecker.awaitRunning();
    } else {
        log.info("Node Discovery disabled...");
    }
    // If maxConnectionIdleTime > 0, create an IdleConnectionReaper that reclaims idle connections
    // schedule idle connection reaping if configured
    if (httpClientConfig.getMaxConnectionIdleTime() > 0) {
        log.info("Idle connection reaping enabled...");
        IdleConnectionReaper reaper = new IdleConnectionReaper(httpClientConfig, new HttpReapableConnectionManager(connectionManager, asyncConnectionManager));
        client.setIdleConnectionReaper(reaper);
        reaper.startAsync();
        reaper.awaitRunning();
    } else {
        log.info("Idle connection reaping disabled...");
    }
    Set<HttpHost> preemptiveAuthTargetHosts = httpClientConfig.getPreemptiveAuthTargetHosts();
    if (!preemptiveAuthTargetHosts.isEmpty()) {
        log.info("Authentication cache set for preemptive authentication");
        client.setHttpClientContextTemplate(createPreemptiveAuthContext(preemptiveAuthTargetHosts));
    }
    return client;
}
```

Let's focus on what Jest does when discoveryEnabled is set to true.
NodeChecker source analysis
NodeChecker extends com.google.common.util.concurrent.AbstractScheduledService.
Its constructor uses the clientConfig's discoveryFrequency and discoveryFrequencyTimeUnit to create a fixed-delay scheduler that runs the node check:
```java
public NodeChecker(JestClient jestClient, ClientConfig clientConfig) {
    // Build the NodesInfo action; nodes can be filtered via HttpClientConfig#discoveryFilter(String discoveryFilter)
    action = new NodesInfo.Builder()
            .withHttp()
            .addNode(clientConfig.getDiscoveryFilter())
            .build();
    this.client = jestClient;
    this.defaultScheme = clientConfig.getDefaultSchemeForDiscoveredNodes();
    // Create a fixed-delay schedule from discoveryFrequency(2000, TimeUnit.MILLISECONDS), using Google Guava
    this.scheduler = Scheduler.newFixedDelaySchedule(
            0L,
            clientConfig.getDiscoveryFrequency(),
            clientConfig.getDiscoveryFrequencyTimeUnit());
    // The initial (bootstrap) nodes
    this.bootstrapServerList = ImmutableSet.copyOf(clientConfig.getServerList());
    // discoveredServerList starts empty and is filled in later
    this.discoveredServerList = new LinkedHashSet<String>();
}
```

It implements runOneIteration, whose main job is to send a NodesInfo request, GET /_nodes/_all/http:
```java
@Override
protected void runOneIteration() throws Exception {
    JestResult result;
    try {
        result = client.execute(action);
    } catch (CouldNotConnectException cnce) {
        // Can't connect to this node, remove it from the list
        log.error("Connect exception executing NodesInfo!", cnce);
        removeNodeAndUpdateServers(cnce.getHost());
        return;
        // do not elevate the exception since that will stop the scheduled calls.
        // throw new RuntimeException("Error executing NodesInfo!", e);
    } catch (Exception e) {
        log.error("Error executing NodesInfo!", e);
        client.setServers(bootstrapServerList);
        return;
        // do not elevate the exception since that will stop the scheduled calls.
        // throw new RuntimeException("Error executing NodesInfo!", e);
    }

    if (result.isSucceeded()) {
        LinkedHashSet<String> httpHosts = new LinkedHashSet<String>();
        JsonObject jsonMap = result.getJsonObject();
        JsonObject nodes = (JsonObject) jsonMap.get("nodes");
        if (nodes != null) {
            for (Entry<String, JsonElement> entry : nodes.entrySet()) {
                JsonObject host = entry.getValue().getAsJsonObject();
                JsonElement addressElement = null;
                if (host.has("version")) {
                    int majorVersion = Integer.parseInt(Splitter.on('.').splitToList(host.get("version").getAsString()).get(0));
                    if (majorVersion >= 5) {
                        JsonObject http = host.getAsJsonObject("http");
                        if (http != null && http.has(PUBLISH_ADDRESS_KEY_V5))
                            addressElement = http.get(PUBLISH_ADDRESS_KEY_V5);
                    }
                }
                if (addressElement == null) {
                    // get as a JsonElement first as some nodes in the cluster may not have an http_address
                    if (host.has(PUBLISH_ADDRESS_KEY)) addressElement = host.get(PUBLISH_ADDRESS_KEY);
                }
                if (addressElement != null && !addressElement.isJsonNull()) {
                    String httpAddress = getHttpAddress(addressElement.getAsString());
                    if (httpAddress != null) httpHosts.add(httpAddress);
                }
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Discovered {} HTTP hosts: {}", httpHosts.size(), Joiner.on(',').join(httpHosts));
        }
        discoveredServerList = httpHosts;
        client.setServers(discoveredServerList);
    } else {
        log.warn("NodesInfo request resulted in error: {}", result.getErrorMessage());
        client.setServers(bootstrapServerList);
    }
}
```

- If the request succeeds, the body is parsed. For each entry under nodes: if it has a version, the major version (the part before the first dot) is taken; if it is 5 or higher, the PUBLISH_ADDRESS_KEY_V5 (publish_address) attribute under the http object is read. If no address was found that way, the PUBLISH_ADDRESS_KEY (http_address) attribute is used instead. Each address is turned into an HTTP URL by getHttpAddress and added to a new set, which becomes discoveredServerList and is passed to client.setServers(...). These are the live nodes that later requests take their URLs from.
- If the request throws CouldNotConnectException, removeNodeAndUpdateServers removes that host; any other Exception resets the client's servers to bootstrapServerList, and so does a response that did not succeed. In every case the method returns normally instead of rethrowing, because an exception escaping runOneIteration would stop the scheduled calls.
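To make the success branch concrete without Gson or a live cluster, here is a minimal, standard-library-only model of the per-node address selection. It is a sketch, not Jest's code: NodeAddressPicker, NodeInfo and toUrl are hypothetical names, and toUrl assumes the address looks like host:port or hostname/ip:port. Jest's real getHttpAddress may accept other formats.

```java
import java.util.*;

/**
 * Hypothetical, stdlib-only model of how NodeChecker picks one HTTP address per node.
 * NodeInfo stands in for one entry under "nodes"; Jest itself reads Gson JsonObjects.
 */
final class NodeAddressPicker {

    static final class NodeInfo {
        final String version;            // "version", e.g. "6.3.1" (may be null)
        final String httpPublishAddress; // "http" -> "publish_address" (ES 5+)
        final String httpAddress;        // legacy top-level "http_address"

        NodeInfo(String version, String httpPublishAddress, String httpAddress) {
            this.version = version;
            this.httpPublishAddress = httpPublishAddress;
            this.httpAddress = httpAddress;
        }
    }

    /** Major version >= 5 uses http.publish_address; otherwise, or if missing, http_address. */
    static String pickAddress(NodeInfo node) {
        String address = null;
        if (node.version != null) {
            int major = Integer.parseInt(node.version.split("\\.")[0]);
            if (major >= 5) {
                address = node.httpPublishAddress;
            }
        }
        if (address == null) {
            address = node.httpAddress;
        }
        return address;
    }

    /** Assumes "host:port" or "hostname/ip:port"; keeps only the part after the last '/'. */
    static String toUrl(String address, String scheme) {
        int slash = address.lastIndexOf('/');
        String hostPort = slash >= 0 ? address.substring(slash + 1) : address;
        return scheme + "://" + hostPort;
    }

    /** Builds the ordered, de-duplicated set that would replace discoveredServerList. */
    static LinkedHashSet<String> discover(List<NodeInfo> nodes, String scheme) {
        LinkedHashSet<String> hosts = new LinkedHashSet<>();
        for (NodeInfo node : nodes) {
            String address = pickAddress(node);
            if (address != null) { // nodes without any HTTP address are skipped
                hosts.add(toUrl(address, scheme));
            }
        }
        return hosts;
    }
}
```

Using a LinkedHashSet, as the Jest code does, keeps the order in which nodes were listed and drops duplicate addresses.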
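How a request is issued
The test calls execute(). Client.getJestClient() returns the JestClient interface; the object behind it is the JestHttpClient created in JestClientFactory#getObject().
Look at JestHttpClient#execute: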
```java
/**
 * @throws IOException in case of a problem or the connection was aborted during request,
 *                     or in case of a problem while reading the response stream
 * @throws CouldNotConnectException if an {@link HttpHostConnectException} is encountered
 */
@Override
public <T extends JestResult> T execute(Action<T> clientRequest) throws IOException {
    return execute(clientRequest, null);
}
```

It delegates to the two-argument overload:
```java
public <T extends JestResult> T execute(Action<T> clientRequest, RequestConfig requestConfig) throws IOException {
    // Build the HttpUriRequest
    HttpUriRequest request = prepareRequest(clientRequest, requestConfig);
    CloseableHttpResponse response = null;
    try {
        response = executeRequest(request);
        return deserializeResponse(response, request, clientRequest);
    } catch (HttpHostConnectException ex) {
        throw new CouldNotConnectException(ex.getHost().toURI(), ex);
    } finally {
        if (response != null) {
            try {
                response.close();
            } catch (IOException ex) {
                log.error("Exception occurred while closing response stream.", ex);
            }
        }
    }
}
```

Here comes the key part:
```java
HttpUriRequest request = prepareRequest(clientRequest, requestConfig);
```

Follow it into prepareRequest:
```java
protected <T extends JestResult> HttpUriRequest prepareRequest(final Action<T> clientRequest, final RequestConfig requestConfig) {
    String elasticSearchRestUrl = getRequestURL(getNextServer(), clientRequest.getURI());
    HttpUriRequest request = constructHttpMethod(clientRequest.getRestMethodName(), elasticSearchRestUrl, clientRequest.getData(gson), requestConfig);

    log.debug("Request method={} url={}", clientRequest.getRestMethodName(), elasticSearchRestUrl);

    // add headers added to action
    for (Entry<String, Object> header : clientRequest.getHeaders().entrySet()) {
        request.addHeader(header.getKey(), header.getValue().toString());
    }

    return request;
}
```

The key call here is getNextServer().
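getRequestURL combines the server chosen by getNextServer() with the action's URI, so the same action can be sent to a different node on each call. The sketch below shows that kind of join. RequestUrls.join is a hypothetical helper that assumes the goal is exactly one '/' between the two parts; Jest's own getRequestURL may treat edge cases differently. The URI my_store/product/998 mirrors the Get request in the test class and is only illustrative.

```java
/** Hypothetical helper, not Jest's getRequestURL: joins a server base URL and an action URI. */
final class RequestUrls {

    static String join(String server, String uri) {
        // Drop a trailing '/' on the server and a leading '/' on the URI ...
        String base = server.endsWith("/") ? server.substring(0, server.length() - 1) : server;
        String path = uri.startsWith("/") ? uri.substring(1) : uri;
        // ... then put exactly one '/' between them
        return base + "/" + path;
    }
}
```

For example, join("http://127.0.0.1:9200", "my_store/product/998") yields http://127.0.0.1:9200/my_store/product/998; if the next call gets http://127.0.0.1:8200 from getNextServer(), the same action goes to the other node.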
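To summarize:
- JestHttpClient extends AbstractJestClient; both its execute and executeAsync methods call prepareRequest to build the HttpUriRequest.
- prepareRequest first calls getNextServer to get the address of the Elasticsearch server the request will be sent to.
- getNextServer, in turn, calls serverPoolReference.get().getNextServer().
- So what is serverPoolReference?
- And look again at client.setServers(discoveredServerList), which NodeChecker calls once it has finished processing the response.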
Look at the setServers method in the AbstractJestClient class.
AbstractJestClient has a serverPoolReference field of type AtomicReference<ServerPool>. setServers creates a new ServerPool and then updates serverPoolReference to point to it.
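The point of publishing a whole new ServerPool through an AtomicReference, instead of editing a shared list in place, is that request threads never see a half-updated list and never need a lock. A minimal sketch of that copy-on-write pattern follows; ServerPoolSwap and Pool are hypothetical names, and the real ServerPool also carries the round-robin state.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified model of swapping an immutable server pool atomically. */
final class ServerPoolSwap {

    /** Immutable snapshot of the servers a client may use. */
    static final class Pool {
        final List<String> servers;

        Pool(Collection<String> servers) {
            this.servers = Collections.unmodifiableList(new ArrayList<>(servers));
        }
    }

    private final AtomicReference<Pool> serverPoolReference =
            new AtomicReference<>(new Pool(Collections.emptyList()));

    /** Called by the discovery thread: build a brand-new pool, then publish it in one step. */
    void setServers(Collection<String> servers) {
        serverPoolReference.set(new Pool(servers));
    }

    /** Called by request threads: always returns a complete, consistent pool. */
    Pool currentPool() {
        return serverPoolReference.get();
    }
}
```

A request thread that already holds a pool keeps using that snapshot even if NodeChecker publishes a new one a moment later; the next request simply picks up the new pool.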
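ServerPool holds an AtomicInteger named nextServerIndex. getNextServer computes the index into the serversRing list as nextServerIndex.getAndIncrement() % serversRing.size(), which implements a round-robin strategy. In the edge case where this throws IndexOutOfBoundsException (which can happen once the counter overflows past Integer.MAX_VALUE and the remainder turns negative), nextServerIndex is reset to 0 and round-robin selection continues from there.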
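The round-robin selection and its reset can be sketched in plain Java. RoundRobinPool is a hypothetical stand-in written from the description above, not Jest's ServerPool source. Its startIndex constructor parameter exists only so the demo can start near Integer.MAX_VALUE and trigger the reset.

```java
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical round-robin server pool modeled on the description of ServerPool. */
final class RoundRobinPool {
    private final List<String> serversRing;
    private final AtomicInteger nextServerIndex;

    RoundRobinPool(Collection<String> servers) {
        this(servers, 0);
    }

    /** startIndex is for demonstration only: start near Integer.MAX_VALUE to force an overflow. */
    RoundRobinPool(Collection<String> servers, int startIndex) {
        if (servers.isEmpty()) {
            // Otherwise "% size()" would throw ArithmeticException (division by zero)
            throw new IllegalArgumentException("no servers configured");
        }
        this.serversRing = Collections.unmodifiableList(new ArrayList<>(servers));
        this.nextServerIndex = new AtomicInteger(startIndex);
    }

    String getNextServer() {
        try {
            return serversRing.get(nextServerIndex.getAndIncrement() % serversRing.size());
        } catch (IndexOutOfBoundsException e) {
            // The counter wrapped to a negative value, so the remainder was negative: start over at 0
            nextServerIndex.set(0);
            return serversRing.get(nextServerIndex.getAndIncrement() % serversRing.size());
        }
    }
}
```

With servers a, b, c and the default start index, calls return a, b, c, a, and so on. Starting at Integer.MAX_VALUE, they return b, then a (after the reset), b, c, a: the order hiccups once, which does not matter for spreading load.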
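Now it all fits together: NodeChecker updates the server pool and execute() takes servers from it, so the servers it hands out are the nodes that were alive at the most recent check. That is how dynamic discovery works.
When a node comes online, requests start going to it; when a node goes down, it is removed. Both happen automatically on the next NodeChecker run, with no manual intervention.
One more point: NodeChecker runs periodically, so make sure its first run has finished before sending requests to ES. For example, with 3 nodes, suppose one of them is down when the application starts. Until NodeChecker has run, the client still uses the bootstrap list, so a business request that happens to be routed to the dead node fails. Once NodeChecker has run, the nodes handed out are all nodes that were alive at that check.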
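The test class waits for NodeChecker with a fixed Thread.sleep(5000). An alternative is to signal when the first discovery run has finished and wait on that signal. The sketch below shows the idea with a JDK ScheduledExecutorService and a CountDownLatch. FirstRunAwareChecker is hypothetical and not part of Jest (Jest's NodeChecker is a Guava AbstractScheduledService); the discovery step is a stand-in Callable instead of a NodesInfo request, and every failure simply falls back to the bootstrap list, whereas Jest removes an unreachable node on CouldNotConnectException.

```java
import java.util.*;
import java.util.concurrent.*;

/** Hypothetical sketch (not Jest's API): a fixed-delay checker that signals after its first run. */
final class FirstRunAwareChecker implements AutoCloseable {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final CountDownLatch firstRunDone = new CountDownLatch(1);
    private final Set<String> bootstrapServers;
    private final Callable<Set<String>> discover; // stand-in for the NodesInfo call
    private final long delayMillis;
    private volatile Set<String> liveServers;

    FirstRunAwareChecker(Set<String> bootstrapServers, Callable<Set<String>> discover, long delayMillis) {
        this.bootstrapServers = bootstrapServers;
        this.discover = discover;
        this.delayMillis = delayMillis;
        this.liveServers = bootstrapServers; // start from the bootstrap list, as getObject() does
    }

    /** Runs immediately, then again delayMillis after each run finishes (fixed delay). */
    void start() {
        scheduler.scheduleWithFixedDelay(this::checkOnce, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    private void checkOnce() {
        try {
            liveServers = discover.call();
        } catch (Exception e) {
            // Never let the exception escape: for scheduleWithFixedDelay, a task that throws
            // has all later executions suppressed. Fall back to the bootstrap list instead.
            liveServers = bootstrapServers;
        } finally {
            firstRunDone.countDown(); // no-op after the first run
        }
    }

    /** Waits for the first run; returns false on timeout or interrupt. */
    boolean awaitFirstRun(long timeout, TimeUnit unit) {
        try {
            return firstRunDone.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    Set<String> liveServers() {
        return liveServers;
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Waiting on a latch returns as soon as the first check completes and reports a timeout explicitly, whereas a fixed sleep is either longer than needed or, on a slow start, too short.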
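Problems encountered
Background: this was an upgrade of an old project. It previously ran against a single ES node, so no cluster was configured, and the Jest version it used targeted JDK 1.7.
JestClient was initialized like this:
```java
JestClientFactory factory = new JestClientFactory();
factory.setHttpClientConfig(new HttpClientConfig.Builder("http://127.0.0.1:9200")
        .gson(new GsonBuilder().setDateFormat("yyyy-MM-dd'T'HH:mm:ss").create())
        .multiThreaded(true)
        .readTimeout(10000)
        .build());
jestClient = factory.getObject();
```

To connect to a cluster, configure the cluster node addresses; the single most important change is adding .discoveryEnabled(true).
The project used version 2.4.0. After upgrading to 5.3.4, the breakpoints I set while debugging Jest's source never lined up with the source code. It turned out that the old 2.4.0 jar was still in the Tomcat deployment directory that IDEA publishes to, so Tomcat loaded classes from the 2.4.0 jar. After I deleted the old jar, rebuilt and retested, everything worked.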
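When breakpoints don't match the source, it helps to ask the JVM where a class was actually loaded from. The helper below uses the standard ProtectionDomain/CodeSource and ClassLoader.getResources APIs; ClassOrigin is a hypothetical name. Called from inside the running web application with, for example, io.searchbox.client.JestClientFactory.class, it would show whether the class came from the stale 2.4.0 jar. More than one entry from copiesOf suggests duplicate jars on the classpath.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.security.CodeSource;
import java.util.Collections;
import java.util.List;

/** Hypothetical diagnostic helper: where does the JVM load a class from? */
final class ClassOrigin {

    /** Jar or directory the class was loaded from, or a placeholder when the JVM does not say. */
    static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Bootstrap classes such as java.lang.String have no CodeSource
            return "<bootstrap or unknown>";
        }
        return source.getLocation().toString();
    }

    /** Every copy of the .class file visible to the loader; several copies usually mean duplicate jars. */
    static List<URL> copiesOf(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```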
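In short, the upgrade came down to a few steps: moving Jest to 5.3.4, listing the cluster node addresses, enabling .discoveryEnabled(true), and removing the stale 2.4.0 jar from the Tomcat deployment directory.
If you're interested, you can try it yourself with the test code above.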