Redis 流

Redis 流简介

Redis 流是一种数据结构,其作用类似于仅追加日志,但还实现了多种操作来克服典型仅追加日志的一些限制。这些限制包括 O(1) 时间内的随机访问和复杂的消费策略,例如消费者组。您可以使用流来实时记录和同时联合事件。Redis 流用例的示例包括:

  • 事件源(例如跟踪用户操作、点击等)
  • 传感器监控(例如,现场设备的读数)
  • 通知(例如,将每个用户的通知记录存储在单独的流中)

Redis 为每个流条目生成一个唯一 ID。您可以使用这些 ID 稍后检索其关联条目,或读取和处理流中的所有后续条目。请注意,由于这些 ID 与时间相关,因此此处显示的 ID 可能会有所不同,并且与您在自己的 Redis 实例中看到的 ID 不同。

Redis 流支持多种修剪策略(以防止流无限制增长)和多种消费策略(请参阅XREADXREADGROUPXRANGE)。

基本命令

  • XADD向流中添加新条目。
  • XREAD读取一个或多个条目,从给定位置开始并随时间向前移动。
  • XRANGE返回两个提供的条目 ID 之间的条目范围。
  • XLEN返回流的长度。

查看流命令的完整列表

示例

  • 当我们的赛车手通过检查站时,我们会为每个赛车手添加一个流条目,其中包括赛车手的姓名、速度、位置和位置 ID:

  • 从 ID 开始读取两个流条目1692632086370-0

  • 从流末尾开始读取最多 100 个新的流条目,如果没有写入条目,则阻止最多 300 毫秒:

表现

向流中添加条目的复杂度为 O(1)。访问任何单个条目的复杂度为 O(n),其中n是 ID 的长度。由于流 ID 通常较短且长度固定,因此这实际上可简化为常数时间查找。有关原因的详细信息,请注意流是以基数树的形式实现的。

简单来说,Redis 流提供了高效的插入和读取功能。请参阅每个命令的时间复杂度以了解详细信息。

流基础知识

流是一种仅可追加的数据结构。基本写入命令称为XADD,用于将新条目追加到指定流。

每个流条目由一个或多个字段值对组成,有点像字典或 Redis 哈希:

上述对命令的调用使用自动生成的条目 ID(即命令返回的条目 ID,具体为)向流中的键 处XADD添加条目。它的第一个参数是键名,第二个参数是标识流内每个条目的条目 ID。但是,在本例中,我们传递了 ,因为我们希望服务器为我们生成一个新的 ID。每个新 ID 都将单调递增,因此更简单地说,每个新添加的条目都将具有比所有过去条目更高的 ID。服务器自动生成 ID 几乎总是您想要的,而明确指定 ID 的原因非常少见。我们稍后会详细讨论这一点。每个流条目都有一个 ID 这一事实是与日志文件的另一个相似之处,其中行号或文件内的字节偏移量可用于标识给定的条目。回到我们的示例,在键名和 ID 之后,下一个参数是组成我们流条目的字段-值对。rider: Castilla, speed: 29.9, position: 1, location_id: 2race:france1692632147973-0race:france*XADD

只需使用以下命令即可获取 Stream 内的项目数XLEN

条目 ID

XADD命令返回的条目 ID 明确标识给定流中的每个条目,它由两部分组成:

<millisecondsTime>-<sequenceNumber>

毫秒时间部分实际上是生成流 ID 的本地 Redis 节点中的本地时间,但是,如果当前毫秒时间恰好小于前一个条目时间,则将使用前一个条目时间,因此,如果时钟向后跳转,单调递增 ID 属性仍然成立。序列号用于在同一毫秒内创建的条目。由于序列号宽度为 64 位,因此实际上在同一毫秒内可以生成的条目数量没有限制。

这种 ID 的格式乍一看可能很奇怪,温和的读者可能会想知道为什么时间是 ID 的一部分。原因是 Redis 流支持按 ID 进行范围查询。由于 ID 与生成条目的时间相关,因此基本上可以免费查询时间范围。我们将在介绍命令时很快看到这一点XRANGE

如果由于某种原因,用户需要与时间无关但实际上与另一个外部系统 ID 相关联的增量 ID,如前所述,则该XADD命令可以采用显式 ID 而不是*触发自动生成的通配符 ID,如下例所示:

请注意,在这种情况下,最小 ID 为 0-1,并且命令不会接受等于或小于前一个 ID 的 ID:

如果您运行的是 Redis 7 或更高版本,您还可以提供仅由毫秒部分组成的显式 ID。在这种情况下,ID 的序列部分将自动生成。为此,请使用以下语法:

从 Streams 获取数据

现在我们终于能够通过 向我们的流中追加条目了XADD。但是,虽然将数据追加到流中很明显,但查询流以提取数据的方式却不那么明显。如果我们继续使用日志文件的类比,一种明显的方法是模仿我们通常使用 Unix 命令 的操作tail -f,也就是说,我们可以开始监听以获取追加到流中的新消息。请注意,与 Redis 的阻塞列表操作不同(在阻塞列表操作中,给定元素将到达像 这样的弹出操作BLPOP的单个客户端),使用流时,我们希望多个消费者能够看到追加到流中的新消息(就像许多tail -f进程可以看到添加到日志中的内容一样)。使用传统术语,我们希望流能够将消息发送给多个客户端。

