【搜索系列】ES文档基本操作CURD实例演示

文章目录
  1. I. 项目搭建
    1. 1. 项目依赖
    2. 2. 配置信息
  2. II. CURD实例
    1. 1. 配置
    2. 2. 添加数据
    3. 3. 查询数据
    4. 3. 增量更新数据
    5. 4. 全量更新
    6. 5. 删除数据
    7. 6. 条件删除数据
    8. 7. 测试case
  3. III. 不能错过的源码和相关知识点
    1. 0. 项目
    2. 1. 微信公众号: 一灰灰Blog

本文将作为es系列第二篇,在前文项目搭建的基础上,先来看一下es的基本操作姿势,如何实现CURD

I. 项目搭建

1. 项目依赖

本项目借助SpringBoot 2.2.1.RELEASE + maven 3.5.3 + IDEA进行开发

开一个web服务用于测试

1
2
3
4
5
6
<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
</dependencies>

2. 配置信息

配置文件application.yml,注意下面的配置信息,下面采用的是由我们自己来解析配置的方式

1
2
3
4
5
6
7
8
elasticsearch:
host: localhost
port: 9200
user: elastic
pwd: test123
connTimeout: 3000
socketTimeout: 5000
connectionRequestTimeout: 500

II. CURD实例

1. 配置

注意,本文介绍的es是添加了权限验证,因此我们在于es进行交互时,需要在请求头中携带验证信息,注意下面的实现姿势

读取配置,初始化RestHighLevelClient,和前文介绍的差不多

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
@Getter
@Configuration
public class ElasticsearchConfiguration {

@Value("${elasticsearch.host}")
private String host;

@Value("${elasticsearch.port}")
private int port;

@Value("${elasticsearch.connTimeout}")
private int connTimeout;

@Value("${elasticsearch.socketTimeout}")
private int socketTimeout;

@Value("${elasticsearch.connectionRequestTimeout}")
private int connectionRequestTimeout;

@Value("${elasticsearch.user}")
private String user;

@Value("${elasticsearch.pwd}")
private String pwd;

@Bean(destroyMethod = "close", name = "client")
public RestHighLevelClient initRestClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port))
.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
.setConnectTimeout(connTimeout)
.setSocketTimeout(socketTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout));
return new RestHighLevelClient(builder);
}

@Bean
public RequestOptions requestOptions() {
String auth = "Basic " + Base64Utils.encodeToString((user + ":" + pwd).getBytes());
RequestOptions.Builder build = RequestOptions.DEFAULT.toBuilder();
build.addHeader("Authorization", auth);
return build.build();
}
}

2. 添加数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class BasicCurdDemo {
@Autowired
private RestHighLevelClient client;
@Autowired
private RequestOptions requestOptions;

private String TEST_ID = "11123-33345-66543-55231";

/**
* 新增数据
*/
public void addDoc(String indexName, Object obj, String id) throws IOException {
// 指定索引
IndexRequest request = new IndexRequest(indexName);
request.type("_doc");
// 文档内容,source传参,第一种时按照 fieldName, fieldValue 成对的方式传入;下面是采用json串 + 指定ContentType的方式传入
request.source(JSON.toJSONString(obj), XContentType.JSON);
// 指定特殊的id,不指定时自动生成id
request.id(id);
IndexResponse response = client.index(request, requestOptions);
System.out.println("添加数据返回结果: " + response.toString());
}
}

添加数据,注意是利用 IndexRequest 来构建请求对象,添加文档时有几个注意事项

  • request.source() : 具体需要上传的文档,就是通过它挂上去的,我们这里采用的是json方式
  • request.id(): 如果上传的文档需要指定id,则可以使用它;若未指定,则表明自动生成id

发起请求: client.index()

3. 查询数据

这里先介绍一个基础的根据id进行查询的实例case,更多的查询姿势后面会详细介绍

1
2
3
4
5
6
7
8
9
10
11
12
/**
* 查询结果
*
* @param indexName
* @param id
* @throws Exception
*/
public void get(String indexName, String id) throws IOException {
GetRequest getRequest = new GetRequest(indexName, "_doc", id);
GetResponse response = client.get(getRequest, requestOptions);
System.out.println("查询结果:" + response.toString());
}

3. 增量更新数据

根据主键进行更新文档,如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* 更新文档,根据id进行更新,增量更新,存在的字段,覆盖;新增的字段,插入;旧字段,保留
*
* @param indexName
* @param docId
* @param obj
* @throws IOException
*/
public void updateDoc(String indexName, String docId, Object obj) throws IOException {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index(indexName);
updateRequest.type("_doc");
updateRequest.id(docId);
// 设置数据
updateRequest.doc(JSON.toJSONString(obj), XContentType.JSON);

UpdateResponse response = client.update(updateRequest, requestOptions);
System.out.println("更新数据返回:" + response.toString());
}

注意

  • 上面的实现属于增量更新策略
  • 即:新传的文档,若key之前已经存在,则覆盖更新;若之前不存在,则插入;之前文档中未被覆盖的数据依然保留

4. 全量更新

