Elasticsearch.Nest 教程系列 5-3 索引:Ingest Pipeline | Ingest 管道


Ingest 管道是一系列按照声明的顺序执行的处理器。

假设有以下 POCO 类:

public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string IpAddress { get; set; }
    public GeoIp GeoIp { get; set; }
}

public class GeoIp
{
    public string CityName { get; set; }
    public string ContinentName { get; set; }
    public string CountryIsoCode { get; set; }
    public GeoLocation Location { get; set; }
    public string RegionName { get; set; }
}

创建一个 Ingestion 管道:
eg:假设为 Person 文档建立索引,创建一个 Ingestion 管道,在被索引到 ES 之前对其进行相关操作:

  • 将 LastName 转换为大写
  • 将姓名的首字母索引到其他字段—这里用 initials 字段
  • 将 IpAddress 转换为具体地址。

以上操作需求可以通过创建自定义映射并创建 Ingestion 管道来实现。且直接使用 Person 类,而无需进行任何更改。

创建索引:

_client.Indices.Create("person", c => c
    .Map<Person>(p => p
        .AutoMap()    //使用自动映射
        .Properties(props => props
            .Keyword(t => t.Name("initials")) //添加一个额外的字段(initials)来存储姓名的首字母
            .Ip(t => t.Name(dv => dv.IpAddress)) //映射 IpAdress属性 为 IP 地址类型
            .Object<GeoIp>(t => t.Name(dv => dv.GeoIp)) //把 GeoIp 类型映射为 object
        )
    )
);

发出的请求为:

PUT /person
{
    "mappings": {
        "properties": {
            "id": {
                "type": "integer"
            },
            "firstName": {
                "fields": {
                    "keyword": {
                        "ignore_above": 256,
                        "type": "keyword"
                    }
                },
                "type": "text"
            },
            "lastName": {
                "fields": {
                    "keyword": {
                        "ignore_above": 256,
                        "type": "keyword"
                    }
                },
                "type": "text"
            },
            "ipAddress": {
                "type": "ip"
            },
            "geoIp": {
                "type": "object"
            },
            "initials": {
                "type": "keyword"
            }
        }
    }
}

创建管道:

_client.Ingest.PutPipeline("person-pipeline", p => p
    .Processors(ps => ps
        .Uppercase<Person>(s => s
            .Field(t => t.LastName) //把 LastName 转换为大写
        )
        .Script(s => s
            .Lang("painless")  //使用 painless 脚本填充新字段 initials,规则为取姓名的首字母
            .Source("ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)")
        )
        .GeoIp<Person>(s => s //使用 ingest-geoip 插件转换 ip 地址为具体地址
            .Field(i => i.IpAddress)
            .TargetField(i => i.GeoIp)
        )
    )
);

发出的命令为:

PUT /_ingest/pipeline/person-pipeline
{
    "processors": [
        {
            "uppercase": {
                "field": "lastName"
            }
        },
        {
            "script": {
                "lang": "painless",
                "source": "ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)"
            }
        },
        {
            "geoip": {
                "field": "ipAddress",
                "target_field": "geoIp"
            }
        }
    ]
}

索引一份文档到 ES

var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman",
    IpAddress = "139.130.4.5"
};
// 索引文档的时候 使用上面声明的 person-pipeline 管道。
var indexResponse = _client.Index(person, p => p.Index("person").Pipeline("person-pipeline"));

发出的请求为:

PUT /person/_doc/1?pipeline=person-pipeline
{
    "id": 1,
    "firstName": "Martijn",
    "lastName": "Laarman",
    "ipAddress": "139.130.4.5"
}

存储的文档内容为:

{
    "_source" : {
          "firstName" : "Martijn",
          "lastName" : "LAARMAN",
          "geoIp" : {
            "continent_name" : "Oceania",
            "region_iso_code" : "AU-NSW",
            "city_name" : "Belrose",
            "country_iso_code" : "AU",
            "region_name" : "New South Wales",
            "location" : {
              "lon" : 151.2167,
              "lat" : -33.7333
            }
          },
          "initials" : "ML",
          "ipAddress" : "139.130.4.5",
          "id" : 1
        }
}

批量添加时增加超时时间

索引文档的时候如果指定了管道,会增加索引的开销。如果进行大批量请求,增加默认索引的超时时间可以避免一些异常出现。

eg:使用 Bulk api 的时候增加超时时间的设定:

_client.Bulk(b => b
    .Index("people")
    .Pipeline("person-pipeline")
    .Timeout("5m")   //设定服务端批量操作超时时间。
    .Index<Person>(/*snip*/)
    .Index<Person>(/*snip*/)
    .Index<Person>(/*snip*/)
    .RequestConfiguration(rc => rc
        .RequestTimeout(TimeSpan.FromMinutes(5))  //设定 HTTP 请求的超时时间。
    )
);