Elasticsearch.Nest 教程系列 5-3 索引:Ingest Pipeline | Ingest 管道

  • 本系列博文是“伪”官方文档翻译(更加本土化),并非完全将官方文档进行翻译,而是在查阅、测试原始文档并转换为自己真知灼见后的“准”翻译。有不同见解 / 说明不周的地方,还请海涵、不吝拍砖 :)

  • 官方文档见此:https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/introduction.html

  • 本系列对应的版本环境:ElasticSearch@7.3.1,NEST@7.3.1,IDE 和开发平台默认为 VS2019,.NET CORE 2.1


Ingest 管道是一系列按照声明的顺序执行的处理器。

假设有以下 POCO 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Person
{
public int Id { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
public string IpAddress { get; set; }
public GeoIp GeoIp { get; set; }
}

public class GeoIp
{
public string CityName { get; set; }
public string ContinentName { get; set; }
public string CountryIsoCode { get; set; }
public GeoLocation Location { get; set; }
public string RegionName { get; set; }
}

创建一个 Ingestion 管道:
eg:假设为 Person 文档建立索引,创建一个 Ingestion 管道,在被索引到 ES 之前对其进行相关操作:

  • 将 LastName 转换为大写

  • 将姓名的首字母索引到其他字段—这里用 initials 字段

  • 将 IpAddress 转换为具体地址。

以上操作需求可以通过创建自定义映射并创建 Ingestion 管道来实现。且直接使用 Person 类,而无需进行任何更改。

创建索引:

1
2
3
4
5
6
7
8
9
10
_client.Indices.Create("person", c => c
.Map<Person>(p => p
.AutoMap() //使用自动映射
.Properties(props => props
.Keyword(t => t.Name("initials")) //添加一个额外的字段(initials)来存储姓名的首字母
.Ip(t => t.Name(dv => dv.IpAddress)) //映射 IpAdress属性 为 IP 地址类型
.Object<GeoIp>(t => t.Name(dv => dv.GeoIp)) //把 GeoIp 类型映射为 object
)
)
);

发出的请求为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
PUT /person
{
"mappings": {
"properties": {
"id": {
"type": "integer"
},
"firstName": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text"
},
"lastName": {
"fields": {
"keyword": {
"ignore_above": 256,
"type": "keyword"
}
},
"type": "text"
},
"ipAddress": {
"type": "ip"
},
"geoIp": {
"type": "object"
},
"initials": {
"type": "keyword"
}
}
}
}

创建管道:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
_client.Ingest.PutPipeline("person-pipeline", p => p
.Processors(ps => ps
.Uppercase<Person>(s => s
.Field(t => t.LastName) //把 LastName 转换为大写
)
.Script(s => s
.Lang("painless") //使用 painless 脚本填充新字段 initials,规则为取姓名的首字母
.Source("ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)")
)
.GeoIp<Person>(s => s //使用 ingest-geoip 插件转换 ip 地址为具体地址
.Field(i => i.IpAddress)
.TargetField(i => i.GeoIp)
)
)
);

发出的命令为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PUT /_ingest/pipeline/person-pipeline
{
"processors": [
{
"uppercase": {
"field": "lastName"
}
},
{
"script": {
"lang": "painless",
"source": "ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)"
}
},
{
"geoip": {
"field": "ipAddress",
"target_field": "geoIp"
}
}
]
}

索引一份文档到 ES

1
2
3
4
5
6
7
8
9
var person = new Person
{
Id = 1,
FirstName = "Martijn",
LastName = "Laarman",
IpAddress = "139.130.4.5"
};
// 索引文档的时候 使用上面声明的 person-pipeline 管道。
var indexResponse = _client.Index(person, p => p.Index("person").Pipeline("person-pipeline"));

发出的请求为:

1
2
3
4
5
6
7
PUT /person/_doc/1?pipeline=person-pipeline
{
"id": 1,
"firstName": "Martijn",
"lastName": "Laarman",
"ipAddress": "139.130.4.5"
}

存储的文档内容为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
{
"_source" : {
"firstName" : "Martijn",
"lastName" : "LAARMAN",
"geoIp" : {
"continent_name" : "Oceania",
"region_iso_code" : "AU-NSW",
"city_name" : "Belrose",
"country_iso_code" : "AU",
"region_name" : "New South Wales",
"location" : {
"lon" : 151.2167,
"lat" : -33.7333
}
},
"initials" : "ML",
"ipAddress" : "139.130.4.5",
"id" : 1
}
}

批量添加时增加超时时间

索引文档的时候如果指定了管道,会增加索引的开销。如果进行大批量请求,增加默认索引的超时时间可以避免一些异常出现。

eg:使用 Bulk api 的时候增加超时时间的设定:

1
2
3
4
5
6
7
8
9
10
11
_client.Bulk(b => b
.Index("people")
.Pipeline("person-pipeline")
.Timeout("5m") //设定服务端批量操作超时时间。
.Index<Person>(/*snip*/)
.Index<Person>(/*snip*/)
.Index<Person>(/*snip*/)
.RequestConfiguration(rc => rc
.RequestTimeout(TimeSpan.FromMinutes(5)) //设定 HTTP 请求的超时时间。
)
);