另外一个根据条件进行更新的使用case如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* 条件更新
*
* @param indexName
* @param query
* @param data
* @throws IOException
*/
public void updateByCondition(String indexName, Map<String, String> query, Map<String, Object> data) throws IOException {
UpdateByQueryRequest updateRequest = new UpdateByQueryRequest(indexName);
for (Map.Entry<String, String> entry : query.entrySet()) {
QueryBuilder queryBuilder = new TermQueryBuilder(entry.getKey(), entry.getValue());
updateRequest.setQuery(queryBuilder);
}

// 更新值脚本,精确的更新方式
// ctx._source['xx'].add('新增字段')
// 条件判定 if(ctx._source.addr == 'hubei') { ctx._source.addr = 'wuhan';}
String source = "ctx._source.name='1hui';";
Script script = new Script(source);
updateRequest.setScript(script);

BulkByScrollResponse response = client.updateByQuery(updateRequest, requestOptions);
System.out.println("条件更新返回: " + response.toString());
get(indexName, TEST_ID);
System.out.println("0---------------------0");

// 采用全量覆盖式更新,直接使用data中的数据,覆盖之前的文档内容
source = "ctx._source=params";
script = new Script(ScriptType.INLINE, "painless", source, data);
updateRequest.setScript(script);
response = client.updateByQuery(updateRequest, requestOptions);
System.out.println("条件更新返回: " + response.toString());
get(indexName, TEST_ID);
}

5. 删除数据

直接根据id进行删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* 根据id进行删除
*
* @param indexName
* @param id
* @throws IOException
*/
public void delete(String indexName, String id) throws IOException {
DeleteRequest deleteRequest = new DeleteRequest(indexName);
deleteRequest.type("_doc");
deleteRequest.id(id);

DeleteResponse response = client.delete(deleteRequest, requestOptions);
System.out.println("删除后返回" + response.toString());
}

6. 条件删除数据

根据条件进行匹配删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 条件删除
*
* @param indexName
* @param query
* @throws IOException
*/
public void deleteByQuery(String indexName, Map<String, String> query) throws IOException {
DeleteByQueryRequest request = new DeleteByQueryRequest(indexName);
request.types("_doc");
for (Map.Entry<String, String> entry : query.entrySet()) {
QueryBuilder queryBuilder = new TermQueryBuilder(entry.getKey(), entry.getValue());
request.setQuery(queryBuilder);
}
BulkByScrollResponse response = client.deleteByQuery(request, requestOptions);
System.out.println("条件删除:" + response.toString());
get(indexName, TEST_ID);
}

7. 测试case

写一个测试demo,将上面的case都跑一遍

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public void testOperate() throws IOException {
String index = "basic_demo";
Map<String, Object> doc = newMap("name", "一灰灰", "age", 10, "skills", Arrays.asList("java", "python"));
// 新增
addDoc(index, doc, TEST_ID);
// 查询
get(index, TEST_ID);
// 更新
doc.clear();
doc.put("name", "一灰灰blog");
doc.put("addr", "hubei");
updateDoc(index, TEST_ID, doc);
get(index, TEST_ID);

updateByCondition(index, newMap("addr", "hubei"), newMap("name", "yihuihui", "site", "https://hhui.top"));
get(index, TEST_ID);

// 删除文档
delete(index, TEST_ID);
}

public <K, V> Map<K, V> newMap(K k, V v, Object... kv) {
Map<K, V> map = new HashMap<>();
map.put(k, v);
for (int i = 0; i < kv.length; i += 2) {
map.put((K) kv[i], (V) kv[i + 1]);
}
return map;
}

输出如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# 1. 添加数据
添加数据返回结果: IndexResponse[index=basic_demo,type=_doc,id=11123-33345-66543-55231,version=1,result=created,seqNo=34,primaryTerm=4,shards={"total":2,"successful":1,"failed":0}]

# 2. 查询数据
查询结果:{"_index":"basic_demo","_type":"_doc","_id":"11123-33345-66543-55231","_version":1,"_seq_no":34,"_primary_term":4,"found":true,"_source":{"skills":["java","python"],"name":"一灰灰","age":10}}

# 3. 增量更新
2022-03-28 19:56:08.781 WARN 18332 --- [/O dispatcher 1] org.elasticsearch.client.RestClient : request [POST http://localhost:9200/basic_demo/_doc/11123-33345-66543-55231/_update?timeout=1m] returned 1 warnings: [299 Elasticsearch-7.12.0-78722783c38caa25a70982b5b042074cde5d3b3a "[types removal] Specifying types in document update requests is deprecated, use the endpoint /{index}/_update/{id} instead."]
更新数据返回:UpdateResponse[index=basic_demo,type=_doc,id=11123-33345-66543-55231,version=2,seqNo=35,primaryTerm=4,result=updated,shards=ShardInfo{total=2, successful=1, failures=[]}]
查询结果:{"_index":"basic_demo","_type":"_doc","_id":"11123-33345-66543-55231","_version":2,"_seq_no":35,"_primary_term":4,"found":true,"_source":{"skills":["java","python"],"name":"一灰灰blog","age":10,"addr":"hubei"}}

# 4. 全量条件更新
条件更新返回: BulkByScrollResponse[took=970ms,timed_out=false,sliceId=null,updated=1,created=0,deleted=0,batches=1,versionConflicts=0,noops=0,retries=0,throttledUntil=0s,bulk_failures=[],search_failures=[]]
查询结果:{"_index":"basic_demo","_type":"_doc","_id":"11123-33345-66543-55231","_version":3,"_seq_no":36,"_primary_term":4,"found":true,"_source":{"skills":["java","python"],"name":"1hui","addr":"hubei","age":10}}

III. 不能错过的源码和相关知识点

0. 项目

系列博文

源码

1. 微信公众号: 一灰灰Blog

尽信书则不如,以上内容,纯属一家之言,因个人能力有限,难免有疏漏和错误之处,如发现bug或者有更好的建议,欢迎批评指正,不吝感激

下面一灰灰的个人博客,记录所有学习和工作中的博文,欢迎大家前去逛逛

一灰灰blog


打赏 如果觉得我的文章对您有帮助,请随意打赏。
分享到