Elasticsearch.Nest 教程系列 2-2 连接:Connection pools | 连接池使用


连接池是一个内部机制,它主要关注于集群中注册的节点以及判断 NEST 可以使用那些节点来发送客户端的请求。

  • P.S.:此连接池非彼连接池(跟ADO.NET 中的数据库连接池不同),NEST 的连接池并不负责管理跟 ES 服务器的 TCP 连接。
    • 到 ES 服务端的 TCP 连接由 DESKTOP CLR 中的 ServicePointManager 负责,更多关于 ServicePointManager 见此

ES 中的连接池的作用:负责管理 ES 集群中可以建立连接的节点,并且一个 IConnectionPool 示例与一个 ConnectionSetting 示例相关联(一一对应关联)。

由于单个 NEST 客户端建议使用’单例’,因此单个连接池实例的生命周期跟应用程序的生命周期相同。

一共包含以下 5 个类型的连接池

  • SingleNodeConnectionPool
  • CloudConnectionPool
  • StaticConnectionPool
  • SniffingConnectionPool
  • StickyConnectionPool

SingleNodeConnectionPool

一般 SingleNodeConnectionPool 适用于集群中只有一个节点或者只有单个复杂均衡实例的情况。

当没有在 ConnectionSettings 构造函数中显式指定连接池类型的时候,此类型是默认连接池,也是在所有类型中最简单的一种连接池类型。

这种类型的连接池,使用单个 URI 来连接到 ES 服务器,所有请求也都通过 URI 来发起调用。

SingleNodeConnectionPool 不会标记节点是否还存活,也跟 sniffing 和 pinging 行为没有关系。该连接池会确保该 URI 下的连接随时可用。

示例:

public ElasticSearchClient(ElasticSearchSettings esSettings)
{
    _esSettings = esSettings;
    var pool = new SingleNodeConnectionPool(new Uri(_esSettings.ServerUri));
    var settings = new ConnectionSettings(pool);
    settings.DefaultIndex(esSettings.DefaultIndex);
    settings.DefaultMappingFor<User>(usr => usr.IndexName("users"));
    _client = new ElasticClient(settings);
}

你也可以通过以下方式来直接使用 SingleNodeConnectionPool 来实例化一个 ElasticClient

var client = new ElasticClient(uri);
  • 内部实际使用的是 SingleNodeConnectionPool 连接池。
  • 虽然可以直接用 ElasticClient 类来进行实例化,但依然建议通过 ConnectionSettings 来显式指定连接池类型

CloudConnectionPool

CloudConnectionPool 适用于使用了 Elatic 云服务实例的连接。。

一个特定的 SingleNodeConnectionPool 子类,接收一个 Cloud ID 和 凭证。

  • Cloud ID 可以从 Elastic Cloud Web 控制台获取。
  • Cloud ID 可以从你的 Elastic Cloud 集群管理控制台中获取。
  • Cloud ID 的格式一般为:cluster_name:base_64_data,其中 base_64_data 是一个该云实例的 UUID,例:
    • host_name\$elasticsearch_uuid\$kibana_uuid\$apm_uuid
    • 其中,只有 host_name 和 ealsticsearch_uuid 始终可用。
      指定为 CloudConnectionPool,客户端会为连接设置为 Elastic Cloud 优化的默认值。

示例:

var credentials = new BasicAuthenticationCredentials("username", "password"); 
var pool = new CloudConnectionPool(cloudId, credentials); 
var client = new ElasticClient(new ConnectionSettings(pool));
  • username 和 password 可以从 Elastic Cloud 中的 ES 服务内获取。

跟 SingleNodeConnectionPool 一样,CloudConnectionPool 不会标记节点是否还存活,也跟 sniffing 和 pinging 行为没有关系。

同样,你可以直接用 ElasticClient 类来直接实例化一个使用了 CloudConnectionPool 的客户端,如下:

var client = new ElasticClient(cloudId, credentials);

StaticConnectionPool

如果您有一个小型集群且不想启用嗅探功能来发现集群拓扑的话,则使用 StaticConnectionPool 连接池非常适合。

StaticConnectionPool 不支持 sniffing,但是支持 pinging。

示例:

// 给定一个 Uri 集合
var uris = Enumerable.Range(9200, 5)
    .Select(port => new Uri($"http://localhost:{port}"));

// 让连接池使用这些 URI
var pool = new StaticConnectionPool(uris);
var client = new ElasticClient(new ConnectionSettings(pool));

// 也可以使用 Node 集合来进行实例化
var nodes = uris.Select(u => new Node(u));
pool = new StaticConnectionPool(nodes);
client = new ElasticClient(new ConnectionSettings(pool));

SniffingConnectionPool

SniffingConnectionPool 支持 sniffing 和 pinging。

SniffingConnectionPool 继承自 StaticConnectionPool,一个 sniffing 连接允许其在运行的时候 reseeded。

该连接池使用 ReaderWriterLockSlim 来确保线程安全(这种方式开销很小)。

示例:

// 给定一个 Uri 集合
var uris = Enumerable.Range(9200, 5)
    .Select(port => new Uri($"http://localhost:{port}"));

// 让连接池使用这些 URI
var pool = new SniffingConnectionPool(uris);
var client = new ElasticClient(new ConnectionSettings(pool));

// 也可以使用 Node 集合来进行实例化
var nodes = uris.Select(u=>new Node(u));
pool = new SniffingConnectionPool(nodes);
client = new ElasticClient(new ConnectionSettings(pool));
  • 使用 Node 的主要好处是在 sedding 的时候可以包括已知的节点角色,之后 NEST 可以使用这些角色来支持特定的 API 请求,如:先在有资格的主节点上进行 sniffing,之后将问题主节点从客户端调用中移除。

StickyConnectionPool

返回第一个活动节点以对其进行数据请求,底层通过 System.Threading.Interlocked 以线程安全的方式将“索引器”保留到最后一个活动节点。

// 给定一个 Uri 集合
var uris = Enumerable.Range(9200, 5)
    .Select(port => new Uri($"http://localhost:{port}"));

/* StickyConnectionPool 通过第二个入参:Func<Node, float>,传入一个 Node并返回一个权重值,通过权重值来进行倒序排序。
以下示例通过对节点进行评分,以使 rack_id  为 rack_one 的客户端节点得分最高
*/
var pool = new StickySniffingConnectionPool(uris, node =>
{
    var weight = 0f;

    if (node.ClientNode)
        weight += 10;

    if (node.Settings.TryGetValue("node.attr.rack_id", out var rackId) && rackId.ToString() == "rack_one")
        weight += 10;

    return weight;
});

var client = new ElasticClient(new ConnectionSettings(pool));