ES 之 Java API 基本操作和聚合操作
生活随笔
收集整理的這篇文章主要介紹了
ES 之 Java API 基本操作和聚合操作
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
1、es是基于lucene開發的一個分布式全文索引框架。往ES中存儲和從ES中查詢,格式為Json
索引:Index 相當于數據庫中的database
類型:type 相當于數據庫中的table
主鍵:id 相當于數據庫中的主鍵
往ES中存儲數據,其實就是往ES中的index下的type存儲Json數據
RESTFul風格API
通過http的形式,發送請求,對ES進行操作
查詢: GET
刪除: DELETE
添加: PUT/POST
修改: PUT/POST
2、基本api區別
* 結論:相關度查詢使用match,精確字段查詢使用matchPhrase即可。
* match:全文搜索, 通常用于對text類型字段的查詢,會對進行查詢的文本先進行分詞操作
* term:精確查詢,通常用于對keyword和有精確值的字段進行查詢,不會對進行查詢的文本進行分詞操作
1、EsCRUD.java
package es;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.bulk.byscroll.BulkByScrollResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.DeleteByQueryAction;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.DoubleTerms;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.AvgAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.InternalAvg;
import org.elasticsearch.search.aggregations.metrics.max.InternalMax;
import org.elasticsearch.search.aggregations.metrics.max.MaxAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.sum.InternalSum;
import org.elasticsearch.search.aggregations.metrics.sum.SumAggregationBuilder;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.util.Date;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.QueryBuilders.rangeQuery;
/**
*
* https://www.elastic.co/guide/en/elasticsearch/client/java-api/5.4/index.html
*/
public class EsCRUD {
private TransportClient client = null;
@Before
public void init() throws Exception {
//設置集群名稱
Settings settings = Settings.builder()
.put("cluster.name", "bigdata")
//自動感知的功能(可以通過當前指定的節點獲取所有es節點的信息)
.put("client.transport.sniff", true)
.build();
//創建client
client = new PreBuiltTransportClient(settings).addTransportAddresses(
new InetSocketTransportAddress(InetAddress.getByName("192.168.33.100"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.33.101"), 9300),
new InetSocketTransportAddress(InetAddress.getByName("192.168.33.102"), 9300));
}
@Test
public void testCreate() throws IOException {
IndexResponse response = client.prepareIndex("gamelog", "users", "1")
.setSource(
jsonBuilder()
.startObject()
.field("username", "老趙")
.field("gender", "male")
.field("birthday", new Date())
.field("fv", 9999)
.field("message", "trying out Elasticsearch")
.endObject()
).get();
}
//查找一條
@Test
public void testGet() throws IOException {
GetResponse response = client.prepareGet("gamelog", "users", "1").get();
System.out.println(response.getSourceAsString());
}
//查找多條
@Test
public void testMultiGet() throws IOException {
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("gamelog", "users", "1")
.add("gamelog", "users", "2", "3")
.add("news", "fulltext", "1")
.get();
for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
GetResponse response = itemResponse.getResponse();
if (response.isExists()) {
String json = response.getSourceAsString();
System.out.println(json);
}
}
}
@Test
public void testUpdate() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("gamelog");
updateRequest.type("users");
updateRequest.id("2");
updateRequest.doc(
jsonBuilder()
.startObject()
.field("fv", 999.9)
.endObject());
client.update(updateRequest).get();
}
@Test
public void testDelete() {
DeleteResponse response = client.prepareDelete("gamelog", "users", "2").get();
System.out.println(response);
}
@Test
public void testDeleteByQuery() {
BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
//指定查詢條件
.filter(QueryBuilders.matchQuery("username", "老段"))
//指定索引名稱
.source("gamelog")
.get();
long deleted = response.getDeleted();
System.out.println(deleted);
}
//異步刪除
@Test
public void testDeleteByQueryAsync() {
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male"))
.source("gamelog")
.execute(new ActionListener<BulkByScrollResponse>() {
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted();
System.out.println("數據刪除了");
System.out.println(deleted);
}
@Override
public void onFailure(Exception e) {
e.printStackTrace();
}
});
try {
System.out.println("異步刪除");
Thread.sleep(10000);
} catch (Exception e) {
e.printStackTrace();
}
}
@Test
public void testRange() {
QueryBuilder qb = rangeQuery("fv")
// [88.99, 10000)
.from(88.99)
.to(10000)
.includeLower(true)
.includeUpper(false);
SearchResponse response = client.prepareSearch("gamelog").setQuery(qb).get();
System.out.println(response);
}
/**
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/1' -d '{ "name": "curry", "age": 29, "salary": 3500,"team": "war", "position": "pg"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/2' -d '{ "name": "thompson", "age": 26, "salary": 2000,"team": "war", "position": "pg"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/3' -d '{ "name": "irving", "age": 25, "salary": 2000,"team": "cav", "position": "pg"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/4' -d '{ "name": "green", "age": 26, "salary": 2000,"team": "war", "position": "pf"}'
* curl -XPUT 'http://192.168.5.251:9200/player_info/player/5' -d '{ "name": "james", "age": 33, "salary": 4000,"team": "cav", "position": "sf"}'
*/
@Test
public void testAddPlayer() throws IOException {
IndexResponse response = client.prepareIndex("player_info", "player", "1")
.setSource(
jsonBuilder()
.startObject()
.field("name", "James")
.field("age", 33)
.field("salary", 3000)
.field("team", "cav")
.field("position", "sf")
.endObject()
).get();
}
/**
* https://elasticsearch.cn/article/102
*
* select team, count(*) as player_count from player group by team;
*/
@Test
public void testAgg1() {
//指定索引和type
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//按team分組然后聚合,但是并沒有指定聚合函數
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("player_count").field("team");
//添加聚合器
builder.addAggregation(teamAgg);
//觸發
SearchResponse response = builder.execute().actionGet();
//System.out.println(response);
//將返回的結果放入到一個map中 key:字段名
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
// Set<String> keys = aggMap.keySet();
//
// for (String key: keys) {
// System.out.println(key);
// }
// //取出聚合屬性
StringTerms terms = (StringTerms) aggMap.get("player_count");
//
//// //依次迭代出分組聚合數據
// for (Terms.Bucket bucket : terms.getBuckets()) {
// //分組的名字
// String team = (String) bucket.getKey();
// //count,分組后一個組有多少數據
// long count = bucket.getDocCount();
// System.out.println(team + " " + count);
// }
Iterator<Terms.Bucket> teamBucketIt = terms.getBuckets().iterator();
while (teamBucketIt .hasNext()) {
Terms.Bucket bucket = teamBucketIt.next();
String team = (String) bucket.getKey();
long count = bucket.getDocCount();
System.out.println(team + " " + count);
}
}
/**
* select team, position, count(*) as pos_count from player group by team, position;
*/
@Test
public void testAgg2() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定別名team_name和分組的字段team
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
TermsAggregationBuilder posAgg= AggregationBuilders.terms("pos_count").field("position");
//添加兩個聚合構建器
builder.addAggregation(teamAgg.subAggregation(posAgg));
//執行查詢
SearchResponse response = builder.execute().actionGet();
//將查詢結果放入map中
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//根據屬性名到map中查找
StringTerms teams = (StringTerms) aggMap.get("team_name");
//循環查找結果
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//先按球隊進行分組
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
StringTerms positions = (StringTerms) subAggMap.get("pos_count");
//因為一個球隊有很多位置,那么還要依次拿出位置信息
for (Terms.Bucket posBucket : positions.getBuckets()) {
//拿到位置的名字
String pos = (String) posBucket.getKey();
//拿出該位置的數量
long docCount = posBucket.getDocCount();
//打印球隊,位置,人數
System.out.println(team + " " + pos + " " + docCount);
}
}
}
/**
* select team, max(age) as max_age from player group by team;
*/
@Test
public void testAgg3() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定安球隊進行分組
TermsAggregationBuilder teamAgg = AggregationBuilders.terms("team_name").field("team");
//指定分組求最大值
MaxAggregationBuilder maxAgg = AggregationBuilders.max("max_age").field("age");
//分組后求最大值
builder.addAggregation(teamAgg.subAggregation(maxAgg));
//查詢
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//根據team屬性,獲取map中的內容
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//分組的屬性名
String team = (String) teamBucket.getKey();
//在將聚合后取最大值的內容取出來放到map中
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
//取分組后的最大值
InternalMax ages = (InternalMax)subAggMap.get("max_age");
double max = ages.getValue();
System.out.println(team + " " + max);
}
}
/**
* select team, avg(age) as avg_age, sum(salary) as total_salary from player group by team;
*/
@Test
public void testAgg4() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//指定分組字段
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team");
//指定聚合函數是求平均數據
AvgAggregationBuilder avgAgg = AggregationBuilders.avg("avg_age").field("age");
//指定另外一個聚合函數是求和
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
//分組的聚合器關聯了兩個聚合函數
builder.addAggregation(termsAgg.subAggregation(avgAgg).subAggregation(sumAgg));
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
//按分組的名字取出數據
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
//獲取球隊名字
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
//根據別名取出平均年齡
InternalAvg avgAge = (InternalAvg)subAggMap.get("avg_age");
//根據別名取出薪水總和
InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
double avgAgeValue = avgAge.getValue();
double totalSalaryValue = totalSalary.getValue();
System.out.println(team + " " + avgAgeValue + " " + totalSalaryValue);
}
}
/**
* select team, sum(salary) as total_salary from player group by team order by total_salary desc;
*/
@Test
public void testAgg5() {
SearchRequestBuilder builder = client.prepareSearch("player_info").setTypes("player");
//按team進行分組,然后指定排序規則
TermsAggregationBuilder termsAgg = AggregationBuilders.terms("team_name").field("team").order(Terms.Order.aggregation("total_salary ", true));
SumAggregationBuilder sumAgg = AggregationBuilders.sum("total_salary").field("salary");
builder.addAggregation(termsAgg.subAggregation(sumAgg));
SearchResponse response = builder.execute().actionGet();
Map<String, Aggregation> aggMap = response.getAggregations().getAsMap();
StringTerms teams = (StringTerms) aggMap.get("team_name");
for (Terms.Bucket teamBucket : teams.getBuckets()) {
String team = (String) teamBucket.getKey();
Map<String, Aggregation> subAggMap = teamBucket.getAggregations().getAsMap();
InternalSum totalSalary = (InternalSum)subAggMap.get("total_salary");
double totalSalaryValue = totalSalary.getValue();
System.out.println(team + " " + totalSalaryValue);
}
}
}
總結
以上是生活随笔為你收集整理的ES 之 Java API 基本操作和聚合操作的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: ZPW-2000A无绝缘移频轨道电路之主
- 下一篇: python socket epoll_