Elasticsearch - 聚合查询、先分组后排序
最编程
2024-04-28 21:10:42
...
需求
对明细数据先按 waybillId 分组,再在每个组内按 eventTime 降序排序,取每组中最新的一条数据。
桶聚合(bucket)
Elasticsearch 的桶聚合(bucket aggregation)用于数据分组:先按指定条件把文档划分到多个桶(组)中,再对每个桶分别进行统计,或在桶内嵌套子聚合(例如下文用 top_hits 取每组最新的一条)。
1. ES 查询 DSL(Query DSL,JSON 请求体)写法
{
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"aggs": {
"waybillIdAgg": {
"terms": {
"field": "waybillId",
"size": 1000,
"min_doc_count": 1
},
"aggs": {
"top1": {
"top_hits": {
"size": 1,
"sort": [
{
"eventTime": {
"order": "desc"
}
}
]
}
}
}
}
}
}
{
"query": {
"bool": {
"must": [
{
"match_all": {}
}
]
}
},
"aggs": {
"waybillIdAgg": {
"terms": {
"field": "waybillId",
"size": 1000,
"min_doc_count": 1
},
"aggs": {
"top1": {
"top_hits": {
"size": 1,
"sort": [
{
"eventTime": {
"order": "desc"
}
}
]
}
}
}
}
}
}
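返回结果如下(原文此处的结果截图已丢失):aggregations.waybillIdAgg.buckets 中每个 waybillId 对应一个桶,桶内 top1.hits.hits 只包含该 waybillId 下 eventTime 最新的那一条文档。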
2. Java 写法(RestHighLevelClient)及聚合结果解析
// For every waybillId in `waybillIds`, fetch the single tracking-detail document with the
// latest eventTime: filter to the requested ids, group them with a terms aggregation, and keep
// the top hit of each group sorted by eventTime descending.
// NOTE(review): assumes `waybillIds` is a java.util.Collection (size() is needed) — confirm.

// Query: restrict to the requested waybillIds with ONE terms query instead of a bool with one
// `should` term clause per id. The per-id bool fails once the id count exceeds the bool
// max-clause limit, and an empty id list turned it into "match every document". A terms query
// with no values matches nothing. The index.max_terms_count limit (65536 by default) applies.
// Filter context: relevance scores are irrelevant because hits are ordered by eventTime.
BoolQueryBuilder queryBool =
    QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("waybillId", waybillIds));
// Bucket aggregation: one bucket per waybillId. Size it to the number of requested ids rather
// than a fixed 1000 so no id is silently dropped. Math.max keeps the size valid (> 0) for an
// empty id list. The cluster-wide search.max_buckets limit still applies.
TermsAggregationBuilder termsAggregationBuilder =
    AggregationBuilders.terms("waybillIdAgg")
        .field("waybillId")
        .size(Math.max(1, waybillIds.size()))
        .minDocCount(1);
// Nested sub-aggregation: within each bucket keep only the newest document (eventTime desc).
TopHitsAggregationBuilder sort =
    AggregationBuilders.topHits("top1").size(1).sort("eventTime", SortOrder.DESC);
termsAggregationBuilder.subAggregation(sort);
// size(0): only the aggregation results are read below, so top-level hits are not returned.
SearchSourceBuilder searchSourceBuilder =
    SearchSourceBuilder.searchSource()
        .query(queryBool)
        .size(0)
        .aggregation(termsAggregationBuilder);
// Build the search request against the tracking-detail index.
SearchRequest searchRequest = new SearchRequest(esIndexConfig.getIndexNameTrackingDetail());
searchRequest.source(searchSourceBuilder);
// Execute the search.
SearchResponse searchResponse =
    restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// Parse the aggregation result. Each bucket's "top1" holds at most one hit, and its _source is
// mapped onto a TrackingDetail. flatMap skips a bucket without hits instead of adding a null
// element to the result list.
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("waybillIdAgg");
List<TrackingDetail> trackingDetails =
    terms.getBuckets().stream()
        .flatMap(
            bucket -> {
              ParsedTopHits top1 = bucket.getAggregations().get("top1");
              return Arrays.stream(top1.getHits().getHits()).limit(1);
            })
        .map(hit -> BeanUtil.fillBeanWithMap(hit.getSourceAsMap(), new TrackingDetail(), false))
        .collect(Collectors.toList());
// For every waybillId in `waybillIds`, fetch the single tracking-detail document with the
// latest eventTime: filter to the requested ids, group them with a terms aggregation, and keep
// the top hit of each group sorted by eventTime descending.
// NOTE(review): assumes `waybillIds` is a java.util.Collection (size() is needed) — confirm.

// Query: restrict to the requested waybillIds with ONE terms query instead of a bool with one
// `should` term clause per id. The per-id bool fails once the id count exceeds the bool
// max-clause limit, and an empty id list turned it into "match every document". A terms query
// with no values matches nothing. The index.max_terms_count limit (65536 by default) applies.
// Filter context: relevance scores are irrelevant because hits are ordered by eventTime.
BoolQueryBuilder queryBool =
    QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("waybillId", waybillIds));
// Bucket aggregation: one bucket per waybillId. Size it to the number of requested ids rather
// than a fixed 1000 so no id is silently dropped. Math.max keeps the size valid (> 0) for an
// empty id list. The cluster-wide search.max_buckets limit still applies.
TermsAggregationBuilder termsAggregationBuilder =
    AggregationBuilders.terms("waybillIdAgg")
        .field("waybillId")
        .size(Math.max(1, waybillIds.size()))
        .minDocCount(1);
// Nested sub-aggregation: within each bucket keep only the newest document (eventTime desc).
TopHitsAggregationBuilder sort =
    AggregationBuilders.topHits("top1").size(1).sort("eventTime", SortOrder.DESC);
termsAggregationBuilder.subAggregation(sort);
// size(0): only the aggregation results are read below, so top-level hits are not returned.
SearchSourceBuilder searchSourceBuilder =
    SearchSourceBuilder.searchSource()
        .query(queryBool)
        .size(0)
        .aggregation(termsAggregationBuilder);
// Build the search request against the tracking-detail index.
SearchRequest searchRequest = new SearchRequest(esIndexConfig.getIndexNameTrackingDetail());
searchRequest.source(searchSourceBuilder);
// Execute the search.
SearchResponse searchResponse =
    restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
// Parse the aggregation result. Each bucket's "top1" holds at most one hit, and its _source is
// mapped onto a TrackingDetail. flatMap skips a bucket without hits instead of adding a null
// element to the result list.
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("waybillIdAgg");
List<TrackingDetail> trackingDetails =
    terms.getBuckets().stream()
        .flatMap(
            bucket -> {
              ParsedTopHits top1 = bucket.getAggregations().get("top1");
              return Arrays.stream(top1.getHits().getHits()).limit(1);
            })
        .map(hit -> BeanUtil.fillBeanWithMap(hit.getSourceAsMap(), new TrackingDetail(), false))
        .collect(Collectors.toList());