23-拓展 1:耳听八方 —— Stream
课程
1
开篇:授人以鱼不若授人以渔 —— Redis 可以用来做什么?
学习时长: 5分21秒
2
基础:万丈高楼平地起 —— Redis 基础数据结构
上次学到
学习时长: 16分14秒
3
应用 1:千帆竞发 —— 分布式锁
学习时长: 7分47秒
4
应用 2:缓兵之计 —— 延时队列
学习时长: 8分9秒
5
应用 3:节衣缩食 —— 位图
学习时长: 8分52秒
6
应用 4:四两拨千斤 —— HyperLogLog
学习时长: 14分17秒
7
应用 5:层峦叠嶂 —— 布隆过滤器
学习时长: 17分54秒
8
应用 6:断尾求生 —— 简单限流
学习时长: 4分37秒
9
应用 7:一毛不拔 —— 漏斗限流
学习时长: 7分22秒
10
应用 8:近水楼台 —— GeoHash
学习时长: 7分52秒
11
应用 9:大海捞针 —— Scan
学习时长: 8分42秒
12
原理 1:鞭辟入里 —— 线程 IO 模型
学习时长: 4分1秒
13
原理 2:交头接耳 —— 通信协议
学习时长: 3分34秒
14
原理 3:未雨绸缪 —— 持久化
学习时长: 5分27秒
15
原理 4:雷厉风行 —— 管道
学习时长: 3分51秒
16
原理 5:同舟共济 —— 事务
学习时长: 6分36秒
17
原理 6:小道消息 —— PubSub
学习时长: 7分7秒
18
原理 7:开源节流 —— 小对象压缩
学习时长: 7分14秒
19
原理 8:有备无患 —— 主从同步
学习时长: 4分9秒
20
集群 1:李代桃僵 —— Sentinel
学习时长: 3分52秒
21
集群 2:分而治之 —— Codis
学习时长: 7分28秒
22
集群 3:众志成城 —— Cluster
学习时长: 8分38秒
23
拓展 1:耳听八方 —— Stream
学习时长: 13分40秒
24
拓展 2:无所不知 —— Info 指令
学习时长: 4分4秒
25
拓展 3:拾遗补漏 —— 再谈分布式锁
学习时长: 2分18秒
26
拓展 4:朝生暮死 —— 过期策略
学习时长: 2分21秒
27
拓展 5:优胜劣汰 —— LRU
学习时长: 4分34秒
28
拓展 6:平波缓进 —— 懒惰删除
学习时长: 2分13秒
29
拓展 7:妙手仁心 —— 优雅地使用 Jedis
学习时长: 6分35秒
30
拓展 8:居安思危 —— 保护 Redis
学习时长: 2分19秒
31
拓展 9:隔墙有耳 —— Redis 安全通信
学习时长: 6分34秒
32
拓展 10:法力无边 —— Redis Lua 脚本执行原理
学习时长: 9分24秒
33
拓展 11:短小精悍 —— 命令行工具的妙用
学习时长: 9分21秒
34
源码 1:丝分缕析 —— 探索「字符串」内部
学习时长: 5分20秒
35
源码 2:循序渐进 —— 探索「字典」内部
学习时长: 7分24秒
36
源码 3:挨肩迭背 —— 探索「压缩列表」内部
学习时长: 10分42秒
37
源码 4:风驰电掣 —— 探索「快速列表」内部
学习时长: 3分49秒
38
源码 5:凌波微步 —— 探索「跳跃列表」内部
学习时长: 9分57秒
39
源码 6:破旧立新 —— 探索「紧凑列表」内部
学习时长: 2分42秒
40
源码 7:金枝玉叶 —— 探索「基数树」内部
学习时长: 5分36秒
41
源码 8:精益求精 —— LFU vs LRU
学习时长: 8分4秒
42
源码 9:如履薄冰 —— 懒惰删除的巨大牺牲
学习时长: 9分53秒
43
源码 10:跋山涉水 —— 深入字典遍历
学习时长: 9分24秒
44
源码 11:见缝插针 —— 探索 HyperLogLog 内部
学习时长: 13分3秒
45
尾声:百尺竿头 —— 继续深造指南
学习时长: 2分32秒
juejin_logo copyCreated with Sketch.

