ElasticSearch初体验之使用Java进行最基本的增删改查
好久沒寫博文了, 最近項(xiàng)目中使用到了ElaticSearch相關(guān)的一些內(nèi)容, 剛好自己也來做個(gè)總結(jié)。
現(xiàn)在自己也只能算得上入門, 總結(jié)下自己在工作中使用Java操作ES的一些小經(jīng)驗(yàn)吧。
本文總共分為三個(gè)部分:
一:ES相關(guān)基本概念及原理
二:ES使用場景介紹
三:使用Java進(jìn)行ES的增刪改查及代碼講解
一:ES相關(guān)基本概念:
ElasticSearch(簡稱ES)是一個(gè)基于Lucene構(gòu)建的開源、分布式、RESTful的全文本搜索引擎。
不過,ElasticSearch卻也不僅只是一個(gè)全文本搜索引擎,它還是一個(gè)分布式實(shí)時(shí)文檔存儲(chǔ),其中每個(gè)field均是被索引的數(shù)據(jù)且可被搜索;也是一個(gè)帶實(shí)時(shí)分析功能的分布式搜索引擎,并且能夠擴(kuò)展至數(shù)以百計(jì)的服務(wù)器存儲(chǔ)及處理PB級(jí)的數(shù)據(jù)。
如前所述,ElasticSearch在底層利用Lucene完成其索引功能,因此其許多基本概念源于Lucene。
我們先說說ES的基本概念。
- 索引 Index:對數(shù)據(jù)的邏輯存儲(chǔ)(倒排索引),不存儲(chǔ)原
始值。 - 類型 Type:對索引的邏輯分類,可以有?個(gè)或多個(gè)分類。
- ?檔 Document:基本數(shù)據(jù)單元,JSON。
- 字段 Filed
關(guān)系型數(shù)據(jù)與ES對比:
Relational DB -> Databases -> Tables -> Rows -> Columns
Elasticsearch -> Indices -> Types -> Documents -> Fields
這里再說下ES中很重要的概念--倒排索引。這同樣也是solr,lucene中所使用的索引方式。
例如我們正常的索引:
當(dāng)我們在關(guān)系型數(shù)據(jù)庫中,都是有id索引的, 我們通過id去查value速度是很快的。
但是如果我們想查value中包含字母b的值呢?特別是數(shù)據(jù)量很大的時(shí)候, 這種以id為索引的方式是不是就不適合了?
那么這里就適合使用倒排索引了:
這里將value進(jìn)行分詞, 然后將分詞結(jié)果拿出來當(dāng)做索引
跟正向的索引比較,也就是做了一個(gè)倒置,這就是倒排索引的思想
二,ES使用場景介紹
1、全文搜索(搜索引擎)
在一組文檔中查找某一單詞所在文檔及位置
2、模糊匹配
通過用戶的輸入去匹配詞庫中符合條件的詞條
3、商品搜索
通過商品的關(guān)鍵字去數(shù)據(jù)源中查找符合條件的商品
在我自己的項(xiàng)目中使用的情況是我有上百萬的文章需要被通過各種條件檢索到, 所以這里就直接使用ES, 現(xiàn)在線上檢索速度都是10ms之內(nèi)返回。
下面看看ES數(shù)據(jù)在瀏覽器的展示形式以及可視化界面的搜索:
三:使用Java進(jìn)行ES的增刪改查及代碼講解
1, 使用ES進(jìn)行增加和更新操作。
上面EsMixedDataDto是自己構(gòu)建的一個(gè)類, ES中保存的字段就是這個(gè)類中的所有字段。
接著是增加和更新操作了:
這個(gè)還是調(diào)用了系統(tǒng)封裝好的esClient中的insertOrUpdate方法,最后我會(huì)把ESClient中所有封裝的方法都貼出來, 其內(nèi)部就是調(diào)用了ES原生的insert或者update方法的。
2,使用ES進(jìn)行刪除操作
/** * 刪除索引 */ public void deleteIndex(String id) throws Exception{esClient.deleteDocument(id); }同上,也是使用了esClient中的delete方法,后面我會(huì)貼上esClient中所有方法。
3,使用ES進(jìn)行查詢
3.1 當(dāng)然ES最重要的還是多維度的查詢, 這里也是我要講的重點(diǎn)。
首先來個(gè)最簡單的搜索一篇文章的標(biāo)題:
這里先說說search_type, 也就是上面setSearchType(SearchType.QUERY_THEN_FETCH)的內(nèi)容:
- query_then_fetch:執(zhí)?查詢得到對?檔進(jìn)?排序的所需信息(在所
有分?上執(zhí)?),然后在相關(guān)分?上查詢?檔實(shí)際內(nèi)容。返回結(jié)果的
最?數(shù)量等于size參數(shù)的值。 - query_and_fetch:查詢在所有分?上并?執(zhí)?,所有分?返回等于
size值的結(jié)果數(shù),最終返回結(jié)果的最?數(shù)量等于size的值乘以分?
數(shù)。分?較多時(shí)會(huì)消耗過多資源。 - count:只返回匹配查詢的?檔數(shù)量。
- scan:?般在需要返回?量結(jié)果時(shí)使?。在發(fā)送第?次請求后,ES
會(huì)返回?個(gè)滾動(dòng)標(biāo)識(shí)符,類似于數(shù)據(jù)庫中的游標(biāo)。
我這里使用的是query_then_fetch。
3.2 緊接著說個(gè)多條件復(fù)雜的查詢:
/** * @param jiaxiaoId: 駕校id * @param title 文章的title關(guān)鍵詞 * @param publishStatus 發(fā)布狀態(tài) * @param stickStatus 置頂狀態(tài) * @param pageRequest 請求的頁碼和條數(shù) * @param highlight 搜索結(jié)果是否高亮顯示 */ public PageResponse<EsMixedDataDto> queryByConditions(Long jiaxiaoId, String title, PageRequest pageRequest, int publishStatus, int stickStatus, boolean highlight) {BoolQueryBuilder booleanQueryBuilder = QueryBuilders.boolQuery();booleanQueryBuilder.must(QueryBuilders.termQuery("jiaxiaoId", jiaxiaoId));if (StringUtils.isNotBlank(title)) {booleanQueryBuilder.must(QueryBuilders.multiMatchQuery(title, "title").type(MultiMatchQueryBuilder.Type.BEST_FIELDS));}//這里是添加是否發(fā)布的搜索條件, 默認(rèn)是只展示已發(fā)布的文章if (publishStatus == CommonConstants.DataStatus.INIT_STATUS) {booleanQueryBuilder.must(QueryBuilders.termQuery("publishStatus", CommonConstants.DataStatus.INIT_STATUS));} else {booleanQueryBuilder.mustNot(QueryBuilders.termQuery("publishStatus", CommonConstants.DataStatus.INIT_STATUS));}//這里是添加是否置頂?shù)乃阉鳁l件if (stickStatus == CommonConstants.DataStatus.PUBLISH_STATUS) {booleanQueryBuilder.must(QueryBuilders.termQuery("stickStatus", CommonConstants.DataStatus.PUBLISH_STATUS));} else if(stickStatus == CommonConstants.DataStatus.INIT_STATUS){booleanQueryBuilder.mustNot(QueryBuilders.termQuery("stickStatus", CommonConstants.DataStatus.PUBLISH_STATUS));}SearchRequestBuilder searchRequestBuilder = esClient.prepareSearch().setTypes(ES_TYPE_MIXEDDATA).setSearchType(SearchType.QUERY_THEN_FETCH).setQuery(booleanQueryBuilder).setFrom(pageRequest.getOffset()).setSize(pageRequest.getLimit()).addSort("stickStatus", SortOrder.DESC).setExplain(false);if (jiaxiaoId == null) {BoolFilterBuilder filterBuilder = FilterBuilders.boolFilter().must(FilterBuilders.missingFilter("jiaxiaoId"));searchRequestBuilder.setPostFilter(filterBuilder);}if (highlight) {searchRequestBuilder.addHighlightedField("title", 100, 1).setHighlighterPreTags("<font color='red'>").setHighlighterPostTags("</font>");} else {searchRequestBuilder.addSort("publishTime", SortOrder.DESC);}try {return getMixedData(searchRequestBuilder);} catch (Exception e) {log.error("ES搜索異常!", e.getMessage());throw new RuntimeException(e);} }這里不用的就是使用query和filterBuilder,searchRequestBuilder中可以設(shè)置query和postFilter。
Debug到這里, 其實(shí)寫的查詢語句最終還是拼接成了一個(gè)ES可讀的結(jié)構(gòu)化查詢語句:
3.3 最后貼上最重要的一個(gè)類ESClient.java, 這是我們針對于ElasticSearch封裝的一個(gè)類。
public class ESClient<T> {private static final Logger LOG = LoggerFactory.getLogger(ESClient.class);private static final String DEFAULT_ANALYZER = "ik_smart";private static final DozerBeanMapper dozerBeanMapper = new DozerBeanMapper();private final Client client;private String index;private Class<T> clazz;private String type;private BulkProcessor bulkProcessor;private List<String> serverHttpAddressList = Lists.newArrayList();private Map<String, JSONObject> sqlJsonMap = Maps.newHashMap();/** * 初始化一個(gè)連接ElasticSearch的客戶端 * * @param addresses ES服務(wù)器的Transport地址和端口的列表,多個(gè)服務(wù)器用逗號(hào)分隔,例如 localhost:9300,localhost:9300,... * @param clusterName 集群名稱 * @param index 索引名稱,這里應(yīng)該使用項(xiàng)目名稱 * @param username 用戶名稱 * @param password 用戶密碼 * @param type 索引類型 * @param clazz 存儲(chǔ)類 */public ESClient(String addresses, String clusterName, String index,String username, String password, String type, Class<T> clazz) {if (StringUtils.isBlank(addresses)) {throw new RuntimeException("沒有給定的ES服務(wù)器地址。");}this.index = index;this.type = type;this.clazz = clazz;// 獲得鏈接地址對象列表List<InetSocketTransportAddress> addressList = Lists.transform(Splitter.on(",").trimResults().omitEmptyStrings().splitToList(addresses),new Function<String, InetSocketTransportAddress>() {public InetSocketTransportAddress apply(String input) {String[] addressPort = input.split(":");String address = addressPort[0];Integer port = Integer.parseInt(addressPort[1]);serverHttpAddressList.add(address + ":" + 9200);return new InetSocketTransportAddress(address, port);}});// 建立關(guān)于ES的配置ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder().put("cluster.name", clusterName).put("client.transport.sniff", false);if (StringUtils.isNotBlank(username)) {builder.put("shield.user", username + ":" + password);}Settings settings = builder.build();// 生成原生客戶端TransportClient transportClient = new TransportClient(settings);for (InetSocketTransportAddress address : addressList) {transportClient.addTransportAddress(address);}client = transportClient;bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {public void beforeBulk(long executionId, BulkRequest request) {}public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {}public void afterBulk(long executionId, BulkRequest request, Throwable failure) {throw new RuntimeException(failure);}}).build();}/** * 初始化連接ElasticSearch的客戶端 * * @param client 原生客戶端 * @param index 索引名稱 * @param type 類型 * @param clazz 存儲(chǔ)類 */public ESClient(Client client, String index, String type, Class<T> clazz) {this.client = client;this.index = index;this.type = type;this.clazz = clazz;}/** * 向ES發(fā)送存儲(chǔ)請求,將一個(gè)對象存儲(chǔ)到服務(wù)器。 * * @param id 該對象的id * @param t 存儲(chǔ)實(shí)例 * @return 是否存儲(chǔ)成功 */public boolean indexDocument(String id, T t) {return indexDocument(id, type, t);}/** * 向ES發(fā)送存儲(chǔ)請求,將一個(gè)對象存儲(chǔ)到服務(wù)器。 * * @param t 存儲(chǔ)實(shí)例 * @return 返回存儲(chǔ)之后在ES服務(wù)器內(nèi)生成的隨機(jī)ID */public String indexDocument(T t) {IndexResponse indexResponse = client.prepareIndex(index, type).setSource(toJSONString(t)).execute().actionGet();return indexResponse.getId();}/** * 向ES發(fā)送存儲(chǔ)請求,將一個(gè)對象存儲(chǔ)到服務(wù)器,這個(gè)方法允許用戶手動(dòng)指定該對象的存儲(chǔ)類型名稱 * * @param id 對象id * @param type 存儲(chǔ)類型 * @param t 存儲(chǔ)實(shí)例 * @return 是否存儲(chǔ)成功 */public boolean indexDocument(String id, String type, T t) {IndexResponse indexResponse = client.prepareIndex(index, type, id).setSource(toJSONString(t)).execute().actionGet();return true;}/** * 向ES發(fā)送批量儲(chǔ)存請求, 請求不會(huì)馬上提交,而是會(huì)等待到達(dá)bulk設(shè)置的閾值后進(jìn)行提交.<br/> * 最后客戶端需要調(diào)用{@link #flushBulk()}方法. * * @param id 對象id * @param t 存儲(chǔ)實(shí)例 * @return 成功表示放入到bulk成功, 可能會(huì)拋出runtimeException */ public boolean indexDocumentBulk(String id, T t) { return indexDocumentBulk(id, type, t); } /** * 向ES發(fā)送批量存儲(chǔ)請求,將一個(gè)對象存儲(chǔ)到服務(wù)器,這個(gè)方法允許用戶手動(dòng)指定該對象的存儲(chǔ)類型名稱 * * @param id 對象id * @param type 存儲(chǔ)類型 * @param t 存儲(chǔ)實(shí)例 * @return 成功表示放入到bulk成功, 可能會(huì)拋出runtimeException * @see #indexDocument(String, Object) */ public boolean indexDocumentBulk(String id, String type, T t) { IndexRequest indexRequest = new IndexRequest(index, type, id).source(toJSONString(t)); bulkProcessor.add(indexRequest); return true; } /** * 向ES發(fā)送批量存儲(chǔ)請求, 請求不會(huì)馬上提交,而是會(huì)等待到達(dá)bulk設(shè)置的閾值后進(jìn)行提交.<br/> * 最后客戶端需要調(diào)用{@link #flushBulk()}方法. * * @param t 存儲(chǔ)實(shí)例 * @return 成功表示放入到bulk成功, 可能會(huì)拋出runtimeException */ public boolean indexDocumentBulk(T t) { IndexRequest indexRequest = new IndexRequest(index, type).source(toJSONString(t)); bulkProcessor.add(indexRequest); return true; } public boolean indexDocumentBulk(List<T> list) { for (T t : list) { indexDocumentBulk(t); } return true; } /** * 向ES發(fā)送批量存儲(chǔ)請求, 允許傳入一個(gè)Function, 用來從對象中獲取ID. * * @param list 對象列表 * @param idFunction 獲取ID * @return 成功表示放入到bulk成功, 可能會(huì)拋出runtimeException */ public boolean indexDocumentBulk(List<T> list, Function<T, String> idFunction) { for (T t : list) { indexDocumentBulk(idFunction.apply(t), t); } return true; } /** * 向ES發(fā)送更新文檔請求,將一個(gè)對象更新到服務(wù)器,會(huì)替換原有對應(yīng)ID的數(shù)據(jù)。 * * @param id id * @param t 存儲(chǔ)對象 * @return 是否更新成功 */ public boolean updateDocument(String id, T t) { return updateDocument(id, type, t); } /** * 向ES發(fā)送更新文檔請求,將一個(gè)對象更新到服務(wù)器,會(huì)替換原有對應(yīng)ID的數(shù)據(jù)。 * * @param id id * @param type 存儲(chǔ)類型 * @param t 存儲(chǔ)對象 * @return 是否更新成功 */ public boolean updateDocument(String id, String type, T t) { client.prepareUpdate(index, type, id).setDoc(toJSONString(t)) .execute().actionGet(); return true; } /** * 向ES發(fā)送批量更新請求 * * @param id 索引ID * @param t 存儲(chǔ)對象 * @return 成功表示放入到bulk成功, 可能會(huì)拋出runtimeException */ public boolean updateDocumentBulk(String id, T t) { UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(toJSONString(t)); bulkProcessor.add(updateRequest); return true; } /** * 向ES發(fā)送upsert請求, 如果該document不存在將會(huì)新建這個(gè)document, 如果存在則更新. * * @param id id * @param t 存儲(chǔ)對象 * @return 是否執(zhí)行成功 */ public boolean upsertDocument(String id, T t) { return upsertDocument(id, type, t); } /** * 向ES發(fā)送upsert請求, 如果該document不存在將會(huì)新建這個(gè)document, 如果存在則更新. * * @param id id * @param type 存儲(chǔ)類型 * @param t 存儲(chǔ)對象 * @return 是否執(zhí)行成功 */ public boolean upsertDocument(String id, String type, T t) { client.prepareUpdate(index, type, id).setDocAsUpsert(true).setDoc(toJSONString(t)) .execute().actionGet(); return true; } /** * 向ES發(fā)送批量upsert的請求. * * @param id id * @param t 儲(chǔ)存對象 * @return 是否執(zhí)行成功 */ public boolean upsertDocumentBulk(String id, T t) { UpdateRequest updateRequest = new UpdateRequest(index, type, id) .doc(toJSONString(t)); updateRequest.docAsUpsert(true); bulkProcessor.add(updateRequest); return true; } /** * 向ES發(fā)送獲取指定ID文檔的請求 * * @param id id * @return 搜索引擎實(shí)例 * @throws Exception */ public T getDocument(String id) throws Exception { try { GetResponse getResponse = client.prepareGet(index, type, id) .execute().actionGet(); if (getResponse.getSource() == null) { return null; } JSONObject jsonObject = new JSONObject(getResponse.getSource()); T t = clazz.newInstance(); toObject(t, jsonObject); return t; } catch (Exception e) { throw new RuntimeException(e); } } /** * 向ES發(fā)送刪除指定ID文檔的請求 * * @param id id * @return 是否刪除成功 * @throws Exception */ public boolean deleteDocument(String id) throws Exception { return deleteDocument(id, type); } /** * 向ES發(fā)送刪除指定ID文檔的請求 * * @param id id * @param type 存儲(chǔ)類型 * @return 是否刪除成功 * @throws Exception */ public boolean deleteDocument(String id, String type) throws Exception { DeleteResponse deleteResponse = client.prepareDelete(index, type, id) .execute().actionGet(); return deleteResponse.isFound(); } /** * 向ES發(fā)送搜索文檔的請求,返回分頁結(jié)果 * * @param searchText 搜索內(nèi)容 * @return 分頁結(jié)果 * @throws Exception */ public PageResponse<T> searchDocument(String searchText) throws Exception { PageRequest pageRequest = WebContext.get().page(); SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index) .setTypes(type) .setQuery(QueryBuilders.matchQuery("_all", searchText)) .setFrom(pageRequest.getOffset()) .setSize(pageRequest.getLimit()) .setFetchSource(true); return searchDocument(searchRequestBuilder); } /** * 向ES發(fā)送搜索文檔的請求,返回列表結(jié)果 * * @param searchText 搜索內(nèi)容 * @param start 起始位置 * @param size 獲取數(shù)據(jù)大小 * @return 返回?cái)?shù)據(jù)列表 * @throws Exception */ public List<T> searchDocument(String searchText, int start, int size) throws Exception { SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index) .setTypes(type) .setQuery(QueryBuilders.matchQuery("_all", searchText)) .setFrom(start) .setSize(size) .setFetchSource(true); PageResponse<T> pageResponse = searchDocument(searchRequestBuilder); return pageResponse.getItemList(); } /** * 向ES發(fā)送搜索文檔的請求,返回列表結(jié)果 * * @param searchText 搜索內(nèi)容 * @param type 類型 * @param start 起始位置 * @param size 數(shù)據(jù)大小 * @return 返回?cái)?shù)據(jù)列表 * @throws Exception */ public List<T> searchDocument(String searchText, String type, int start, int size) throws Exception { SearchRequestBuilder searchRequestBuilder = client.prepareSearch(index) .setTypes(type) .setQuery(QueryBuilders.matchQuery("_all", searchText)) .setFrom(start) .setSize(size) .setFetchSource(true); PageResponse<T> pageResponse = searchDocument(searchRequestBuilder); return pageResponse.getItemList(); } /** * 向ES發(fā)送搜索文檔的請求,返回分頁結(jié)果 * * @param searchRequestBuilder 搜索構(gòu)造器 * @return 分頁結(jié)果 * @throws Exception */ public PageResponse<T> searchDocument(SearchRequestBuilder searchRequestBuilder) throws Exception { SearchResponse searchResponse = search(searchRequestBuilder); return searchResponseToPageResponse(searchResponse); } /** * 獲得scrollId對應(yīng)的數(shù)據(jù). 請查看{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/> * 可以反復(fù)調(diào)用該方法, 直到返回?cái)?shù)據(jù)為0. * * @param scrollId 給定的scrollId * @param keepSeconds scroll數(shù)據(jù)保留時(shí)間 * @return 分頁結(jié)果 * @throws Exception */ public PageResponse<T> scrollSearchDocument(String scrollId, int keepSeconds) throws Exception { return searchResponseToPageResponse(scrollSearch(scrollId, keepSeconds)); } /** * 向ES發(fā)送搜索請求,然后直接返回原始結(jié)果。 * * @param searchRequestBuilder 搜索構(gòu)造器 * @return 返回結(jié)果 */ public SearchResponse search(SearchRequestBuilder searchRequestBuilder) { return searchRequestBuilder.setTypes(type).execute().actionGet(); } /** * 向ES發(fā)送搜索請求,然后直接返回原始結(jié)果。 * * @param searchRequestBuilder 搜索構(gòu)造器 * @param type 類型 * @return 返回結(jié)果 */ public SearchResponse search(SearchRequestBuilder searchRequestBuilder, String type) { return searchRequestBuilder.setTypes(type).execute().actionGet(); } /** * 通過scrollId獲得數(shù)據(jù).請查看{@link #getScrollId(SearchRequestBuilder, int, int)}.<br/> * 可以反復(fù)調(diào)用該方法, 直到返回?cái)?shù)據(jù)為0. * * @param scrollId 給定的scrollId * @param keepSeconds scroll繼續(xù)保留的時(shí)間, 建議60秒 * @return 返回獲取的數(shù)據(jù) */ public SearchResponse scrollSearch(String scrollId, int keepSeconds) { return client.prepareSearchScroll(scrollId).setScroll(new TimeValue(keepSeconds * 1000)) .execute().actionGet(); } /** * 提供搜索構(gòu)造器來獲得搜索scrollId, 這個(gè)scrollId用作{@link #scrollSearch(String, int)} * 和{@link #scrollSearchDocument(String, int)}的參數(shù). <br/> * 當(dāng)需要獲取大量數(shù)據(jù)的時(shí)候, 請使用scrollSearch來進(jìn)行. * * @param searchRequestBuilder 搜索構(gòu)造器 * @param keepSeconds scroll搜索保留時(shí)間, 建議60秒 * @param sizePerShard 每次每個(gè)分片獲取的尺寸 * @return 返回scrollId, 用于scrollSearch方法. */ public String getScrollId(SearchRequestBuilder searchRequestBuilder, int keepSeconds, int sizePerShard) { SearchResponse searchResponse = searchRequestBuilder.setSearchType(SearchType.SCAN) .setScroll(new TimeValue(keepSeconds * 1000)) .setSize(sizePerShard).execute().actionGet(); return searchResponse.getScrollId(); } /** * 返回搜索指定內(nèi)容后,總共ES找到匹配的數(shù)據(jù)量。 * * @param searchText 搜索內(nèi)容 * @return 搜索結(jié)果數(shù)據(jù)量 */ public long countSearchResult(String searchText) { CountRequestBuilder countRequestBuilder = client.prepareCount(index) .setTypes(type) .setQuery(QueryBuilders.matchQuery("_all", searchText)); return countSearchResult(countRequestBuilder); } /** * 返回搜索指定內(nèi)容后,總共ES找到匹配的數(shù)據(jù)量。 * * @param searchText 搜索內(nèi)容 * @param type 類型 * @return 搜索結(jié)果數(shù)據(jù)量 */ public long countSearchResult(String searchText, String type) { CountRequestBuilder countRequestBuilder = client.prepareCount(index) .setTypes(type) .setQuery(QueryBuilders.matchQuery("_all", searchText)); return countSearchResult(countRequestBuilder); } /** * 返回搜索指定內(nèi)容后,總共ES找到匹配的數(shù)據(jù)量。 * * @param countRequestBuilder 計(jì)數(shù)請求構(gòu)造器實(shí)例 * @return 搜索結(jié)果數(shù)據(jù)量 * @see #prepareCount() */ public long countSearchResult(CountRequestBuilder countRequestBuilder) { return countRequestBuilder.execute().actionGet().getCount(); } /** * 用默認(rèn)的分詞器進(jìn)行文本分詞。 * * @param docText 給定的文本 * @param order 是否使用排序,如果使用排序,則相同分詞會(huì)被合并,并且出現(xiàn)次數(shù)最高的排在返回列表最頭部。 * @return 分詞器將文本分詞之后的詞語列表 */ public List<String> analyzeDocument(String docText, boolean order) { List<AnalyzeToken> tokenList = analyzeDocument(docText, DEFAULT_ANALYZER); if (order) { // 如果是使用排序,按照分詞出現(xiàn)次數(shù)進(jìn)行排序,并且會(huì)合并相同的分詞。 // 構(gòu)造分詞Map,key為分詞,value為出現(xiàn)次數(shù)。 Map<String, Integer> tokenMap = Maps.newHashMap(); for (AnalyzeToken token : tokenList) { if (tokenMap.get(token.getTerm()) == null) { tokenMap.put(token.getTerm(), 1); } else { tokenMap.put(token.getTerm(), tokenMap.get(token.getTerm()) + 1); } } // 將分詞Map進(jìn)行排序 List<Map.Entry<String, Integer>> tokenSortList = Ordering.from(new Comparator<Map.Entry<String, Integer>>() { public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) { return o2.getValue().compareTo(o1.getValue()); } }).sortedCopy(tokenMap.entrySet()); // 返回分詞列表。 return Lists.transform(tokenSortList, new Function<Map.Entry<String, Integer>, String>() { public String apply(Map.Entry<String, Integer> input) { return input.getKey(); } }); } else { // 返回所有分詞結(jié)果 return Lists.transform(tokenList, new Function<AnalyzeToken, String>() { public String apply(AnalyzeToken input) { return input.getTerm(); } }); } } /** * 用指定分詞器來分析給定的文本 * * @param docText 給定的文本 * @param analyzer 指定的分析器 * @return 分詞器將文本分詞之后的詞語列表 */ public List<AnalyzeToken> analyzeDocument(String docText, String analyzer) { AnalyzeResponse analyzeResponse = client.admin().indices().prepareAnalyze(index, docText) .setAnalyzer(analyzer) .execute().actionGet(); return analyzeResponse.getTokens(); } /** * 獲得一個(gè)搜索請求構(gòu)造器的實(shí)例,通過這個(gè)實(shí)例,可以進(jìn)行查詢相關(guān)操作。<br/> * 使用這個(gè)方法{@link ESClient#searchDocument(SearchRequestBuilder)}進(jìn)行查詢。 * <pre> * prepareSearch("telepathy") * .setTypes("article") * .setSearchType(SearchType.QUERY_THEN_FETCH) * .setQuery(QueryBuilders.matchQuery("_all", searchText)) * .setFrom(pageRequest.getLimit() * (pageRequest.getPage() - 1)) * .setSize(pageRequest.getLimit()) * .setExplain(true) * .addHighlightedField("title", 100, 1) * .setFetchSource(new String[]{}, new String[]{}); * </pre> * * @return 搜索請求構(gòu)造器實(shí)例 */ public SearchRequestBuilder prepareSearch() { return client.prepareSearch(index); } /** * 獲得一個(gè)計(jì)數(shù)請求構(gòu)造器的實(shí)例,通過這個(gè)實(shí)例可以進(jìn)行查詢選項(xiàng)的構(gòu)造。 * * @return 計(jì)數(shù)請求構(gòu)造器實(shí)例 * @see #prepareSearch() */ public CountRequestBuilder prepareCount() { return client.prepareCount(index); } /** * 獲得一個(gè)Document的term vector (doc frequency, positions, offsets) * * @return TermVectorResponse * @see #termVector() */ public ActionFuture<TermVectorResponse> termVector(TermVectorRequest request) { return client.termVector(request); } /** * 將SQL轉(zhuǎn)換成ES的JSON查詢對象. * * @param sql 給定的SQL * @return JSON對象 */ public JSONObject convertSqlToJSON(String sql) { if (sqlJsonMap.get(sql) != null) { return sqlJsonMap.get(sql); } List<String> addresses = Lists.newArrayList(serverHttpAddressList); while (addresses.size() > 0) { String sqlPluginUrl = "http://" + addresses.remove(RandomUtils.nextInt(0, addresses.size())) + "/_sql/_explain"; try { JSONObject json = JSONObject.parseObject( MucangHttpClient.getDefault().httpPostBody(sqlPluginUrl, sql, "text/plain") ); sqlJsonMap.put(sql, json); return json; } catch (Exception e) { LOG.error("調(diào)用elasticsearch-sql插件時(shí)遇到錯(cuò)誤, 原因:{}", e); } } throw new RuntimeException("調(diào)用elasticsearch-sql插件多次失敗, 請檢查服務(wù)器或者插件功能是否正常."); } /** * 用SQL語句進(jìn)行搜索. 使用${keyName}的方式代表需要替換的字符串(需要替換的字符串請用雙引號(hào)或者單引號(hào)引起來, 否則插件不能解析)<br/> * 例如: select * from table where mediaId="${mediaId}"<br/> * * @param sql 指定的SQL * @param kvPairs 替換鍵值對 * @return 搜索結(jié)果 * @throws Exception */ public SearchResponse searchSql(String sql, final Map<String, String> kvPairs) throws Exception { JSONObject jsonQuery = convertSqlToJSON(sql); PropertyPlaceholderHelper propertyPlaceholderHelper = new PropertyPlaceholderHelper("${", "}"); String queryString = propertyPlaceholderHelper.replacePlaceholders( jsonQuery.toJSONString(), new PropertyPlaceholderHelper.PlaceholderResolver() { public String resolvePlaceholder(String placeholderName) { if (StringUtils.isBlank(kvPairs.get(placeholderName))) { return ""; } else { return kvPairs.get(placeholderName); } } }); SearchRequestBuilder searchRequestBuilder = prepareSearch() .setSource(XContentFactory.jsonBuilder().value(JSONObject.parseObject(queryString))); return search(searchRequestBuilder); } /** * 將給定的對象轉(zhuǎn)換成JSON字符串,如果有特殊需求,可以覆蓋該方法。 * * @param t 給定的對象 * @return JSON字符串 */ public String toJSONString(T t) { return JSON.toJSONString(t); } /** * 將給定的Map里面的值注入到目標(biāo)對象。如果有特殊需求,可以覆蓋該方法。 * * @param t 目標(biāo)對象 * @param map 給定的map * @throws Exception */ public void toObject(T t, Map<String, ?> map) throws Exception { dozerBeanMapper.map(map, t); } /** * 將BulkProcessor的緩沖內(nèi)容進(jìn)行立即提交. */ public void flushBulk() { this.bulkProcessor.flush(); } public BulkProcessor getBulkProcessor() { return bulkProcessor; } public void setBulkProcessor(BulkProcessor bulkProcessor) { this.bulkProcessor = bulkProcessor; } public PageResponse<T> searchResponseToPageResponse(SearchResponse searchResponse) throws Exception { PageResponse<T> pageResponse = new PageResponse<>(); for (SearchHit searchHit : searchResponse.getHits().getHits()) { // 將結(jié)果實(shí)例化成對應(yīng)的類型實(shí)例 T t = this.clazz.newInstance(); Map<String, Object> hitMap; if (searchHit.getSource() != null) { hitMap = searchHit.getSource(); } else { hitMap = Maps.newHashMap(Maps.transformValues(searchHit.getFields(), new Function<SearchHitField, Object>() { public Object apply(SearchHitField input) { return input.getValues(); } } )); } for (HighlightField highlightField : searchHit.getHighlightFields().values()) { hitMap.put(highlightField.getName(), StringUtils.join(highlightField.getFragments(), "...")); } // 將數(shù)據(jù)轉(zhuǎn)換成對應(yīng)的實(shí)例 toObject(t, hitMap); pageResponse.getItemList().add(t); } pageResponse.setTotal(searchResponse.getHits().getTotalHits()); return pageResponse; } /** * 關(guān)閉native的鏈接. */ public void close() { IOUtils.closeQuietly(bulkProcessor); this.client.close(); }}如果有問題大家可以留言一起交流, 我也是一個(gè)es初學(xué)者。
分類: 工作經(jīng)驗(yàn),全文檢索 標(biāo)簽: ElasticSearch總結(jié)
以上是生活随笔為你收集整理的ElasticSearch初体验之使用Java进行最基本的增删改查的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: excel模板 基金账本_有哪些好用的E
- 下一篇: php gmssl,golang gms