Elasticsearch.Nest 教程系列 5-1 索引:Indexing | 索引(添加)文档


索引(Indexing):是指讲文档添加到 ES 中的过程,目前 Nest 提供了以下 3 种 Indexing 方式:

  • Indexing Documents
  • Ingest Node
  • Ingest Pipelines

NEST 提供了多种索引文档的方式:

假设有 Person 类:

public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
}

person 索引 mappings

{
  "person" : {
    "mappings" : {
      "properties" : {
        "firstName" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "id" : {
          "type" : "integer"
        },
        "lastName" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        }
      }
    }
  }
}

1.Single Document 单文档索引

IndexDocument 使用:

  • 通过 IndexDocument 方法,可以一次同步/异步的方式索引(添加)单个文档到 ES 中。


    ` csharp
    var person = new Person
    {
    Id = 1,
    FirstName = “Martijn”,
    LastName = “Laarman”
    };

//同步
var indexResponse = client.IndexDocument(person);
if (!indexResponse.IsValid)
{
// 可以使用 IsValid 属性来判断请求是否成功
}

//异步
var indexResponseAsync = await client.IndexDocumentAsync(person);


**Index  使用:**
- 如果在索引(添加)文档的时候,需要增加额外的参数,可以使用 Index 方法。

<p class="code-caption" data-lang="csharp" data-line_number="backend" data-trim_indent="backend" data-label_position="outer" data-labels_left="Code" data-labels_right=":" data-labels_copy="Copy Code"><span class="code-caption-label"></span></p>
``` csharp 
var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman"
};

//方法一:通过委托
client.Index(person, i => i.Index("people")); 
//方法二:通过 IndexRequest 对象
client.Index(new IndexRequest<Person>(person, "people"));

Multiple Document 多文档索引

Nest 提供了 IndexMany(Async) 和 Bulk(Async) 来进行多文档索引(添加)。

通过 IndexMany 和 IndexManyAsync 来批量添加文档

Nest 客户端提供了 IndexMany 和 IndexManyAsync 方法,可以让你快捷的索引(添加)多个文档。

  • 需要注意:这 2 个方法都是在单个 HTTP 请求中索引(添加)所有文档,因此,对于大文档集合而言,并不推荐使用这 2 个方法(对于大文档集合,可以考虑使用 BulkAllObservable 帮助类。)。
  • 实际是在单个 HTTP 请求中使用 _bulk 请求


    ` csharp
    var people = new[]
    {
    new Person { Id = 1, FirstName = “Martijn”, LastName = “Laarman”},
    new Person { Id = 2, FirstName = “Stuart” ,LastName = “Cam”},
    new Person { Id = 3, FirstName = “Russ”, LastName = “Cam”}
    };

var indexManyResponse = _client.IndexMany(people);

if (indexManyResponse.Errors) //通过检查 Errors,可以让你知道批量操作是否有错误。
{
foreach (var itemWithError in indexManyResponse.ItemsWithErrors)
{
//你可以通过遍历 ItemsWithErrors 来获取错误信息。
Console.WriteLine(“Failed to index document {0}: {1}”, itemWithError.Id, itemWithError.Error);
}
}

// Alternatively, documents can be indexed asynchronously
var indexManyAsyncResponse = await _client.IndexManyAsync(people);


请求 URL 如下:
<p class="code-caption" data-lang="sh" data-line_number="backend" data-trim_indent="backend" data-label_position="outer" data-labels_left="Code" data-labels_right=":" data-labels_copy="Copy Code"><span class="code-caption-label"></span></p>
``` sh 
POST /_bulk
{"index":{"_id":"1","_index":"person"}}
{"id":1,"firstName":"Martijn","lastName":"Laarman"}
{"index":{"_id":"2","_index":"person"}}
{"id":2,"firstName":"Stuart","lastName":"Cam"}
{"index":{"_id":"3","_index":"person"}}
{"id":3,"firstName":"Russ","lastName":"Cam"}

生成的文档如下:

{
    "hits" : [
      {
        "_index" : "person",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "id" : 1,
          "firstName" : "Martijn",
          "lastName" : "Laarman"
        }
      },
      {
        "_index" : "person",
        "_type" : "_doc",
        "_id" : "2",
        "_score" : 1.0,
        "_source" : {
          "id" : 2,
          "firstName" : "Stuart",
          "lastName" : "Cam"
        }
      },
      {
        "_index" : "person",
        "_type" : "_doc",
        "_id" : "3",
        "_score" : 1.0,
        "_source" : {
          "id" : 3,
          "firstName" : "Russ",
          "lastName" : "Cam"
        }
      }
    ]
}

