
Elasticsearch - Aggregation Query: Group First, Then Sort

最编程 2024-04-28 21:10:42
...

Requirement

Group the detail records by waybillId, then sort each group by eventTime in descending order and take the latest record.

Bucket aggregation

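An Elasticsearch bucket aggregation groups data: documents are first divided into groups (buckets) according to the specified criteria, and each group is then evaluated separately, for example by counting its documents or computing metrics over them.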
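To make the grouping concrete, here is a minimal in-memory sketch in plain Java (no Elasticsearch client involved) of what a terms bucket aggregation computes on a single shard: count the documents per key, drop keys with fewer than min_doc_count documents, order the buckets by document count descending, and keep at most size buckets. The class and method names are hypothetical, breaking count ties by key is a choice made here for deterministic output, and the per-shard merging a real cluster performs is ignored.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper: an in-memory analogue of a single-shard terms aggregation.
final class TermsBucketSketch {
    // Returns key -> doc_count, ordered by doc_count descending (ties by key ascending),
    // keeping only keys with at least minDocCount docs and at most `size` buckets.
    static Map<String, Long> terms(List<String> fieldValues, int size, long minDocCount) {
        // One bucket per distinct value, counting the documents that fall into it
        Map<String, Long> counts = fieldValues.stream()
            .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
        return counts.entrySet().stream()
            .filter(e -> e.getValue() >= minDocCount)
            .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                .thenComparing(Map.Entry.<String, Long>comparingByKey()))
            .limit(size)
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                Map.Entry::getValue,
                (a, b) -> a,          // keys are already unique, so this never runs
                LinkedHashMap::new)); // preserve the bucket order
    }
}
```

For example, terms(List.of("W1", "W2", "W1", "W3", "W1", "W2"), 2, 1) yields {W1=3, W2=2}. Since min_doc_count defaults to 1 in Elasticsearch, the min_doc_count of 1 used in this article simply restates the default.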

1. Query DSL (request body for the _search endpoint)
{
  "query": {
    "bool": {
      "must": [
        {
          "match_all": {}
        }
      ]
    }
  },
  "aggs": {
    "waybillIdAgg": {
      "terms": {
        "field": "waybillId",
        "size": 1000,
        "min_doc_count": 1
      },
      "aggs": {
        "top1": {
          "top_hits": {
            "size": 1,
            "sort": [
              {
                "eventTime": {
                  "order": "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}
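Inside each bucket, the nested top_hits aggregation sorts that bucket's documents by eventTime descending and returns the first size of them, so size 1 yields the latest record per waybillId. The sketch below mimics that behavior in memory, assuming a hypothetical TrackingDoc class that holds only waybillId and an eventTime in epoch milliseconds; it illustrates the semantics, not how Elasticsearch executes the query.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical document with only the two fields the query relies on.
final class TrackingDoc {
    final String waybillId;
    final long eventTime; // assumed to be epoch milliseconds

    TrackingDoc(String waybillId, long eventTime) {
        this.waybillId = waybillId;
        this.eventTime = eventTime;
    }
}

final class TopHitsSketch {
    // In-memory analogue of terms(waybillId) + top_hits(size, sort eventTime desc).
    static Map<String, List<TrackingDoc>> topHitsPerWaybill(List<TrackingDoc> docs, int size) {
        // Bucket by waybillId; TreeMap only makes the key order deterministic
        Map<String, List<TrackingDoc>> buckets = docs.stream()
            .collect(Collectors.groupingBy(d -> d.waybillId, TreeMap::new, Collectors.toList()));
        // Inside each bucket: newest first, then keep the first `size` docs
        buckets.replaceAll((waybillId, bucket) -> bucket.stream()
            .sorted(Comparator.comparingLong((TrackingDoc d) -> d.eventTime).reversed())
            .limit(size)
            .collect(Collectors.toList()));
        return buckets;
    }
}
```

With size 1 each list holds exactly the latest event of its waybill; raising size returns the latest N events, which mirrors raising size on top_hits.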

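Each bucket in the response corresponds to one waybillId (returned as the bucket's key, with the number of matching documents in doc_count), and the bucket's top1 aggregation holds the single most recent document for that waybillId.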
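When the response is consumed as raw JSON instead of through a client library, the latest document of each bucket sits at aggregations.waybillIdAgg.buckets[i].top1.hits.hits[0]._source. A minimal sketch of walking that path, assuming the body has already been parsed into nested Map and List objects by some JSON library; the class and method names are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class ResponseWalker {
    // Collects aggregations.waybillIdAgg.buckets[*].top1.hits.hits[0]._source from a response
    // body already parsed into nested Map/List objects (assumption: JSON objects become Maps
    // and JSON arrays become Lists).
    @SuppressWarnings("unchecked")
    static List<Map<String, Object>> latestSources(Map<String, Object> body) {
        Map<String, Object> aggs = (Map<String, Object>) body.get("aggregations");
        Map<String, Object> agg = (Map<String, Object>) aggs.get("waybillIdAgg");
        List<Map<String, Object>> buckets = (List<Map<String, Object>>) agg.get("buckets");
        List<Map<String, Object>> result = new ArrayList<>();
        for (Map<String, Object> bucket : buckets) {
            Map<String, Object> top1 = (Map<String, Object>) bucket.get("top1");
            Map<String, Object> hitsWrapper = (Map<String, Object>) top1.get("hits");
            List<Map<String, Object>> hits = (List<Map<String, Object>>) hitsWrapper.get("hits");
            // top_hits with size 1 returns at most one hit per bucket
            if (!hits.isEmpty()) {
                result.add((Map<String, Object>) hits.get(0).get("_source"));
            }
        }
        return result;
    }
}
```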


2. Java implementation (RestHighLevelClient) and result parsing
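    // Assumed setup: Elasticsearch 7.x RestHighLevelClient plus Hutool's BeanUtil.
    import cn.hutool.core.bean.BeanUtil;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;
    import org.elasticsearch.action.search.SearchRequest;
    import org.elasticsearch.action.search.SearchResponse;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.index.query.BoolQueryBuilder;
    import org.elasticsearch.index.query.QueryBuilders;
    import org.elasticsearch.search.aggregations.AggregationBuilders;
    import org.elasticsearch.search.aggregations.bucket.terms.Terms;
    import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
    import org.elasticsearch.search.aggregations.metrics.TopHits;
    import org.elasticsearch.search.aggregations.metrics.TopHitsAggregationBuilder;
    import org.elasticsearch.search.builder.SearchSourceBuilder;
    import org.elasticsearch.search.sort.SortOrder;

    // esIndexConfig, restHighLevelClient and TrackingDetail come from the surrounding class;
    // the method name and the List<String> type of waybillIds are assumptions.
    public List<TrackingDetail> findLatestByWaybillIds(List<String> waybillIds) throws IOException {
      // Query condition: waybillId must be one of the given ids. A single terms query avoids the
      // bool max_clause_count limit that one should-clause per id can hit for large id lists.
      BoolQueryBuilder queryBool =
          QueryBuilders.boolQuery().filter(QueryBuilders.termsQuery("waybillId", waybillIds));

      // Bucket aggregation: group by waybillId (assumed to be mapped as keyword).
      // size(1000) caps the number of buckets returned; minDocCount(1) is the default.
      TermsAggregationBuilder termsAggregationBuilder =
          AggregationBuilders.terms("waybillIdAgg").field("waybillId").size(1000).minDocCount(1);
      // Nested top_hits: within each bucket, sort by eventTime descending and keep one hit
      TopHitsAggregationBuilder sort =
          AggregationBuilders.topHits("top1").size(1).sort("eventTime", SortOrder.DESC);
      termsAggregationBuilder.subAggregation(sort);

      SearchSourceBuilder searchSourceBuilder =
          SearchSourceBuilder.searchSource().query(queryBool).aggregation(termsAggregationBuilder);

      // Search request against the tracking-detail index
      SearchRequest searchRequest = new SearchRequest(esIndexConfig.getIndexNameTrackingDetail());
      searchRequest.source(searchSourceBuilder);

      // Execute the search
      SearchResponse searchResponse =
          restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);

      // Parse the aggregation result: map the single top hit of each bucket to a TrackingDetail
      Terms terms = searchResponse.getAggregations().get("waybillIdAgg");
      return terms.getBuckets().stream()
          .map(
              bucket -> {
                TopHits top1 = bucket.getAggregations().get("top1");
                return Arrays.stream(top1.getHits().getHits())
                    .findFirst()
                    .map(hit -> BeanUtil.fillBeanWithMap(hit.getSourceAsMap(), new TrackingDetail(), false))
                    .orElse(null);
              })
          .filter(Objects::nonNull) // skip buckets that returned no hit
          .collect(Collectors.toList());
    }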