Spring Boot 集成 Elasticsearch 的更多方法(第二部分) - 其他类别
最编程
2024-01-24 17:11:01
...
因为我的项目中既用了 es, 又用了 mysql, es只是用来存储日志的 .
所以为了统一, 我还是创建了 LogService
和 LogServiceImpl
, 没有 mapper, 增删改查的逻辑我写在 LogServiceImpl
里了.
LogService
/**
 * Service-layer contract for storing and querying {@code Log} entries.
 */
public interface LogService {
/**
 * Inserts the given logs.
 *
 * @param logs the log entries to store
 */
void insertLogs(List<Log> logs);
/**
 * Searches logs by user name within a time range.
 *
 * @param username  user whose logs are searched
 * @param startTime lower bound of the time range (unit/format not specified here — TODO confirm)
 * @param endTime   upper bound of the time range (unit/format not specified here — TODO confirm)
 * @return the matching logs
 */
List<Log> searchLogs(String username, Long startTime, Long endTime);
}
LogServiceImpl
实现类中用到了 JSON.toJSONString
, 别忘记引入 fastjson
依赖:
<!-- fastjson: use 1.2.83 or later; earlier 1.x releases (including 1.2.76) are affected by the autoType deserialization vulnerability CVE-2022-25845 -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
/**
 * Elasticsearch-backed implementation of {@link LogService}.
 *
 * <p>Log documents are written to and read from the {@code logs} index through the injected
 * {@link RestHighLevelClient}; there is no separate mapper layer.
 */
@Service
public class LogServiceImpl implements LogService {

