StackExchange.Redis 系列 8:Stream 数据类型说明

  • 本系列博文是“伪”官方文档翻译,并非完全将官方文档进行翻译,而是我在查阅、测试原始文档并转换为自己东西后进行的“准”翻译。
  • 原始文档见此:https://stackexchange.github.io/StackExchange.Redis/
  • 本系列本博文基于 redis 5.0.6,系列中部分博文跟官方文档有出入,有不同见解 / 说明不当的地方,还请大家不吝拍砖。

说明

  • Stream 数据类型是在 Redis 5.0 版本新引入的。它以更抽象的方式来模拟日志数据结构。
  • 目前 StackExchange.Redis 客户端已经实现了所有原生 Stream 相关的命令。
  • 关于 Stream 的介绍可以见此

往 Streams 中写入数据

  • Stream 中的每条消息/条目都由 StreamEntry 类型表示。每个 Stream 条目包含一个唯一的 ID 和一个 Name/Value 对。

eg:通过单个 name/value对 往 Stream 中添加一条简单消息

var db = redis.GetDatabase();
var messageId = db.StreamAdd("event_stream", "foo_name", "bar_value");// messageId = 1518951480106-0
  • StreamAdd 方法返回的 messageId 由将消息添加到 Stream 中毫秒时间和序列号组成。序列号的作用是防止并发导致 messageId 一样而引发冲突。

eg:将多个 name/value对 添加到 Stream 中

var values = new NameValueEntry[]
{
    new NameValueEntry("sensor_id", "1234"),
    new NameValueEntry("temp", "19.8")
}; 

var db = redis.GetDatabase();
var messageId = db.StreamAdd("sensor_stream", values);

你可以自定义 messageId,方式如下:

db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);

从 Streams 中读取数据

可以通过 StreamRead 方法或者 StreamRange 方法读取数据。

//读取 messageId="0-0"的 stream 中的'所有'消息。
var messages = db.StreamRead("event_stream", "0-0");
  • 你可以通过 count 参数来限制返回消息的数量

StreamRead 方法允许你同时从多个 stream 中读取。

var streams = db.StreamRead(new StreamPosition[]
{
    new StreamPosition("event_stream", "0-0"),
    new StreamPosition("score_stream", "0-0")
});

Console.WriteLine($"Stream = {streams.First().Key}");
Console.WriteLine($"Length = {streams.First().Entries.Length}");
  • 你可以用 countPerStream 可选参数来限定每个 Stream 返回的消息数量。

StreamRange 方法允许你返回一个 Stream 中一定范围内的条目:

var messages = db.StreamRange("event_stream", minId: "-", maxId: "+");
  • - 和 + 表示最小和最大的 id 号。
  • 你还可以使用 messageOrder 参数来反向从 stream 中读取数据。
  • StreamRange 方法同样提供了 count 参数来限定返回的条目数量。

var messages = db.StreamRange("event_stream", 
    minId: "0-0", 
    maxId: "+", 
    count: 100,
    messageOrder: Order.Descending);

StreamInfo 方法

StreamInfo 方法提供了读取 Stream 基本信息的功能:如第一条条目,最后一条条目,流的长度,消费者组的数量等等。这些信息可以让处理 stream 更高效。

var info = db.StreamInfo("event_stream");

Console.WriteLine(info.Length);Console.WriteLine(info.FirstEntry.Id);Console.WriteLine(info.LastEntry.Id);

Consumer Groups

通过 consumer groups 可以让你扩展跨多个 workers 或 consumers的流处理。更多关于 Consumer Groups 可以见此

以下方法创建了个一个 consumer group,并告诉 redis 服务器从 stream 的哪个位置开始读取数据。如果你在第一次创建 stream 之前调用 StreamCreateConsumerGroup,
StreamCreateConsumerGroup 方法会默认给你创建一个 stream,你可以通过将
createStream 参数设置为 false 来覆盖该行为:

// Returns true if created, otherwise false.
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "$");
// or
db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", StreamPosition.NewMessages);
  • $ 符号表示 consumer group 仅读取 consumer group 创建之后的消息。如果你想要从流中读取已经存在的消息,那么你可以在流中提供任一位置:

    // Begin reading from the first position in the stream.
    db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "0-0");
    

使用 StreamReadGroup 方法将消息读入 consumer。该方法的参数接收一个 message ID。

将 ID 传递给 StreamReadGroup 时,redis 将仅返回给定 consumer 的待处理消息,或者 consumer 已读取的消息。

// Read the first pending message for the "consumer_1" consumer.
var message = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", "0-0", count: 1);

还可以通过调用 StreamPending 和 StreamPendingMessages 方法来检索待处理消息信息。

StreamPending 返回待处理的消息数量,每个 consumer 待处理的消息以及最高和最低级的待处理消息的 id:

var pendingInfo = db.StreamPending("events_stream", "events_cg");

Console.WriteLine(pendingInfo.PendingMessageCount);
Console.WriteLine(pendingInfo.LowestPendingMessageId);
Console.WriteLine(pendingInfo.HighestPendingMessageId);
Console.WriteLine($"Consumer count: {pendingInfo.Consumers.Length}.");
Console.WriteLine(pendingInfo.Consumers.First().Name);
Console.WriteLine(pendingInfo.Consumers.First().PendingMessageCount);

使用 StreamPendingMessages 方法可以获得给定 consumer 待处理消息的详细信息:

// Read the first pending message for the consumer.
var pendingMessages = db.StreamPendingMessages("events_stream",
    "events_cg",
    count: 1,
    consumerName: "consumer_1",
    minId: pendingInfo.LowestPendingMessageId);

Console.WriteLine(pendingMessages.Single().MessageId);
Console.WriteLine(pendingMessages.Single().IdleTimeInMilliseconds);

调用 StreamAcknowledge 方法后,consumer 的待处理消息才会被确认,消息一旦被确认,StreamReadGroup 将无法再访问该消息

// Returns the number of messages acknowledged.
db.StreamAcknowledge("events_stream", "events_cg", pendingMessage.MessageId);

StreamClaim 方法可以将一条消息的所有权更改为其他使用者:

// Change ownership to consumer_2 for the first 5 messages pending for consumer_1.
var pendingMessages = db.StreamPendingMessages("events_stream", 
    "events_cg", 
    count: 5, 
    consumerName: "consumer_1", 
    minId: "0-0");

db.StreamClaim("events_stream",
    "events_cg",
    claimingConsumer: "consumer_2",
    minIdleTimeInMs: 0,
    messageIds: pendingMessages.Select(m => m.MessageId).ToArray());

其他通过 consumer groups 处理 stream 的方法可以参考 stream 的单元测试。