然而,这只是一种潜在的访问模式。我们也可以以完全不同的方式来看待流:不是将其视为消息系统,而是将其视为时间序列存储。在这种情况下,获取附加的新消息可能也很有用,但另一种自然的查询模式是按时间范围获取消息,或者使用游标迭代消息以逐步检查所有历史记录。这绝对是另一种有用的访问模式。

最后,如果我们从消费者的角度看待流,我们可能希望以另一种方式访问​​该流,即作为可以划分给正在处理此类消息的多个消费者的消息流,以便消费者组只能看到到达单个流的消息子集。通过这种方式,可以跨不同的消费者扩展消息处理,而无需单个消费者处理所有消息:每个消费者只会获得不同的消息进行处理。这基本上就是 Kafka (TM) 对消费者组所做的。通过消费者组读取消息是从 Redis 流读取的另一种有趣模式。

Redis Streams 通过不同的命令支持上述三种查询模式。下一节将介绍所有这些模式,从最简单、最直接的使用开始:范围查询。

按范围查询:XRANGE 和 XREVRANGE

要按范围查询流,我们只需指定两个 ID,即startend。返回的范围将包括以 start 或 end 作为 ID 的元素,因此范围是包含的。两个特殊的 ID-+分别表示可能的最小和最大 ID。

返回的每个条目都是两个项目的数组:ID 和字段值对列表。我们已经说过,条目 ID 与时间有关系,因为字符左侧的部分-是创建流条目的本地节点的 Unix 时间(以毫秒为单位),即创建条目时的时间(但请注意,流是使用完全指定的XADD命令复制的,因此副本将具有与主节点相同的 ID)。这意味着我可以使用 查询一段时间XRANGE。但是,为了做到这一点,我可能想要省略 ID 的序列部分:如果省略,则在范围的开头将假定为 0,而在结尾部分将假定为可用的最大序列号。这样,仅使用两毫秒的 Unix 时间进行查询,我们就可以以包含的方式获得在该时间范围内生成的所有条目。例如,如果我想查询两毫秒的时间段,我可以使用:

我在这个范围内只有一个条目。然而在实际数据集中,我可以查询小时范围,或者在短短两毫秒内可能有许多项目,返回的结果可能非常大。因此,在最后XRANGE支持可选的COUNT选项。通过指定计数,我可以只获取前N 个项目。如果我想要更多,我可以获取返回的最后一个 ID,将序列部分加一,然后再次查询。让我们在以下示例中看到这一点。假设流中race:france填充了 4 个项目。要开始迭代,每个命令获取 2 个项目,我从完整范围开始,但计数为 2。