    /** Name of the Elasticsearch index that holds the log documents. */
    private static final String LOGS_INDEX = "logs";

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Indexes all given logs with a single bulk request (10 second timeout).
     *
     * <p>A {@code null} or empty list is a no-op: a bulk request that contains no action is
     * rejected by request validation, so nothing is sent in that case.
     *
     * @param logs the log entries to index; each one is serialized to JSON with fastjson
     * @throws java.io.UncheckedIOException if the request to Elasticsearch fails with an I/O error
     * @throws IllegalStateException if Elasticsearch reports that at least one item was not indexed
     */
    @Override
    public void insertLogs(List<Log> logs) {
        if (logs == null || logs.isEmpty()) {
            return;
        }

        BulkRequest bulkRequest = new BulkRequest();
        bulkRequest.timeout("10s");
        for (Log log : logs) {
            bulkRequest.add(
                new IndexRequest(LOGS_INDEX).source(JSON.toJSONString(log), XContentType.JSON));
        }

        try {
            // The bulk call itself succeeds even when individual items are rejected,
            // so the per-item outcome has to be checked explicitly.
            if (restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT).hasFailures()) {
                throw new IllegalStateException(
                    "Bulk insert into index '" + LOGS_INDEX + "' completed with item failures");
            }
        } catch (IOException e) {
            // Propagate with the cause instead of printing the stack trace and pretending success.
            throw new java.io.UncheckedIOException(
                "Bulk insert into index '" + LOGS_INDEX + "' failed", e);
        }
    }

    /**
     * Searches the {@code logs} index for entries of one user within a time range.
     *
     * <p>The time range is applied as a filter on the {@code time} field (inclusive on both
     * ends); the user is matched with a {@code match} query on the {@code username} field.
     *
     * <p>NOTE(review): a {@code match} query is a full-text query, not an exact comparison. If
     * exact user matching is required, a {@code term} query on a keyword field is probably what
     * is wanted — confirm against the index mapping.
     *
     * <p>NOTE(review): no result size is set, so Elasticsearch returns only its default first
     * page of hits (10 by default), not every matching document — confirm whether callers need
     * paging.
     *
     * @param username  user name to match
     * @param startTime inclusive lower bound for the {@code time} field
     * @param endTime   inclusive upper bound for the {@code time} field
     * @return the hits converted to {@code Log} objects; empty when nothing matches
     * @throws java.io.UncheckedIOException if the request to Elasticsearch fails with an I/O error,
     *         so that a failed search is not mistaken for an empty result
     */
    @Override
    public List<Log> searchLogs(String username, Long startTime, Long endTime) {
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        boolQuery.filter(QueryBuilders.rangeQuery("time").gte(startTime).lte(endTime));
        boolQuery.must(QueryBuilders.matchQuery("username", username));

        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
        searchSourceBuilder.query(boolQuery);

        SearchRequest searchRequest = new SearchRequest(LOGS_INDEX);
        searchRequest.source(searchSourceBuilder);

        List<Log> logs = new ArrayList<>();
        try {
            SearchResponse response =
                restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
            // Each hit carries the stored document in _source; parse that JSON directly
            // instead of converting it to a Map and serializing it again.
            for (SearchHit hit : response.getHits().getHits()) {
                logs.add(JSON.parseObject(hit.getSourceAsString(), Log.class));
            }
        } catch (IOException e) {
            throw new java.io.UncheckedIOException(
                "Search on index '" + LOGS_INDEX + "' failed", e);
        }
        return logs;
    }
}
es返回的结构
关于 es 查询返回的结构, 可以再看一下:
我们拿到的 response
的结构是这样的 (数据是我伪造的可能有出入, 大概结构是这样的):
{
"took":1,
"timed_out":false,
"_shards": {
"total":1,
"successful":1,
"skipped":0,
"failed":0
},
"hits": {
"total":{
"value":45,
"relation":"eq"
},
"max_score":1.0,
"hits":[
{
"_index":"logs",
"_type":"_doc",
"_id":"jsOFp_jdfk_operR",
"_score":1.0,
"_source": {
"username":"xiaoming",
"content":"I am content",
"startTime":20240101,
"endTime":20240201
}
},
{
"_index":"logs",
"_type":"_doc",
"_id":"jswep_joerfk_opeoRR",
"_score":1.0,
"_source": {
"username":"xiaohong",
"content":"I am content2",
"startTime":20240101,
"endTime":20240201
}
}
]
}
}
response.getHits().getHits()
的结构是这样的:
[
{
"_index":"logs",
"_type":"_doc",
"_id":"jsOFp_jdfk_operR",
"_score":1.0,
"_source": {
"username":"xiaoming",
"content":"I am content",
"startTime":20240101,
"endTime":20240201
}
},
{
"_index":"logs",
"_type":"_doc",
"_id":"jswep_joerfk_opeoRR",
"_score":1.0,
"_source": {
"username":"xiaohong",
"content":"I am content2",
"startTime":20240101,
"endTime":20240201
}
}
]
在循环里面 hit.getSourceAsMap()
拿到的结构是这样的, 只拿到了 _source
的部分:
{
"username":"xiaohong",
"content":"I am content2",
"startTime":20240101,
"endTime":20240201
}
测试案例
下面是一些 es 的测试案例, 作为参考.
在建测试类的时候, 不要忘记加 @RunWith(SpringRunner.class)
注解(因为配置类是写在config文件夹下面的, 需要 @Autowired
注入一起启动)
/**
 * Sample tests that exercise the basic index, document, bulk and search operations of
 * {@link RestHighLevelClient} against the {@code java_test} index.
 *
 * <p>The tests print the responses instead of asserting on them; they are meant as usage
 * examples.
 */
@SpringBootTest
@RunWith(SpringRunner.class)
public class EsTest {

    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /** Creates the {@code java_test} index and prints whether the request was acknowledged. */
    @Test
    public void testCreateIndex() throws IOException {
        CreateIndexRequest createRequest = new CreateIndexRequest("java_test");
        CreateIndexResponse created =
            restHighLevelClient.indices().create(createRequest, RequestOptions.DEFAULT);
        System.out.println(created.isAcknowledged());
    }

    /** Checks the index; this call can only tell whether it exists. */
    @Test
    public void testExistIndex() throws IOException {
        GetIndexRequest lookup = new GetIndexRequest("java_test");
        System.out.println(restHighLevelClient.indices().exists(lookup, RequestOptions.DEFAULT));
    }

    /** Deletes the {@code java_test} index and prints whether the request was acknowledged. */
    @Test
    public void testDeleteIndex() throws IOException {
        AcknowledgedResponse deleted = restHighLevelClient.indices()
            .delete(new DeleteIndexRequest("java_test"), RequestOptions.DEFAULT);
        System.out.println(deleted.isAcknowledged());
    }

