StackExchange.Redis 系列 8:Stream 数据类型说明
本系列博文是“伪”官方文档翻译,并非完全将官方文档进行翻译,而是我在查阅、测试原始文档并转换为自己东西后进行的“准”翻译。
本系列本博文基于 redis 5.0.6,系列中部分博文跟官方文档有出入,有不同见解 / 说明不当的地方,还请大家不吝拍砖。
说明
Stream 数据类型是在 Redis 5.0 版本新引入的。它以更抽象的方式来模拟日志数据结构。
目前 StackExchange.Redis 客户端已经实现了所有原生 Stream 相关的命令。
关于 Stream 的介绍可以见此。
往 Streams 中写入数据
-
Stream 中的每条消息/条目都由 StreamEntry 类型表示。每个 Stream 条目包含一个唯一的 ID 和一个 Name/Value 对。
eg:通过单个 name/value对 往 Stream 中添加一条简单消息
1 | var db = redis.GetDatabase(); |
-
StreamAdd 方法返回的 messageId 由将消息添加到 Stream 中毫秒时间和序列号组成。序列号的作用是防止并发导致 messageId 一样而引发冲突。
eg:将多个 name/value对 添加到 Stream 中
1 | var values = new NameValueEntry[] |
你可以自定义 messageId,方式如下:
1 | db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100); |
从 Streams 中读取数据
可以通过 StreamRead 方法或者 StreamRange 方法读取数据。
1 | //读取 messageId="0-0"的 stream 中的'所有'消息。 |
-
你可以通过 count 参数来限制返回消息的数量
StreamRead 方法允许你同时从多个 stream 中读取。
1 | var streams = db.StreamRead(new StreamPosition[] |
-
你可以用 countPerStream 可选参数来限定每个 Stream 返回的消息数量。
StreamRange 方法允许你返回一个 Stream 中一定范围内的条目:
1 | var messages = db.StreamRange("event_stream", minId: "-", maxId: "+"); |
-
- 和 + 表示最小和最大的 id 号。
-
你还可以使用 messageOrder 参数来反向从 stream 中读取数据。
-
StreamRange 方法同样提供了 count 参数来限定返回的条目数量。
1 | var messages = db.StreamRange("event_stream", |
StreamInfo 方法
StreamInfo 方法提供了读取 Stream 基本信息的功能:如第一条条目,最后一条条目,流的长度,消费者组的数量等等。这些信息可以让处理 stream 更高效。
1 | var info = db.StreamInfo("event_stream"); |
Consumer Groups
通过 consumer groups 可以让你扩展跨多个 workers 或 consumers的流处理。更多关于 Consumer Groups 可以见此。
以下方法创建了个一个 consumer group,并告诉 redis 服务器从 stream 的哪个位置开始读取数据。如果你在第一次创建 stream 之前调用 StreamCreateConsumerGroup,
StreamCreateConsumerGroup 方法会默认给你创建一个 stream,你可以通过将
createStream 参数设置为 false 来覆盖该行为:
1 | // Returns true if created, otherwise false. |
-
$ 符号表示 consumer group 仅读取 consumer group 创建之后的消息。如果你想要从流中读取已经存在的消息,那么你可以在流中提供任一位置:
1 | // Begin reading from the first position in the stream. |
使用 StreamReadGroup 方法将消息读入 consumer。该方法的参数接收一个 message ID。
将 ID 传递给 StreamReadGroup 时,redis 将仅返回给定 consumer 的待处理消息,或者 consumer 已读取的消息。
1 | // Read the first pending message for the "consumer_1" consumer. |
还可以通过调用 StreamPending 和 StreamPendingMessages 方法来检索待处理消息信息。
StreamPending 返回待处理的消息数量,每个 consumer 待处理的消息以及最高和最低级的待处理消息的 id:
1 | var pendingInfo = db.StreamPending("events_stream", "events_cg"); |
使用 StreamPendingMessages 方法可以获得给定 consumer 待处理消息的详细信息:
1 | // Read the first pending message for the consumer. |
调用 StreamAcknowledge 方法后,consumer 的待处理消息才会被确认,消息一旦被确认,StreamReadGroup 将无法再访问该消息
1 | // Returns the number of messages acknowledged. |
StreamClaim 方法可以将一条消息的所有权更改为其他使用者:
1 | // Change ownership to consumer_2 for the first 5 messages pending for consumer_1. |
其他通过 consumer groups 处理 stream 的方法可以参考 stream 的单元测试。