要继续对接下来的两个项目进行迭代,我必须选择返回的最后一个 ID,即1692632094485-0,并将前缀添加(到它。生成的独占范围间隔((1692632094485-0在本例中为 )现在可以用作下一次调用的新起始XRANGE参数:

现在我们已经从只有 4 个条目的流中检索了 4 个项目,如果我们尝试检索更多项目,我们将得到一个空数组:

由于查找XRANGE复杂度为O(log(N)) ,返回 M 个元素的复杂度为 O(M),因此在计数较少的情况下,该命令具有对数时间复杂度,这意味着迭代的每个步骤都很快。XRANGE事实上的流迭代器也是如此,并且不需要XSCAN命令。

该命令XREVRANGE相当于XRANGE但以相反的顺序返回元素,因此实际用途是XREVRANGE检查 Stream 中的最后一项是什么:

请注意,该XREVRANGE命令以相反的顺序采用启动停止参数。

使用 XREAD 聆听新内容

当我们不想按范围访问流中的项目时,通常我们想要的是订阅到达流的新项目。此概念可能与 Redis Pub/Sub(您订阅频道)或 Redis 阻止列表(您等待密钥来获取要获取的新元素)相关,但在使用流的方式上有根本区别:

  1. 一个流可以有多个客户端(消费者)等待数据。默认情况下,每个新项目都将传递给给定流中等待数据的每个消费者。此行为不同于阻止列表,在阻止列表中每个消费者将获得不同的元素。但是,向多个消费者分发的能力类似于 Pub/Sub。
  2. 虽然在 Pub/Sub 中,消息是即发即弃的,并且永远不会被存储,而使用阻止列表时,当客户端收到消息时,它会从列表中弹出(实际上被删除),但流的工作方式完全不同。所有消息都会无限期地附加在流中(除非用户明确要求删除条目):不同的消费者会通过记住最后收到的消息的 ID 来从自己的角度知道什么是新消息。
  3. 流消费者组提供了发布/订阅或阻止列表无法实现的控制级别,对同一流有不同的组,明确确认已处理的项目,检查待处理的项目的能力,声明未处理的消息,并且每个客户端都可以查看其私人的过去消息历史记录。

提供监听到达流的新消息功能的命令称为XREAD。它比 稍微复杂一些XRANGE,因此我们将开始展示简单的形式,稍后将提供整个命令布局。

以上是 的非阻塞形式XREAD。请注意,COUNT选项不是必需的,实际上该命令的唯一必需选项是STREAMS选项,它指定了键列表以及调用消费者已经看到的每个流的相应最大 ID,以便该命令将仅向客户端提供 ID 大于我们指定的 ID 的消息。

在上面的命令中,我们STREAMS race:france 0这样写道,我们希望 Stream 中所有消息race:france的 ID 都大于0-0。如您在上面的示例中所见,该命令返回键名,因为实际上可以使用多个键调用此命令来同时从不同的流中读取。例如,我可以这样写:STREAMS race:france race:italy 0 0。请注意,在STREAMS选项之后,我们需要提供键名,然后提供 ID。因此,STREAMS选项必须始终是最后一个选项。任何其他选项都必须位于STREAMS选项之前。

除了可以XREAD同时访问多个流,并且我们能够指定我们拥有的最后一个 ID 来获取较新的消息之外,在这个简单的形式中,该命令与 相比并没有太大不同XRANGE。然而,有趣的是,我们可以通过指定BLOCKXREAD参数轻松地将其转换为阻塞命令

> XREAD BLOCK 0 STREAMS race:france $

请注意,在上面的例子中,除了删除COUNT之外,我还指定了新的BLOCKmystream选项,超时时间为 0 毫秒(这意味着永不超时)。此外,我没有传递流的普通 ID,而是传递了特殊 ID $。这个特殊 ID 意味着XREAD应该使用流中已存储的最大 ID 作为最后一个 ID mystream,这样从我们开始监听开始,我们只会收到新消息tail -f。这在某种程度上类似于Unix 命令。

请注意,使用BLOCK选项时,我们不必使用特殊 ID $。我们可以使用任何有效的 ID。如果命令能够立即处理我们的请求而不会阻塞,它就会这样做,否则它将阻塞。通常,如果我们想从新条目开始使用流,我们会从 ID 开始$,然后继续使用收到的最后一条消息的 ID 进行下一次调用,依此类推。

的阻塞形式XREAD也能够监听多个 Stream,只需指定多个键名即可。如果请求可以同步处理,因为至少有一个流的元素大于我们指定的相应 ID,它将返回结果。否则,该命令将阻塞并返回第一个获得新数据的流的项目(根据指定的 ID)。

与阻塞列表操作类似,阻塞流读取从等待数据的客户端的角度来看是公平的,因为语义是 FIFO 样式。当有新项目可用时,第一个阻塞给定流的客户端将第一个解除阻塞。

XREAD除了COUNTBLOCK之外没有其他选项,因此它是一个非常基本的命令,具有将消费者附加到一个或多个流的特定目的。使用消费者组 API 可以使用更强大的流消费功能,但是通过消费者组读取数据是通过另一个名为 的命令实现的XREADGROUP,本指南的下一部分将介绍该命令。

消费者群体

当手头的任务是从不同的客户端使用相同的流时,XREAD已经提供了一种向 N 个客户端分发的方法,可能还会使用副本来提供更多的读取可扩展性。然而,在某些问题中,我们想要做的不是向许多客户端提供相同的消息流,而是向许多客户端提供来自同一流的不同消息子集。一个明显的例子是处理速度慢的消息:拥有 N 个不同的工作器来接收流的不同部分的能力使我们能够扩展消息处理,方法是将不同的消息路由到准备做更多工作的不同工作器。

实际上,如果我们想象有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流,那么我们希望根据下图提供消息:

1 -> C1
2 -> C2
3 -> C3
4 -> C1
5 -> C2
6 -> C3
7 -> C1

为了实现这一点,Redis 使用了一个称为消费者组的概念。从实现的角度来看,Redis 消费者组与 Kafka (TM) 消费者组没有任何关系,这一点非常重要。但它们的功能相似,因此我决定保留 Kafka (TM) 的术语,因为它最初推广了这个想法。

消费者组就像一个从流中获取数据的伪消费者,实际上为多个消费者提供服务,并提供某些保证:

  1. 每条消息都会发送给不同的消费者,因此同一条消息不可能被传递给多个消费者。
  2. 在消费者组中,消费者通过名称进行标识,名称是区分大小写的字符串,实现消费者的客户端必须选择该字符串。这意味着即使在断开连接后,流消费者组仍会保留所有状态,因为客户端将再次声明自己是同一个消费者。但是,这也意味着客户端必须提供唯一标识符。
  3. 每个消费者组都有第一个从未消费的 ID的概念,这样,当消费者请求新消息时,它可以只提供以前未传递的消息。
  4. 但是,消费消息需要使用特定命令进行明确确认。Redis 将确认解释为:此消息已正确处理,因此可以从消费者组中将其逐出。
  5. 消费者组会跟踪当前处于待处理状态的所有消息,即已发送给消费者组中某个消费者但尚未确认已处理的消息。借助此功能,在访问流的消息历史记录时,每个消费者只会看到已发送给它的消息

从某种程度上来说,我们可以把消费者组想象成关于流的一定量的状态:

+----------------------------------------+
| consumer_group_name: mygroup           |
| consumer_group_stream: somekey         |
| last_delivered_id: 1292309234234-92    |
|                                        |
| consumers:                             |
|    "consumer-1" with pending messages  |
|       1292309234234-4                  |
|       1292309234232-8                  |
|    "consumer-42" with pending messages |
|       ... (and so forth)               |
+----------------------------------------+

如果你从这个角度来看,那么很容易理解消费者组可以做什么,它如何能够只向消费者提供其待处理消息的历史记录,以及消费者请求新消息时如何只使用大于的消息 ID 来提供服务last_delivered_id。同时,如果你将消费者组视为 Redis 流的辅助数据结构,那么很明显,单个流可以有多个消费者组,每个消费者组都有不同的消费者集合。实际上,同一个流甚至可以有客户端通过没有消费者组的 进行读取XREAD,也有客户端通过XREADGROUP不同的消费者组进行读取。

现在是时候放大来查看基本的消费者组命令了。它们如下:

  • XGROUP用于创建、销毁和管理消费者组。
  • XREADGROUP用于通过消费者组从流中读取。
  • XACK是允许消费者将待处理消息标记为已正确处理的命令。

创建消费者组

假设我已经有一个流类型的键race:france,为了创建一个消费者组,我只需要执行以下操作:

正如您在上面的命令中看到的,在创建消费者组时,我们必须指定一个ID,在示例中就是$。这是必要的,因为除其他状态外,消费者组必须知道在第一个消费者连接时接下来要提供什么消息,也就是说,当组刚刚创建时最后一个消息ID$是什么。如果我们像我们所做的那样提供,那么从现在开始只有到达流中的新消息才会提供给组中的消费者。如果我们指定,0消费者组将首先使用流历史记录中的所有消息。当然,您可以指定任何其他有效ID。您知道的是,消费者组将开始传递大于您指定的ID的消息。因为$表示流中当前最大的ID,所以指定$将具有仅消费新消息的效果。

XGROUP CREATE如果不存在,还支持自动创建流,使用可选子MKSTREAM命令作为最后一个参数:

现在消费者组已创建,我们可以立即尝试使用命令通过消费者组读取消息XREADGROUP。我们将从消费者(称为 Alice 和 Bob)读取消息,以查看系统如何向 Alice 或 Bob 返回不同的消息。

XREADGROUP与 非常相似,XREAD并提供相同的BLOCK选项,否则它是一个同步命令。但是,有一个必须始终指定的强制选项,即GROUP ,它有两个参数:消费者组的名称和尝试读取的消费者的名称。还支持选项COUNTXREAD ,并且与 中的选项相同。

我们将向 race:italy 流中添加 riders,并尝试使用消费者组读取一些内容:注意:此处 rider 是字段名称,name 是关联值。请记住,流项目是小型字典。

XREADGROUP回复就像XREAD回复一样。但请注意GROUP <group-name> <consumer-name>上面提供的内容。它表明我想使用消费者组从流中读取数据mygroup,我是消费者Alice。每次消费者对消费者组执行操作时,它都必须指定其名称,以唯一地标识组内的此消费者。

上面的命令行中还有一个非常重要的细节,在强制STREAMS选项之后,为 key 请求的 IDmystream是特殊 ID >。此特殊 ID 仅在消费者组上下文中有效,其含义是:迄今为止从未向其他消费者传递过消息

这几乎总是你想要的,但是也可以指定一个真实的 ID,例如0或任何其他有效 ID,但是在这种情况下,我们请求的XREADGROUP只是向我们提供待处理消息的历史记录,在这种情况下,永远不会在组中看到新消息。因此,基本上XREADGROUP根据我们指定的 ID 具有以下行为:

  • 如果 ID 是特殊 ID >,则命令将仅返回迄今为止从未传递给其他消费者的新消息,并且作为副作用,将更新消费者组的最后一个 ID
  • 如果 ID 是任何其他有效的数字 ID,则该命令将允许我们访问待处理消息的历史记录。也就是说,已传递给此指定消费者(由提供的名称标识)且迄今为止从未确认的消息集XACK

我们可以立即测试此行为,指定 ID 为 0,不使用任何COUNT选项:我们只会看到唯一的待处理消息,即有关 Castilla 的消息:

但是,如果我们确认该消息已处理,它将不再是待处理消息历史记录的一部分,因此系统将不再报告任何内容:

如果您还不知道它是如何XACK工作的,请不要担心,其理念只是已处理的消息不再是我们可以访问的历史记录的一部分。

现在轮到 Bob 读一些内容了:

Bob 请求最多两条消息,并且正在通过同一个组读取mygroup。因此,Redis 只会报告新消息。如您所见,“Castilla”消息未送达,因为它已经送达给 Alice,因此 Bob 收到 Royce 和 Sam-Bodden 等消息。

这样,Alice、Bob 和组中的任何其他消费者都可以从同一个流中读取不同的消息,读取尚未处理的消息的历史记录,或将消息标记为已处理。这允许创建不同的拓扑和语义来使用来自流的消息。

有几件事需要记住:

  • 消费者第一次被提及时就会自动创建,无需明确创建。
  • 尽管XREADGROUP您可以同时读取多个键,但要使其正常工作,您需要在每个流中创建一个具有相同名称的消费者组。这不是一个常见的需求,但值得一提的是,该功能在技术上是可用的。
  • XREADGROUP是一个写入命令,因为即使它从流中读取,消费者组也会作为读取的副作用被修改,因此它只能在主实例上调用。

下面是使用消费者组用 Ruby 语言编写的消费者实现示例。Ruby 代码旨在让几乎所有有经验的程序员都能阅读,即使他们不懂 Ruby:

require 'redis'

if ARGV.length == 0
    puts "Please specify a consumer name"
    exit 1
end

ConsumerName = ARGV[0]
GroupName = "mygroup"
r = Redis.new

def process_message(id,msg)
    puts "[#{ConsumerName}] #{id} = #{msg.inspect}"
end

$lastid = '0-0'

puts "Consumer #{ConsumerName} starting..."
check_backlog = true
while true
    # Pick the ID based on the iteration: the first time we want to
    # read our pending messages, in case we crashed and are recovering.
    # Once we consumed our history, we can start getting new messages.
    if check_backlog
        myid = $lastid
    else
        myid = '>'
    end

    items = r.xreadgroup('GROUP',GroupName,ConsumerName,'BLOCK','2000','COUNT','10','STREAMS',:my_stream_key,myid)

    if items == nil
        puts "Timeout!"
        next
    end

    # If we receive an empty reply, it means we were consuming our history
    # and that the history is now empty. Let's start to consume new messages.
    check_backlog = false if items[0][1].length == 0

    items[0][1].each{|i|
        id,fields = i

        # Process the message
        process_message(id,fields)

        # Acknowledge the message as processed
        r.xack(:my_stream_key,GroupName,id)

        $lastid = id
    }
end

如您所见,这里的想法是从使用历史记录(即待处理消息列表)开始。这很有用,因为消费者可能之前崩溃过,因此在重新启动时,我们希望重新读取已发送给我们的但未确认的消息。请注意,我们可能会多次或一次处理一条消息(至少在消费者失败的情况下,但也存在 Redis 持久性和复制的限制,请参阅有关此主题的特定部分)。

一旦历史记录被使用,我们就会得到一个空的消息列表,我们可以切换到使用>特殊 ID 来使用新消息。

从永久故障中恢复

上述示例允许我们编写参与同一消费者组的消费者,每个消费者处理一部分消息,并在从故障中恢复时重新读取仅发送给它们的待处理消息。然而,在现实世界中,消费者可能会永久失败并且永远无法恢复。如果消费者因某种原因停止后永远无法恢复,那么他们的待处理消息会怎样?

Redis 消费者组提供了一种功能,可用于这些情况,以便认领特定消费者的待处理消息,这样这些消息将更改所有权并重新分配给不同的消费者。该功能非常明确。消费者必须检查待处理消息列表,并且必须使用特殊命令认领特定消息,否则服务器将永远保留待处理消息并分配给旧消费者。通过这种方式,不同的应用程序可以选择是否使用此功能,以及如何使用它。

此过程的第一步只是一个命令,它提供对消费者组中待处理条目的可观察性,称为XPENDING。这是一个只读命令,始终可以安全调用,并且不会更改任何消息的所有权。在最简单的形式中,该命令使用两个参数调用,即流的名称和消费者组的名称。

以这种方式调用时,该命令会输出消费者组中待处理消息的总数(本例中为两条)、待处理消息中较低和较高的消息 ID,最后是消费者列表及其待处理消息的数量。我们只有 Bob 有两条待处理消息,因为 Alice 请求的单条消息是使用 确认的XACK

我们可以通过给予更多参数来请求更多信息XPENDING,因为完整的命令签名如下:

XPENDING <key> <groupname> [[IDLE <min-idle-time>] <start-id> <end-id> <count> [<consumer-name>]]

通过提供开始和结束 ID(可以是-和 ,+XRANGE)以及计数来控制命令返回的信息量,我们能够了解有关待处理消息的更多信息。如果我们想将输出限制为仅针对给定消费者的待处理消息,则可以使用可选的最后一个参数消费者名称,但在以下示例中不会使用此功能。

现在我们有了每条消息的详细信息:ID、消费者名称、空闲时间(以毫秒为单位),即自上次将消息传递给某个消费者以来经过了多少毫秒,最后是给定消息的传递次数。我们有两条来自 Bob 的消息,它们空闲了 60000 多毫秒,大约一分钟。

请注意,没有人阻止我们仅使用 来检查第一条消息的内容XRANGE

我们只需在参数中重复两次相同的 ID。现在我们有了一些想法,Alice 可能会决定,在 1 分钟不处理消息后,Bob 可能不会很快恢复,是时候认领这些消息并代替 Bob 恢复处理了。为此,我们使用命令XCLAIM

此命令非常复杂,完整形式中充满了选项,因为它用于复制消费者组更改,但我们仅使用通常需要的参数。在这种情况下,它很简单:

XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>

基本上,对于这个特定的键和组,我希望指定的消息 ID 将更改所有权,并将其分配给指定的消费者名称<consumer>。但是,我们还提供了最小空闲时间,因此只有当所述消息的空闲时间大于指定的空闲时间时,操作才会起作用。这很有用,因为可能有两个客户端正在同时重试认领一条消息:

Client 1: XCLAIM race:italy italy_riders Alice 60000 1692632647899-0
Client 2: XCLAIM race:italy italy_riders Lora 60000 1692632647899-0

但是,作为副作用,认领消息将重置其空闲时间并增加其传递次数计数器,因此第二个客户端将无法认领该消息。通过这种方式,我们可以避免对消息进行简单的重新处理(即使在一般情况下您无法获得一次处理)。

这是命令执行的结果:

Alice 成功认领了该消息,她现在可以处理该消息并确认它,即使原始消费者没有恢复,她也可以继续处理该消息。

从上面的示例中可以清楚地看出,成功认领给定消息的副作用是,该XCLAIM命令还会返回该消息。但这不是强制性的。可以使用JUSTID选项来仅返回成功认领的消息的 ID。如果您想要减少客户端和服务器之间使用的带宽(以及命令的性能),并且由于消费者的实现方式是它会不时重新扫描待处理消息的历史记录,因此您对该消息不感兴趣,这将非常有用。

认领也可以通过单独的进程来实现:该进程只检查待处理消息列表,并将空闲消息分配给看似活跃的消费者。可以使用 Redis 流的可观察性功能之一来获取活跃消费者。这是下一节的主题。

自动认领

Redis 6.2 中添加的命令XAUTOCLAIM实现了我们上面描述的声明过程。 XPENDINGXCLAIM为不同类型的恢复机制提供基本构建块。此命令通过让 Redis 管理它来优化通用过程,并为大多数恢复需求提供简单的解决方案。

XAUTOCLAIM识别空闲的待处理消息并将其所有权转移给消费者。该命令的签名如下所示:

XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT count] [JUSTID]

因此,在上面的例子中,我可以使用自动声明来声明一条消息,如下所示:

与 类似XCLAIM,该命令会回复已认领消息的数组,但它还会返回一个流 ID,以便迭代待处理的条目。流 ID 是一个游标,我可以在下一次调用中使用它来继续认领空闲的待处理消息:

XAUTOCLAIM返回“0-0”流 ID 作为游标时,这意味着它已到达消费者组待处理条目列表的末尾。这并不意味着没有新的空闲待处理消息,因此该过程XAUTOCLAIM将从流的开头继续调用。

领取和送货柜台

输出中显示的计数器XPENDING是每条消息的发送次数。计数器以两种方式递增:当通过 成功认领消息时,XCLAIM或当XREADGROUP使用调用来访问待处理消息的历史记录时。

出现故障时,消息会被多次传递,但最终通常会得到处理和确认,这是很正常的。但是,处理某些特定消息可能会出现问题,因为该消息被破坏或被篡改,从而触发了处理代码中的错误。在这种情况下,消费者将不断无法处理此特定消息。因为我们有传递尝试的计数器,所以我们可以使用该计数器来检测由于某种原因无法处理的消息。因此,一旦传递计数器达到您选择的给定大数字,将此类消息放入另一个流并向系统管理员发送通知可能是更明智的做法。这基本上是 Redis Streams 实现死信概念的方式。

流可观察性

缺乏可观察性的消息系统很难使用。不知道谁在消费消息、哪些消息待处理、给定流中活跃的消费者组集,一切都变得不透明。因此,Redis 流和消费者组有不同的方法来观察正在发生的事情。我们已经介绍了XPENDING,它允许我们检查给定时刻正在处理的消息列表,以及它们的空闲时间和传递次数。

然而,我们可能想做的不止这些,该XINFO命令是一个可观察性接口,可以与子命令一起使用以获取有关流或消费者组的信息。

此命令使用子命令来显示有关流及其消费者组的状态的不同信息。例如XINFO STREAM报告有关流本身的信息。

输出显示了有关流内部编码方式的信息,还显示了流中的第一条和最后一条消息。另一条可用信息是与此流关联的消费者组的数量。我们可以进一步挖掘以获取有关消费者组的更多信息。

正如您在此命令和上一个输出中看到的,该XINFO命令输出一系列字段值项。由于它是一个可观察性命令,因此它允许人类用户立即了解报告了哪些信息,并允许该命令在将来通过添加更多字段来报告更多信息,而不会破坏与旧客户端的兼容性。其他必须更节省带宽的命令(例如XPENDING)只报告信息而不报告字段名称。

上述示例的输出(其中使用了GROUPS子命令)通过观察字段名称应该很清楚。我们可以通过检查组中注册的消费者来更详细地检查特定消费者组的状态。

如果您不记得命令的语法,只需向命令本身寻求帮助:

> XINFO HELP
1) XINFO <subcommand> [<arg> [value] [opt] ...]. Subcommands are:
2) CONSUMERS <key> <groupname>
3)     Show consumers of <groupname>.
4) GROUPS <key>
5)     Show the stream consumer groups.
6) STREAM <key> [FULL [COUNT <count>]
7)     Show information about the stream.
8) HELP
9)     Prints this help.