    /** Adds one document; equivalent to {@code PUT /java_test/_doc/1}. */
    @Test
    public void testAddDocument() throws IOException {
        User user = new User("张三", 18);
        // Build the request in one chain: document id, 1 second timeout, JSON body.
        IndexRequest indexRequest = new IndexRequest("java_test")
            .id("1")
            .timeout(TimeValue.timeValueSeconds(1))
            .source(JSON.toJSONString(user), XContentType.JSON);
        // Send the request and print the response and its status.
        IndexResponse indexResponse =
            restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
        System.out.println(indexResponse.toString());
        System.out.println(indexResponse.status());
    }

    /** Checks whether document {@code 1} exists without fetching its content. */
    @Test
    public void testExistDocument() throws IOException {
        // Skip _source and stored fields: only existence is of interest.
        GetRequest existsRequest = new GetRequest("java_test", "1")
            .fetchSourceContext(new FetchSourceContext(false))
            .storedFields("_none_");
        System.out.println(restHighLevelClient.exists(existsRequest, RequestOptions.DEFAULT));
    }

    /** Fetches document {@code 1} and prints the full response followed by its source. */
    @Test
    public void testGetDocument() throws IOException {
        GetResponse fetched = restHighLevelClient.get(
            new GetRequest("java_test", "1"), RequestOptions.DEFAULT);
        String documentJson = fetched.getSourceAsString();
        System.out.println(fetched); // whole response, same as what the REST command returns
        System.out.println(documentJson); // document content only
    }

    /** Updates document {@code 1} with a new user and prints the response status. */
    @Test
    public void testUpdateDocument() throws IOException {
        User replacement = new User("李四", 22);
        UpdateRequest updateRequest = new UpdateRequest("java_test", "1")
            .doc(JSON.toJSONString(replacement), XContentType.JSON);
        UpdateResponse updated =
            restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        System.out.println(updated.status());
    }

    /** Deletes document {@code 1} and prints the response status. */
    @Test
    public void testDeleteDocument() throws IOException {
        DeleteResponse removed = restHighLevelClient.delete(
            new DeleteRequest("java_test", "1"), RequestOptions.DEFAULT);
        System.out.println(removed.status());
    }

    /** Inserts several documents at once; real projects usually insert in bulk. */
    @Test
    public void testBulkRequest() throws IOException {
        BulkRequest bulk = new BulkRequest();
        bulk.timeout("10s");

        List<User> users = new ArrayList<>();
        for (String name : new String[] {
                "zhangan1", "zhangan2", "zhangan3", "lisi1", "lisi2", "lisi3"}) {
            users.add(new User(name, 18));
        }

        // For bulk update or bulk delete, swap in the corresponding request type here.
        int documentId = 1;
        for (User user : users) {
            bulk.add(
                new IndexRequest("java_test")
                    .id(String.valueOf(documentId))
                    .source(JSON.toJSONString(user), XContentType.JSON));
            documentId++;
        }

        BulkResponse bulkResult = restHighLevelClient.bulk(bulk, RequestOptions.DEFAULT);
        System.out.println(bulkResult.hasFailures()); // false means every item succeeded
    }

    /** Runs a term query on the {@code name} field and prints the hits. */
    @Test
    public void testSearch() throws IOException {
        // Query conditions are built with the QueryBuilders utility class:
        //   QueryBuilders.termQuery()      exact term
        //   QueryBuilders.matchAllQuery()  match everything
        SearchSourceBuilder searchSource = new SearchSourceBuilder()
            .query(QueryBuilders.termQuery("name", "lisi1"))
            .timeout(new TimeValue(60, TimeUnit.SECONDS));

        SearchRequest searchRequest = new SearchRequest("java_test");
        searchRequest.source(searchSource);

        SearchResponse found = restHighLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        System.out.println(JSON.toJSONString(found.getHits()));
        System.out.println("===============================================");
        SearchHit[] hits = found.getHits().getHits();
        for (int i = 0; i < hits.length; i++) {
            System.out.println(hits[i].getSourceAsString());
        }
    }
}