拓展 1:耳听八方 —— Stream

这篇文章的主要部分是我在 2018 年 6 月 1 日 发表的内容,当时 Redis5.0 刚刚被 Antirez 放出来,老钱抢先分析了 Redis5.0 最重要的特性 Stream,也被技术圈大号「高可用架构」收录转载,受到了广泛关注。事后我又对文章的内容做了进一步优化,并整合了一些读者的疑难问题解答。

Redis5.0 被作者 Antirez 突然放了出来,增加了很多新的特色功能。而 Redis5.0 最大的新特性就是多出了一个数据结构 Stream,它是一个新的强大的支持多播的可持久化的消息队列,作者坦言 Redis Stream 狠狠地借鉴了 Kafka 的设计。

Redis Stream 的结构如上图所示,它有一个消息链表,将所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。消息是持久化的,Redis 重启后,内容还在。

每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用xadd指令追加消息时自动创建。

每个 Stream 都可以挂多个消费组,每个消费组会有个游标last_delivered_id在 Stream 数组之上往前移动,表示当前消费组已经消费到哪条消息了。每个消费组都有一个 Stream 内唯一的名称,消费组不会自动创建,它需要单独的指令xgroup create进行创建,需要指定从 Stream 的某个消息 ID 开始消费,这个 ID 用来初始化last_delivered_id变量。

每个消费组 (Consumer Group) 的状态都是独立的,相互不受影响。也就是说同一份 Stream 内部的消息会被每个消费组都消费到。

同一个消费组 (Consumer Group) 可以挂接多个消费者 (Consumer),这些消费者之间是竞争关系,任意一个消费者读取了消息都会使游标last_delivered_id往前移动。每个消费者有一个组内唯一名称。

消费者 (Consumer) 内部会有个状态变量pending_ids,它记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为PEL,也就是Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。

消息 ID

消息 ID 的形式是timestampInMillis-sequence,例如1527846880572-5,它表示当前的消息在毫米时间戳1527846880572时产生,并且是该毫秒内产生的第 5 条消息。消息 ID 可以由服务器自动生成,也可以由客户端自己指定,但是形式必须是整数-整数,而且必须是后面加入的消息的 ID 要大于前面的消息 ID。

消息内容

消息内容就是键值对,形如 hash 结构的键值对,这没什么特别之处。

增删改查

  1. xadd 追加消息
  2. xdel 删除消息,这里的删除仅仅是设置了标志位,不影响消息总长度
  3. xrange 获取消息列表,会自动过滤已经删除的消息
  4. xlen 消息长度
  5. del 删除 Stream
# * 号表示服务器自动生成 ID,后面顺序跟着一堆 key/value
#  名字叫 laoqian,年龄 30 岁
127.0.0.1:6379> xadd codehole * name laoqian age 30  
1527849609889-0  # 生成的消息 ID
127.0.0.1:6379> xadd codehole * name xiaoyu age 29
1527849629172-0
127.0.0.1:6379> xadd codehole * name xiaoqian age 1
1527849637634-0
127.0.0.1:6379> xlen codehole
(integer) 3
# -表示最小值 , + 表示最大值
127.0.0.1:6379> xrange codehole - +
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
3) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# 指定最小消息 ID 的列表
127.0.0.1:6379> xrange codehole 1527849629172-0 +  
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# 指定最大消息 ID 的列表
127.0.0.1:6379> xrange codehole - 1527849629172-0
1) 1) 1527849609889-0
   2) 1) "name"
      2) "laoqian"
      3) "age"
      4) "30"
2) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
127.0.0.1:6379> xdel codehole 1527849609889-0
(integer) 1
# 长度不受影响
127.0.0.1:6379> xlen codehole
(integer) 3
# 被删除的消息没了
127.0.0.1:6379> xrange codehole - +
1) 1) 1527849629172-0
   2) 1) "name"
      2) "xiaoyu"
      3) "age"
      4) "29"