与 Kafka(TM)分区的区别

Redis 流中的消费者组可能在某种程度上类似于 Kafka (TM) 基于分区的消费者组,但请注意,Redis 流在实际方面有很大不同。分区只是逻辑上的,消息只是放入单个 Redis 键中,因此不同客户端的服务方式取决于谁准备好处理新消息,而不是客户端正在从哪个分区读取消息。例如,如果消费者 C3 在某个时刻永久失败,Redis 将继续为 C1 和 C2 提供所有到达的新消息,就好像现在只有两个逻辑分区一样。

类似地,如果某个消费者处理消息的速度比其他消费者快得多,那么该消费者在相同时间内收到的消息也会相应增多。这是可能的,因为 Redis 会明确跟踪所有未确认的消息,并记住谁收到了哪条消息,以及第一条从未传递给任何消费者的消息的 ID。

但是,这也意味着,在 Redis 中,如果您确实想将同一流中的消息分区到多个 Redis 实例中,则必须使用多个键和一些分片系统(例如 Redis Cluster 或其他特定于应用程序的分片系统)。单个 Redis 流不会自动分区到多个实例。

我们可以从示意图上看出以下情况是正确的:

  • 如果您使用 1 个流 -> 1 个消费者,那么您将按顺序处理消息。
  • 如果您使用 N 个流和 N 个消费者,以便只有给定的消费者才会命中 N 个流的子集,那么您可以将上述 1 个流 -> 1 个消费者的模型进行扩展。
  • 如果您使用 1 个流 -> N 个消费者,那么您将对 N 个消费者进行负载平衡,但是在这种情况下,有关同一逻辑项的消息可能会无序使用,因为给定消费者处理消息 3 的速度可能比另一个消费者处理消息 4 的速度更快。