通过 Bulk 和 BulkAsync 来批量添加文档

如果在索引文档的时候你需要更加惊喜的控制,你可以使用 Bulk(Async) 方法,并使用描述器来自定义批量调用。

跟 IndexMany(Async) 一样, 实际是在单个 HTTP 请求中使用 _bulk 请求:

var people = new[]
{
    new Person { Id = 1, FirstName = "Martijn", LastName = "Laarman"},
    new Person { Id = 2, FirstName = "Stuart" ,LastName = "Cam"},
    new Person { Id = 3, FirstName = "Russ", LastName = "Cam"}
};

//同步
var bulkIndexResponse = _client.Bulk(b => b
    .Index("person")
    .IndexMany(people));

// 异步
var asyncBulkIndexResponse = await _client.BulkAsync(b => b
    .Index("person")
    .IndexMany(people));

请求 URL 为:

POST /person/_bulk
{"index":{"_id":"1"}}
{"id":1,"firstName":"Martijn","lastName":"Laarman"}
{"index":{"_id":"2"}}
{"id":2,"firstName":"Stuart","lastName":"Cam"}
{"index":{"_id":"3"}}
{"id":3,"firstName":"Russ","lastName":"Cam"}

使用 BulkAllObservable 帮助类进行多文档索引

BulkAllObservable 帮助类可以让你专注于索引的整体目标,而不必关心重试、回退或分块机制。

BulkAllObservable 内部使用了 BulkAll 方法来批量添加,如下:

故你也可以直接使用 BulkAll 方法和 Wait 扩展方法来索引多个文档:

var people = new[]
{
    new Person { Id = 1, FirstName = "Martijn", LastName = "Laarman"},
    new Person { Id = 2, FirstName = "Stuart" ,LastName = "Cam"},
    new Person { Id = 3, FirstName = "Russ", LastName = "Cam"}
};

var bulkAllObserver = _client.BulkAll(people, b => b
        .Index("person")
        .BackOffTime("30s")//集群繁忙,报429错误码的时候,等待多久进行重试
        .BackOffRetries(2) //重试次数
        .RefreshOnCompleted() //
        .MaxDegreeOfParallelism(Environment.ProcessorCount)
        .Size(1000) //每次 bulk 请求时的文档数量。
    ).Wait(TimeSpan.FromMinutes(15), next => 
     {//执行索引并等待 15min,BulkAll 请求虽然时异步的,但是是一个阻塞操作。

     });
  • BulkAllObservable 提供了在索引失败的情况下自动重试/退回的功能,并控制在单个 HTTP 请求中索引文档数量的功能。
  • 如果文档数量很多,这可能会导致许多HTTP请求,每个HTTP请求包含1000个文档(最后一个请求可能包含相对较小数量的文档,具体取决于总数)。

请求 URL 如下:

POST /person/_bulk
{"index":{"_id":"1"}}
{"id":1,"firstName":"Martijn","lastName":"Laarman"}
{"index":{"_id":"2"}}
{"id":2,"firstName":"Stuart","lastName":"Cam"}
{"index":{"_id":"3"}}
{"id":3,"firstName":"Russ","lastName":"Cam"}

POST /person/_refresh

BulkAllObservable 还提供了一些其他高级特性:

  • BufferToBulk: 允许在将批量请求发送到服务器之前自定义批量请求中的各个操作。
  • RetryDocumentPredicate: 允许你来控制当索引失败时是否要进行重试。
  • DroppedDocumentCallback: 回调函数,当文档索引(添加)失败时触发。

示例如下:

_client.BulkAll(people, b => b
    .BufferToBulk((descriptor, list) => //在请求钱自定义一些私有操作。
    {
        foreach (var item in list)
        {
            descriptor.Index<Person>(bi => bi
                .Index(item.Id % 2 == 0 ? "even-index" : "odd-index") // 将 id 为 偶数或者奇数的文档分别放到 even-index 和 odd-index 索引中
                .Document(item)
            );
        }
    })
    .RetryDocumentPredicate((item, person) =>  //根据设定条件来决定是否需要重新进行索引
    {
        return item.Error.Index == "even-index" && person.FirstName == "Martijn";
    })
    .DroppedDocumentCallback((item, person) => // 当文档索引(添加)失败时触发该回调。
    {
        Console.WriteLine($"Unable to index: {item} {person}");
    }));