2) 1) 1527849637634-0
   2) 1) "name"
      2) "xiaoqian"
      3) "age"
      4) "1"
# 删除整个 Stream
127.0.0.1:6379> del codehole
(integer) 1

独立消费

我们可以在不定义消费组的情况下进行 Stream 消息的独立消费,当 Stream 没有新消息时,甚至可以阻塞等待。Redis 设计了一个单独的消费指令xread,可以将 Stream 当成普通的消息队列 (list) 来使用。使用 xread 时,我们可以完全忽略消费组 (Consumer Group) 的存在,就好比 Stream 就是一个普通的列表 (list)。

# 从 Stream 头部读取两条消息
127.0.0.1:6379> xread count 2 streams codehole 0-0
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
      2) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
# 从 Stream 尾部读取一条消息,毫无疑问,这里不会返回任何消息
127.0.0.1:6379> xread count 1 streams codehole $
(nil)
# 从尾部阻塞等待新消息到来,下面的指令会堵住,直到新消息到来
127.0.0.1:6379> xread block 0 count 1 streams codehole $
# 我们从新打开一个窗口,在这个窗口往 Stream 里塞消息
127.0.0.1:6379> xadd codehole * name youming age 60
1527852774092-0
# 再切换到前面的窗口,我们可以看到阻塞解除了,返回了新的消息内容
# 而且还显示了一个等待时间,这里我们等待了 93s
127.0.0.1:6379> xread block 0 count 1 streams codehole $
1) 1) "codehole"
   2) 1) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
(93.11s)

客户端如果想要使用 xread 进行顺序消费,一定要记住当前消费到哪里了,也就是返回的消息 ID。下次继续调用 xread 时,将上次返回的最后一个消息 ID 作为参数传递进去,就可以继续消费后续的消息。

block 0 表示永远阻塞,直到消息到来,block 1000 表示阻塞 1s,如果 1s 内没有任何消息到来,就返回 nil

127.0.0.1:6379> xread block 1000 count 1 streams codehole $
(nil)
(1.07s)

创建消费组

Stream 通过xgroup create指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化last_delivered_id变量。

#  表示从头开始消费
127.0.0.1:6379> xgroup create codehole cg1 0-0
OK
# $ 表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略
127.0.0.1:6379> xgroup create codehole cg2 $
OK
# 获取 Stream 信息
127.0.0.1:6379> xinfo stream codehole
 1) length
 2) (integer) 3  # 共 3 个消息
 3) radix-tree-keys
 4) (integer) 1
 5) radix-tree-nodes
 6) (integer) 2
 7) groups
 8) (integer) 2  # 两个消费组
 9) first-entry  # 第一个消息
10) 1) 1527851486781-0
    2) 1) "name"
       2) "laoqian"
       3) "age"
       4) "30"
11) last-entry  # 最后一个消息
12) 1) 1527851498956-0
    2) 1) "name"
       2) "xiaoqian"
       3) "age"
       4) "1"
# 获取 Stream 的消费组信息
127.0.0.1:6379> xinfo groups codehole
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 0  # 该消费组还没有消费者
   5) pending
   6) (integer) 0  # 该消费组没有正在处理的消息
2) 1) name
   2) "cg2"
   3) consumers  # 该消费组还没有消费者
   4) (integer) 0
   5) pending
   6) (integer) 0  # 该消费组没有正在处理的消息

消费

Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。

# > 号表示从当前消费组的 last_delivered_id 后面开始读
# 每当消费者读取一条消息,last_delivered_id 变量就会前进
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851486781-0
         2) 1) "name"
            2) "laoqian"
            3) "age"
            4) "30"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851493405-0
         2) 1) "name"
            2) "yurui"
            3) "age"
            4) "29"
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 2 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527851498956-0
         2) 1) "name"
            2) "xiaoqian"
            3) "age"
            4) "1"
      2) 1) 1527852774092-0
         2) 1) "name"
            2) "youming"
            3) "age"
            4) "60"