因此,基本上 Kafka 分区更类似于使用 N 个不同的 Redis 键,而 Redis 消费者组是将来自给定流的消息发送到 N 个不同消费者的服务器端负载平衡系统。

封顶溪流

许多应用程序不希望永远将数据收集到流中。有时,在流中最多包含给定数量的项目很有用,而其他时候,一旦达到给定的大小,将数据从 Redis 移动到不在内存中且速度不那么快但适合存储未来几十年历史记录的存储很有用。Redis 流对此有一些支持。一个是命令的MAXLENXADD选项。此选项使用起来非常简单:

使用MAXLEN,当达到指定长度时,旧条目会自动被逐出,这样流就会保持恒定的大小。目前没有选项可以告诉流只保留不早于给定时间段的项目,因为这样的命令为了一致运行,可能会长时间阻塞以逐出项目。例如,想象一下如果出现插入峰值,然后是长时间暂停,然后是另一次插入,所有这些都具有相同的最大时间,会发生什么。流将阻塞以逐出暂停期间变得太旧的数据。因此,用户需要进行一些规划并了解所需的最大流长度。此外,虽然流的长度与使用的内存成正比,但按时间修剪不太容易控制和预测:它取决于插入率,而插入率通常会随时间而变化(当它不变时,按大小修剪就很简单了)。

