An ingest pipeline is a series of processors that are executed in the order in which they are declared.
Assume the following POCO classes:
using Nest; // GeoLocation is defined in the NEST client

public class Person
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string IpAddress { get; set; }
    public GeoIp GeoIp { get; set; }
}

public class GeoIp
{
    public string CityName { get; set; }
    public string ContinentName { get; set; }
    public string CountryIsoCode { get; set; }
    public GeoLocation Location { get; set; }
    public string RegionName { get; set; }
}
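Creating an ingestion pipeline:
For example, suppose we are indexing Person documents and want an ingestion pipeline that manipulates each document before it is indexed into Elasticsearch: the last name is uppercased, the initials are written to their own field, and the IP address is resolved to a geographic location.
These requirements can be met by creating a custom mapping and an ingestion pipeline. The Person class can be used as-is, without any changes.
Create the index: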
_client.Indices.Create("person", c => c
    .Map<Person>(p => p
        .AutoMap()
        .Properties(props => props
            .Keyword(t => t.Name("initials"))
            .Ip(t => t.Name(dv => dv.IpAddress))
            .Object<GeoIp>(t => t.Name(dv => dv.GeoIp))
        )
    )
);
The request sent is:
PUT /person
{
  "mappings": {
    "properties": {
      "id": {
        "type": "integer"
      },
      "firstName": {
        "fields": {
          "keyword": {
            "ignore_above": 256,
            "type": "keyword"
          }
        },
        "type": "text"
      },
      "lastName": {
        "fields": {
          "keyword": {
            "ignore_above": 256,
            "type": "keyword"
          }
        },
        "type": "text"
      },
      "ipAddress": {
        "type": "ip"
      },
      "geoIp": {
        "type": "object"
      },
      "initials": {
        "type": "keyword"
      }
    }
  }
}
Create the pipeline:
_client.Ingest.PutPipeline("person-pipeline", p => p
    .Processors(ps => ps
        .Uppercase<Person>(s => s
            .Field(t => t.LastName)
        )
        .Script(s => s
            .Lang("painless")
            .Source("ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)")
        )
        .GeoIp<Person>(s => s
            .Field(i => i.IpAddress)
            .TargetField(i => i.GeoIp)
        )
    )
);
The request sent is:
PUT /_ingest/pipeline/person-pipeline
{
  "processors": [
    {
      "uppercase": {
        "field": "lastName"
      }
    },
    {
      "script": {
        "lang": "painless",
        "source": "ctx.initials = ctx.firstName.substring(0,1) + ctx.lastName.substring(0,1)"
      }
    },
    {
      "geoip": {
        "field": "ipAddress",
        "target_field": "geoIp"
      }
    }
  ]
}
Index a document into Elasticsearch:
var person = new Person
{
    Id = 1,
    FirstName = "Martijn",
    LastName = "Laarman",
    IpAddress = "139.130.4.5"
};

var indexResponse = _client.Index(person, p => p.Index("person").Pipeline("person-pipeline"));
The request sent is:
PUT /person/_doc/1?pipeline=person-pipeline
{
  "id": 1,
  "firstName": "Martijn",
  "lastName": "Laarman",
  "ipAddress": "139.130.4.5"
}
The stored document is:
{
  "_source": {
    "firstName": "Martijn",
    "lastName": "LAARMAN",
    "geoIp": {
      "continent_name": "Oceania",
      "region_iso_code": "AU-NSW",
      "city_name": "Belrose",
      "country_iso_code": "AU",
      "region_name": "New South Wales",
      "location": {
        "lon": 151.2167,
        "lat": -33.7333
      }
    },
    "initials": "ML",
    "ipAddress": "139.130.4.5",
    "id": 1
  }
}
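To make the effect of processor order concrete, the uppercase and script steps can be imitated locally. The following is only a rough sketch under stated assumptions: it does not call Elasticsearch, the geoip step is left out, and the names PipelineSketch, Processors and Apply are hypothetical helpers invented for this illustration.

```csharp
using System;
using System.Collections.Generic;

// Local stand-in for the first two processors of person-pipeline.
// Hypothetical helper for illustration; it never talks to Elasticsearch.
public static class PipelineSketch
{
    // Each entry mutates the document, much like a processor mutates ctx.
    public static readonly IReadOnlyList<Action<IDictionary<string, object>>> Processors =
        new List<Action<IDictionary<string, object>>>
        {
            // Imitates the uppercase processor on lastName.
            doc => doc["lastName"] = ((string)doc["lastName"]).ToUpperInvariant(),
            // Imitates the script processor: first letter of each name.
            doc => doc["initials"] =
                ((string)doc["firstName"]).Substring(0, 1) +
                ((string)doc["lastName"]).Substring(0, 1),
        };

    public static IDictionary<string, object> Apply(IDictionary<string, object> doc)
    {
        // Declared order is execution order.
        foreach (var processor in Processors)
            processor(doc);
        return doc;
    }

    public static void Main()
    {
        var doc = new Dictionary<string, object>
        {
            ["firstName"] = "Martijn",
            ["lastName"] = "laarman", // lower-case on purpose
        };
        Apply(doc);
        Console.WriteLine($"{doc["lastName"]} {doc["initials"]}"); // prints "LAARMAN ML"
    }
}
```

Because the uppercase step runs before the script step, the initials come out as "ML" even when the incoming last name is lower-case. With the two steps swapped, the same input would yield "Ml".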
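Increasing the timeout for bulk indexing
Specifying a pipeline when indexing documents adds overhead to each index operation. For large bulk requests, raising the default indexing timeout can help avoid exceptions.
For example, setting longer timeouts when using the Bulk API: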
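_client.Bulk(b => b
    .Index("people")
    .Pipeline("person-pipeline")
    .Timeout("5m") // bulk timeout on the Elasticsearch side
    .Index<Person>(/* document omitted */)
    .Index<Person>(/* document omitted */)
    .Index<Person>(/* document omitted */)
    .RequestConfiguration(rc => rc
        .RequestTimeout(TimeSpan.FromMinutes(5)) // HTTP request timeout on the client side
    )
);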
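If most calls in an application go through a pipeline, the client-side HTTP timeout could alternatively be raised once on the connection settings rather than per request. A minimal sketch, assuming a node at http://localhost:9200 (a placeholder address, not taken from the original) and a hypothetical helper class named ClientFactory:

```csharp
using System;
using Nest;

// Hypothetical helper, for illustration only.
public static class ClientFactory
{
    public static ElasticClient Create()
    {
        // Assumed node address; replace with the real cluster URI.
        var settings = new ConnectionSettings(new Uri("http://localhost:9200"))
            // Client-side HTTP timeout applied to every request made by this client.
            .RequestTimeout(TimeSpan.FromMinutes(5));

        return new ElasticClient(settings);
    }
}
```

This only changes how long the client waits for a response. The bulk timeout set with Timeout("5m") is sent with each bulk request and still has to be specified there. A global setting also means that every failing call takes longer to time out, so the per-request override is the narrower choice when only bulk indexing is slow.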