# 再继续读取,就没有新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 count 1 streams codehole >
(nil)
# 那就阻塞等待吧
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
# 开启另一个窗口,往里塞消息
127.0.0.1:6379> xadd codehole * name lanying age 61
1527854062442-0
# 回到前一个窗口,发现阻塞解除,收到新消息了
127.0.0.1:6379> xreadgroup GROUP cg1 c1 block 0 count 1 streams codehole >
1) 1) "codehole"
   2) 1) 1) 1527854062442-0
         2) 1) "name"
            2) "lanying"
            3) "age"
            4) "61"
(36.54s)
# 观察消费组信息
127.0.0.1:6379> xinfo groups codehole
1) 1) name
   2) "cg1"
   3) consumers
   4) (integer) 1  # 一个消费者
   5) pending
   6) (integer) 5  # 共 5 条正在处理的信息还有没有 ack
2) 1) name
   2) "cg2"
   3) consumers
   4) (integer) 0  # 消费组 cg2 没有任何变化,因为前面我们一直在操纵 cg1
   5) pending
   6) (integer) 0
# 如果同一个消费组有多个消费者,我们可以通过 xinfo consumers 指令观察每个消费者的状态
127.0.0.1:6379> xinfo consumers codehole cg1  # 目前还有 1 个消费者
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 5  # 共 5 条待处理消息
   5) idle
   6) (integer) 418715  # 空闲了多长时间 ms 没有读取消息了
# 接下来我们 ack 一条消息
127.0.0.1:6379> xack codehole cg1 1527851486781-0
(integer) 1
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 4  # 变成了 5 条
   5) idle
   6) (integer) 668504
# 下面 ack 所有消息
127.0.0.1:6379> xack codehole cg1 1527851493405-0 1527851498956-0 1527852774092-0 1527854062442-0
(integer) 4
127.0.0.1:6379> xinfo consumers codehole cg1
1) 1) name
   2) "c1"
   3) pending
   4) (integer) 0  # pel 空了
   5) idle
   6) (integer) 745505

Stream 消息太多怎么办?

读者很容易想到,要是消息积累太多,Stream 的链表岂不是很长,内容会不会爆掉?xdel 指令又不会删除消息,它只是给消息做了个标志位。

Redis 自然考虑到了这一点,所以它提供了一个定长 Stream 功能。在 xadd 的指令提供一个定长长度 maxlen,就可以将老的消息干掉,确保最多不超过指定长度。

127.0.0.1:6379> xlen codehole
(integer) 5
127.0.0.1:6379> xadd codehole maxlen 3 * name xiaorui age 1
1527855160273-0
127.0.0.1:6379> xlen codehole
(integer) 3

我们看到 Stream 的长度被砍掉了。如果 Stream 在未来可以提供按时间戳清理消息的规则那就更加完美了,但是目前还没有。

消息如果忘记 ACK 会怎样?

Stream 在每个消费者结构中保存了正在处理中的消息 ID 列表 PEL,如果消费者收到了消息处理完了但是没有回复 ack,就会导致 PEL 列表不断增长,如果有很多消费组的话,那么这个 PEL 占用的内存就会放大。

PEL 如何避免消息丢失?

在客户端消费者读取 Stream 消息时,Redis 服务器将消息回复给客户端的过程中,客户端突然断开了连接,消息就丢失了。但是 PEL 里已经保存了发出去的消息 ID。待客户端重新连上之后,可以再次收到 PEL 中的消息 ID 列表。不过此时 xreadgroup 的起始消息 ID 不能为参数>,而必须是任意有效的消息 ID,一般将参数设为 0-0,表示读取所有的 PEL 消息以及自last_delivered_id之后的新消息。

Stream 的高可用

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。不过鉴于 Redis 的指令复制是异步的,在 failover 发生时,Redis 可能会丢失极小部分数据,这点 Redis 的其它数据结构也是一样的。

分区 Partition