但是使用MAXLEN进行修剪可能代价高昂:为了非常节省内存,流由宏节点表示为基数树。更改由几十个元素组成的单个宏节点并不是最佳选择。因此可以使用以下特殊形式的命令:

XADD race:italy MAXLEN ~ 1000 * ... entry fields here ...

MAXLEN~选项和实际计数之间的参数意味着,我实际上并不需要它正好是 1000 个项目。它可以是 1000 或 1010 或 1030,只要确保至少保存 1000 个项目即可。使用此参数,只有当我们可以删除整个节点时才会执行修剪。这使得它更加高效,而且这通常是您想要的。您会注意到客户端库对此有各种实现。例如,Python 客户端默认为近似值,必须明确设置为真实长度。

还有该命令,其执行的操作与上面的MAXLENXTRIM选项非常相似,只是它可以单独运行:

或者,对于XADD选项:

但是,XTRIM设计为接受不同的修剪策略。另一种修剪策略是MINID,它会驱逐 ID 低于指定值的条目。

作为XTRIM一个明确的命令,用户应该了解不同修剪策略可能存在的缺点。

XTRIM将来可能会添加的另一个有用的驱逐策略是按一定范围的 ID 进行删除,以便于使用XRANGE,并XTRIM在需要时将数据从 Redis 移动到其他存储系统。

