Elasticsearch.Nest 教程系列 5-1 索引:Indexing | 索引(添加)文档

  • 本系列博文是“伪”官方文档翻译(更加本土化),并非完全将官方文档进行翻译,而是在查阅、测试原始文档并转换为自己真知灼见后的“准”翻译。有不同见解 / 说明不周的地方,还请海涵、不吝拍砖 :)

  • 官方文档见此:https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/introduction.html

  • 本系列对应的版本环境:ElasticSearch@7.3.1,NEST@7.3.1,IDE 和开发平台默认为 VS2019,.NET CORE 2.1


索引(Indexing):是指讲文档添加到 ES 中的过程,目前 Nest 提供了以下 3 种 Indexing 方式:

  • Indexing Documents
  • Ingest Node
  • Ingest Pipelines

NEST 提供了多种索引文档的方式:

假设有 Person 类:

1
2
3
4
5
6
public class Person
{
public int Id { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
}

person 索引 mappings

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
{
"person" : {
"mappings" : {
"properties" : {
"firstName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"id" : {
"type" : "integer"
},
"lastName" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}

1.Single Document 单文档索引

IndexDocument 使用:

  • 通过 IndexDocument 方法,可以一次同步/异步的方式索引(添加)单个文档到 ES 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var person = new Person
{
Id = 1,
FirstName = "Martijn",
LastName = "Laarman"
};

//同步
var indexResponse = client.IndexDocument(person);
if (!indexResponse.IsValid)
{
// 可以使用 IsValid 属性来判断请求是否成功
}

//异步
var indexResponseAsync = await client.IndexDocumentAsync(person);

Index 使用:

  • 如果在索引(添加)文档的时候,需要增加额外的参数,可以使用 Index 方法。

1
2
3
4
5
6
7
8
9
10
11
var person = new Person
{
Id = 1,
FirstName = "Martijn",
LastName = "Laarman"
};

//方法一:通过委托
client.Index(person, i => i.Index("people"));
//方法二:通过 IndexRequest 对象
client.Index(new IndexRequest<Person>(person, "people"));

Multiple Document 多文档索引

Nest 提供了 IndexMany(Async) 和 Bulk(Async) 来进行多文档索引(添加)。

通过 IndexMany 和 IndexManyAsync 来批量添加文档

Nest 客户端提供了 IndexMany 和 IndexManyAsync 方法,可以让你快捷的索引(添加)多个文档。

  • 需要注意:这 2 个方法都是在单个 HTTP 请求中索引(添加)所有文档,因此,对于大文档集合而言,并不推荐使用这 2 个方法(对于大文档集合,可以考虑使用 BulkAllObservable  帮助类。)。

  • 实际是在单个 HTTP 请求中使用 _bulk 请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
var people = new[]
{
new Person { Id = 1, FirstName = "Martijn", LastName = "Laarman"},
new Person { Id = 2, FirstName = "Stuart" ,LastName = "Cam"},
new Person { Id = 3, FirstName = "Russ", LastName = "Cam"}
};

var indexManyResponse = _client.IndexMany(people);

if (indexManyResponse.Errors) //通过检查 Errors,可以让你知道批量操作是否有错误。
{
foreach (var itemWithError in indexManyResponse.ItemsWithErrors)
{
//你可以通过遍历 ItemsWithErrors 来获取错误信息。
Console.WriteLine("Failed to index document {0}: {1}", itemWithError.Id, itemWithError.Error);
}
}

// Alternatively, documents can be indexed asynchronously
var indexManyAsyncResponse = await _client.IndexManyAsync(people);

请求 URL 如下:

1
2
3
4
5
6
7
POST /_bulk
{"index":{"_id":"1","_index":"person"}}
{"id":1,"firstName":"Martijn","lastName":"Laarman"}
{"index":{"_id":"2","_index":"person"}}
{"id":2,"firstName":"Stuart","lastName":"Cam"}
{"index":{"_id":"3","_index":"person"}}
{"id":3,"firstName":"Russ","lastName":"Cam"}

生成的文档如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
{
"hits" : [
{
"_index" : "person",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"id" : 1,
"firstName" : "Martijn",
"lastName" : "Laarman"
}
},
{
"_index" : "person",
"_type" : "_doc",
"_id" : "2",
"_score" : 1.0,
"_source" : {
"id" : 2,
"firstName" : "Stuart",
"lastName" : "Cam"
}
},
{
"_index" : "person",
"_type" : "_doc",
"_id" : "3",
"_score" : 1.0,
"_source" : {
"id" : 3,
"firstName" : "Russ",
"lastName" : "Cam"
}
}
]
}

通过 Bulk 和 BulkAsync 来批量添加文档

如果在索引文档的时候你需要更加惊喜的控制,你可以使用 Bulk(Async) 方法,并使用描述器来自定义批量调用。

跟 IndexMany(Async) 一样, 实际是在单个 HTTP 请求中使用 _bulk 请求:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var people = new[]
{
new Person { Id = 1, FirstName = "Martijn", LastName = "Laarman"},
new Person { Id = 2, FirstName = "Stuart" ,LastName = "Cam"},
new Person { Id = 3, FirstName = "Russ", LastName = "Cam"}
};

//同步
var bulkIndexResponse = _client.Bulk(b => b
.Index("person")
.IndexMany(people));

// 异步
var asyncBulkIndexResponse = await _client.BulkAsync(b => b
.Index("person")
.IndexMany(people));

请求 URL 为:

1
2
3
4
5
6
7
POST /person/_bulk
{"index":{"_id":"1"}}
{"id":1,"firstName":"Martijn","lastName":"Laarman"}
{"index":{"_id":"2"}}
{"id":2,"firstName":"Stuart","lastName":"Cam"}
{"index":{"_id":"3"}}
{"id":3,"firstName":"Russ","lastName":"Cam"}

使用 BulkAllObservable 帮助类进行多文档索引

BulkAllObservable 帮助类可以让你专注于索引的整体目标,而不必关心重试、回退或分块机制。

BulkAllObservable 内部使用了 BulkAll 方法来批量添加,如下:

故你也可以直接使用 BulkAll 方法和 Wait 扩展方法来索引多个文档:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
var people = new[]
{
new Person { Id = 1, FirstName = "Martijn", LastName = "Laarman"},
new Person { Id = 2, FirstName = "Stuart" ,LastName = "Cam"},
new Person { Id = 3, FirstName = "Russ", LastName = "Cam"}
};

var bulkAllObserver = _client.BulkAll(people, b => b
.Index("person")
.BackOffTime("30s")//集群繁忙,报429错误码的时候,等待多久进行重试
.BackOffRetries(2) //重试次数
.RefreshOnCompleted() //
.MaxDegreeOfParallelism(Environment.ProcessorCount)
.Size(1000) //每次 bulk 请求时的文档数量。
).Wait(TimeSpan.FromMinutes(15), next =>
{//执行索引并等待 15min,BulkAll 请求虽然时异步的,但是是一个阻塞操作。

});
  • BulkAllObservable 提供了在索引失败的情况下自动重试/退回的功能,并控制在单个 HTTP 请求中索引文档数量的功能。

  • 如果文档数量很多,这可能会导致许多HTTP请求,每个HTTP请求包含1000个文档(最后一个请求可能包含相对较小数量的文档,具体取决于总数)。

请求 URL 如下:

1
2
3
4
5
6
7
8
9
POST /person/_bulk
{"index":{"_id":"1"}}
{"id":1,"firstName":"Martijn","lastName":"Laarman"}
{"index":{"_id":"2"}}
{"id":2,"firstName":"Stuart","lastName":"Cam"}
{"index":{"_id":"3"}}
{"id":3,"firstName":"Russ","lastName":"Cam"}

POST /person/_refresh

BulkAllObservable 还提供了一些其他高级特性:

  • BufferToBulk: 允许在将批量请求发送到服务器之前自定义批量请求中的各个操作。

  • RetryDocumentPredicate: 允许你来控制当索引失败时是否要进行重试。

  • DroppedDocumentCallback: 回调函数,当文档索引(添加)失败时触发。

示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
_client.BulkAll(people, b => b
.BufferToBulk((descriptor, list) => //在请求钱自定义一些私有操作。
{
foreach (var item in list)
{
descriptor.Index<Person>(bi => bi
.Index(item.Id % 2 == 0 ? "even-index" : "odd-index") // 将 id 为 偶数或者奇数的文档分别放到 even-index 和 odd-index 索引中
.Document(item)
);
}
})
.RetryDocumentPredicate((item, person) => //根据设定条件来决定是否需要重新进行索引
{
return item.Error.Index == "even-index" && person.FirstName == "Martijn";
})
.DroppedDocumentCallback((item, person) => // 当文档索引(添加)失败时触发该回调。
{
Console.WriteLine($"Unable to index: {item} {person}");
}));