Redis 的服务器没有原生支持分区能力,如果想要使用分区,那就需要分配多个 Stream,然后在客户端使用一定的策略来生产消息到不同的 Stream。你也许会认为 Kafka 要先进很多,它是原生支持 Partition 的。关于这一点,我并不认同。记得 Kafka 的客户端也存在 HashStrategy 么,因为它也是通过客户端的 hash 算法来将不同的消息塞入不同分区的。

另外,Kafka 还支持动态增加分区数量的能力,但是这种调整能力也是很蹩脚的,它不会把之前已经存在的内容进行 rehash,不会重新分区历史数据。这种简单的动态调整的能力 Redis Stream 通过增加新的 Stream 就可以做到。

小结

Stream 的消费模型借鉴了 Kafka 的消费分组的概念,它弥补了 Redis Pub/Sub 不能持久化消息的缺陷。但是它又不同于 kafka,Kafka 的消息可以分 partition,而 Stream 不行。如果非要分 parition 的话,得在客户端做,提供不同的 Stream 名称,对消息进行 hash 取模来选择往哪个 Stream 里塞。

如果读者稍微研究过 Redis 作者的另一个开源项目 Disque 的话,这极可能是作者意识到 Disque 项目的活跃程度不够,所以将 Disque 的内容移植到了 Redis 里面。这只是本人的猜测,未必是作者的初衷。如果读者有什么不同的想法,可以在评论区一起参与讨论。

留言
Ctrl + Enter
全部评论(22)
BinK_1783的头像
删除
在照顾好自己的同时,肆无忌惮的前行
点赞
回复
张新宇君的头像
删除
你好,请问忘记ack会导致pel过大的问题有没有好的预防措施?
点赞
回复
FATMAN89的头像
删除
前亚马逊工程师
老钱您好,想问一下,您文章里的图是用什么软件工具制作的?
点赞
2
删除
像visio
点赞
回复
删除
draw.io
点赞
回复
叶落枫溪的头像
删除
xdel 删除消费ID后,消息的长度xlen也会变化啊
点赞
1
删除
我的也变了,可能redis改版了?
点赞
回复
山里汉子的头像
删除
怎么用微信购买?
点赞
回复
battle_field的头像
删除
kafka分区在客户端进行是因为需要支持用户指定分区方案。而且kafka有rebalance,一个消费者挂掉会将分区分给其他消费者使用,redis的stream就没办法做到,因为多个stream的消息分配以及消费者分配完全是用户来做的。所以kafka确实是支持原生partition的
3
回复
篱笆墙外的余晖的头像
删除
xdel 目前的stable版本是将消息删除,而不是标记吧
点赞
2
删除
今天也试了下,stable版本的xdel是将消息删除的
点赞
回复
删除
我的是5.0.3 也是删除
点赞
回复
fengwutianya的头像
删除
没有讲stream的代码实现,和kafka的实现对比
6
回复
Leon在掘金51447的头像
删除
针对生产速度远大于消费速度的场景来说,maxlen的大小设置就得好好斟酌了,很可能造成“丢”消息的情况。
另外想请教一下,如果生产时发生网络异常,如果判断该消息是否存到stream中了呢,还是说直接重试,让消费者去处理这种重复消息?
7
回复
程序猿5199的头像
删除
果然是“狠狠的借鉴了kafka”
2
回复
noodleprince的头像
删除
能讲讲stream的持久化机制么?既然比pub/sub有进步就在持久化,可惜文中并没有提到
点赞
1
删除
(作者)
简单一点说,Stream持久化机制和普通数据结构一样,rdb+aof
1
回复
嘴的头像
删除
后端 @ 无业
功能越来越强大了,轻业务类型的可以不用上mq了
点赞
1
删除
(作者)
有这个可能
点赞
回复
LeGrandmechantRenard的头像
删除
想吃软饭 @ 某支付公司
额学习了。第六种数据类型.
Redis5.0 被作者 Antirez 突然放了出来, 我看第一句就感觉好笑,仿佛是被关在监狱里似的。
点赞
2
删除
(作者)
antirez酝酿了好久
点赞
回复
删除
对象类型?
点赞
回复