流 API 中的特殊 ID

您可能已经注意到,Redis API 中可以使用几个特殊 ID。下面是简短的回顾,以便将来可以更清楚地理解它们。

前两个特殊 ID 是-+,以及 用于命令的范围查询XRANGE。这两个 ID 分别表示可能的最小 ID(基本上是0-1)和可能的最大的 ID(即18446744073709551615-18446744073709551615)。如您所见,用-+代替这些数字要简洁得多。

然后,我们想要使用 API 来表示流中 ID 最大的项目的 ID。这就是它的$含义。例如,如果我只想要带有 ID 的新条目,XREADGROUP我会使用此 ID 来表示我已经拥有所有现有条目,但没有将来会插入的新条目。同样,当我创建或设置消费者组的 ID 时,我可以将最后交付的项目设置为,$以便只向组中的消费者交付新条目。

正如您所见,$并不意味着+,它们是两个不同的东西,因为+是每个可能流中可能的最大 ID,而$是包含给定条目的给定流中的最大 ID。此外,API 通常只会理解+$,但它有助于避免加载具有多重含义的给定符号。

另一个特殊 ID 是>,它的特殊含义仅与消费者组有关,并且仅在XREADGROUP使用该命令时才有。这个特殊 ID 意味着我们只想要迄今为止从未交付给其他消费者的条目。所以基本上 ID>是消费者组的最后交付 ID

最后,特殊 ID*只能与XADD命令一起使用,意味着为新条目自动选择一个 ID。

因此,我们有,,,-和,并且都有不同的含义,并且大多数时候可以在不同的语境中使用。+$>*

持久性、复制和消息安全

与任何其他 Redis 数据结构一样,Stream 会异步复制到副本并持久保存到 AOF 和 RDB 文件中。然而,可能不太明显的是,消费者组的完整状态也会传播到 AOF、RDB 和副本,因此如果主服务器上有一条待处理的消息,副本服务器也会有相同的信息。同样,重新启动后,AOF 将恢复消费者组的状态。

