在 Redis Insight 中管理流和消费者组
了解如何在 Redis Insight 中管理流和消费者组
流是仅可追加的日志文件。向其中添加数据后,您无法更改它。这似乎是一个缺点;但是,流可用作日志或单一事实来源。它还可以用作以不同速度工作且不需要相互了解的进程之间的缓冲区。有关流的更多概念信息,请参阅Redis 流。
在本主题中,您将了解如何在 Redis Insight 中添加和使用流以及消费者组。
这是一个模拟温度和湿度传感器的流。与流交互的进程扮演两个角色之一:消费者和生产者。流的要点是它不会结束,因此您无法捕获整个数据集并对其进行处理。
在这个流中,传感器被视为生产者,用于广播数据。消费者从流中读取数据并对其进行一些处理。例如,如果温度高于某个阈值,它会发出一条消息,打开该单元的空调或通知维护人员。
可以让多个消费者执行不同的工作,一个测量湿度,另一个在一段时间内测量温度。Redis 将整个数据集的副本存储在内存中,而内存是一种有限的资源。为避免数据失控,可以在向流中添加内容时对其进行修剪。使用 向流中添加内容时XADD,您可以选择指定将流修剪为特定或近似数量的最新条目,或者仅包含 ID 高于指定 ID 的条目。您还可以使用密钥过期来管理流数据所需的存储。例如,通过将每天的数据写入 Redis 中它自己的流,并在一段时间(比如一周)后使每个流的密钥过期。ID 可以是任意数字,但流中每个新条目的 ID 值都必须高于最后添加到流中的 ID。
添加新条目
使用XADD来*作为 ID,Redis 会自动为您生成一个新 ID,该 ID 由毫秒精度的时间戳、破折号和序列号组成。例如1656416957625-0。然后提供要存储在新流条目中的字段名称和值。
检索内容的方法有多种。您可以按时间范围检索条目,也可以请求自您指定的时间戳或 ID 以来发生的所有事情。使用单个命令,您可以请求从给定日期的上午 10:30 到 11:15 的任何内容。
消费者群体
更现实的用例是具有许多温度传感器的系统,Redis 将其数据放入流中,记录它们到达的时间并对其进行排序。
右侧有两个消费者读取数据流。其中一个消费者会在温度超过一定值时发出警报,并向维护人员发送短信,告知他们需要采取一些措施;另一个消费者是数据仓库,负责获取数据并将其放入数据库。
它们彼此独立运行。在右上方,我们有另一种任务。我们假设警报和数据仓库非常快。您会收到一条消息,告知温度是否大于特定值,这可能需要一毫秒。警报可以跟上数据流。扩展消费者的一种方法是消费者组,它允许同一消费者或同一代码的多个实例作为一个团队来处理流。
在 Redis Insight 中管理流
您可以通过两种方式在 Redis Insight 中添加流:创建新流或添加到现有流。
要创建流,首先选择密钥类型(流)。您无法设置生存时间 (TTL),因为它无法放在流中的消息上;只能在 Redis 密钥上进行设置。将流命名为mystream 。然后,将条目 ID设置为*默认的时间戳。如果您有自己的 ID 生成策略,请输入序列中的下一个 ID。请记住,该 ID 必须高于流中任何其他条目的 ID。
然后,使用 + 输入字段和值以添加多个(例如,名称和位置)。现在,您有一个出现在Streams视图中的流,您可以继续向其中添加字段和值。
Redis Insight 会为您运行读取命令,以便您可以在Streams视图中查看流条目。Consumer Groups视图显示给定消费者组中的每个消费者以及 Redis 上次分配消息的时间、消息的 ID 是什么以及该过程发生了多少次,以及消费者是否已使用该命令告诉 Redis 您已完成该任务XACK。
通过 Redis Insight 中的传感器监控温度和湿度
此示例展示如何将现有流引入 Redis Insight 并使用它。
设置
- 安装Redis Insight。
- 下载并安装Node.js(LTS 版本)。
- 安装Redis。在 Docker 中,检查 Redis 是否在默认端口 6379 上本地运行(未设置密码)。
- 克隆此示例的代码存储库。有关此示例和安装提示的更多信息,请参阅README 。
- 在命令行上,导航到包含代码存储库的文件夹并安装 Node.js 包管理器 (npm)。
npm install运行生产者
要启动生产者(它将每隔几秒向流中添加一个新条目),请输入:
npm run producer
> streams@1.0.0 producer
> node producer.js
Starting producer...
Adding reading for location: 62, temperature: 40.3, humidity: 36.5
Added as 1632771056648-0
Adding reading for location: 96, temperature: 15.4, humidity: 70
Added as 1632771059039-0
...生产器无限期运行。选择Ctrl+C停止它。如果您想更快地将条目添加到流中,可以启动生产器的多个实例。
运行消费者
要启动每隔几秒从流中读取一次的使用者,请输入:
npm run consumer
> streams@1.0.0 consumer
> node consumer.js
Starting consumer...
Resuming from ID 1632744741693-0
Reading stream...
Received entry 1632771056648-0:
[ 'location', '62', 'temp', '40.3', 'humidity', '36.5' ]
Finished working with entry 1632771056648-0
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]消费者将其读取的最后一个条目 ID 存储在 Redis 字符串的 key 处consumer:lastid。它使用此字符串在重新启动后从上次中断的地方继续。尝试通过停止它Ctrl+C并重新启动它。
一旦消费者处理完流中的每个条目,它将无限期地等待生产者的实例添加更多条目:
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...
No new entries since entry 1632771060229-0.
Reading stream...停止使用Ctrl+C。
运行消费者组
消费者组由多个协同工作的消费者实例组成。Redis 管理从流中读取的条目到消费者组成员的分配。组中的消费者将接收条目的子集,而整个组将接收所有条目。在消费者组中工作时,消费者进程必须确认收到/处理了每个条目。
使用多个终端窗口,启动消费者组消费者的三个实例,并为每个实例赋予唯一的名称:
npm run consumergroup consumer1
> streams@1.0.0 consumergroup
> node consumer_group.js -- "consumer1"
Starting consumer consumer1...
Consumer group temphumidity_consumers exists, not created.
Reading stream...
Received entry 1632771059039-0:
[ 'location', '96', 'temp', '15.4', 'humidity', '70' ]
Acknowledged processing of entry 1632771059039-0.
Reading stream...在第二个终端中:
npm run consumergroup consumer2第三部分:
npm run consumergroup consumer3消费者将无限期地运行,等待生产者实例在它们共同消费了整个流后将新消息添加到流中。请注意,在此模型中,每个消费者实例不会接收来自流的所有条目,但组的三个成员各自接收一个子集。
在 Redis Insight 中查看流
- 启动 Redis Insight。
- 选择
localhost:6379 - 选择STREAM。或者,从右上角选择全屏以展开视图。
现在,您可以在“流”和“消费者组”视图之间切换以查看数据。如本主题前面所述,流是仅附加日志,因此您无法修改条目的内容,但可以删除整个条目。当出现所谓的毒丸消息时,这很有用,这种消息可能会导致消费者崩溃。您可以在“流”XDEL视图中物理删除此类消息,也可以在命令行界面 (CLI) 中使用命令。
您可以继续在 CLI 上与流进行交互。例如,要获取流的当前长度,请使用以下命令XLEN:
XLEN ingest:temphumidity使用流来审计和处理银行、游戏、供应链、物联网、社交媒体等领域的事件。