但请注意,Redis 流和消费者组使用 Redis 默认复制进行持久化和复制,因此:

  • 如果消息持久性在您的应用程序中很重要,则必须将 AOF 与强大的 fsync 策略一起使用。
  • 默认情况下,异步复制不会保证XADD命令或消费者组状态更改被复制:故障转移后,根据副本从主服务器接收数据的能力,可能会缺少某些内容。
  • WAIT命令可用于强制将更改传播到一组副本。但请注意,虽然这使得数据丢失的可能性很小,但由 Sentinel 或 Redis Cluster 操作的 Redis 故障转移过程仅会尽最大努力检查以故障转移到最新的副本,并且在某些特定故障条件下可能会升级缺少某些数据的副本。

因此,在使用 Redis 流和消费者组设计应用程序时,请确保了解应用程序在发生故障时应具有的语义属性,并进行相应的配置,评估它是否对您的用例足够安全。

从流中删除单个项目

流还具有一个特殊命令,用于仅通过 ID 从流中间删除项目。通常,对于仅附加数据结构,这可能看起来像一个奇怪的功能,但它实际上对于涉及隐私法规等的应用程序很有用。调用该命令XDEL并接收流的名称,后跟要删除的 ID:

但是在当前的实现中,直到宏节点完全空时内存才会真正被回收,因此您不应滥用此功能。

零长度流

流与其他 Redis 数据结构之间的区别在于,当其他数据结构不再包含任何元素时,调用删除元素的命令会产生副作用,即键本身将被删除。因此,例如,当调用删除有序集合中的最后一个元素时,有序集合将被完全删除。另一方面,流可以保留零个元素,这既是使用计数为零的MAXLENZREM选项的结果(和命令),也是因为调用了。XADDXTRIMXDEL

存在这种不对称的原因在于,Streams 可能有关联的消费者组,我们不希望仅仅因为 Stream 中不再有任何项目就丢失消费者组定义的状态。目前,即使 Stream 没有关联的消费者组,也不会删除该 Stream。

消费一条消息的总延迟

非阻塞流命令(例如XRANGEXREAD/或XREADGROUP不带 BLOCK 选项)与其他 Redis 命令一样同步执行,因此讨论此类命令的延迟毫无意义:查看 Redis 文档中命令的时间复杂度更有趣。可以说,在提取范围时,流命令至少与有序集合命令一样快,而且速度XADD非常快,如果使用流水线,在普通机器上每秒可以轻松插入 50 万到 100 万个项目。

然而,如果我们想要了解在消费者组中阻止消费者的背景下处理消息的延迟,从通过生成消息的那一刻到XADD消费者获得消息并随XREADGROUP消息返回的那一刻,延迟就会成为一个有趣的参数。

为被屏蔽的消费者提供服务的工作原理

在提供已执行测试的结果之前,有必要了解 Redis 使用什么模型来路由流消息(以及通常如何管理等待数据的任何阻塞操作)。

  • 被阻塞的客户端在哈希表中被引用,该哈希表将至少有一个阻塞消费者的键映射到等待该键的消费者列表。这样,给定一个接收数据的键,我们就可以解析所有等待此类数据的客户端。
  • 当发生写入时(在这种情况下XADD,当命令被调用时),它会调用该signalKeyAsReady()函数。此函数会将键放入需要处理的键列表中,因为这些键可能包含被阻止的消费者的新数据。请注意,这些就绪的键将在稍后处理,因此在同一事件循环周期中,该键可能会收到其他写入。
  • 最后,在返回事件循环之前,最终处理就绪的键。对于每个键,都会扫描等待数据的客户端列表,如果适用,这些客户端将接收到达的新数据。对于流,数据是消费者请求的适用范围内的消息。

如您所见,基本上,在返回事件循环之前,调用的客户端XADD和被阻止使用消息的客户端都会在输出缓冲区中收到它们的回复,因此调用者XADD应该在消费者收到新消息的同时收到来自 Redis 的回复。

该模型是基于推送的,因为将数据添加到消费者缓冲区将直接通过调用的操作来执行XADD,所以延迟往往是相当可预测的。

延迟测试结果

为了检查这些延迟特性,我们进行了一项测试,使用多个 Ruby 程序实例推送消息,其中附加字段为计算机毫秒时间,Ruby 程序从消费者组读取消息并进行处理。消息处理步骤包括将当前计算机时间与消息时间戳进行比较,以了解总延迟。

获得的结果:

Processed between 0 and 1 ms -> 74.11%
Processed between 1 and 2 ms -> 25.80%
Processed between 2 and 3 ms -> 0.06%
Processed between 3 and 4 ms -> 0.01%
Processed between 4 and 5 ms -> 0.02%

因此,99.9% 的请求的延迟 <= 2 毫秒,而异常值仍然非常接近平均值。

向流中添加几百万条未确认的消息不会改变基准测试的要点,大多数查询仍会以非常短的延迟进行处理。

几点说明:

  • 这里我们每次迭代处理最多 10k 条消息,这意味着参数COUNT设置XREADGROUP为 10000。这会增加很多延迟,但是为了使慢速消费者能够跟上消息流,这是必要的。因此,您可以预期实际延迟会小得多。
  • 与今天的标准相比,用于该基准测试的系统非常慢。

了解更多

给此页面评分
返回顶部 ↑