当前位置:首页 » 《随便一记》 » 正文

Redis精通系列——Stream_李子捌的博客

5 人参与  2021年11月11日 12:23  分类 : 《随便一记》  评论

点击全文阅读


  本文已收录于专栏

《Redis精通系列》

上千人点赞收藏,全套Redis学习资料,大厂必备技能!


目录

1、简介

2、Stream内部探索

2.1 Stream 结构

2.2 四个唯一

2.3 消息ID

2.4 消息内容

3、Stream指令

3.1 指令汇总

3.2 XADD

3.2 XTRIM

3.3 XDEL

3.4 XLEN

3.5 XRANGE

3.6 XREVRANGE

3.7 XREAD

3.8 XGROUP CREATE

3.9 XREADGROUP GROUP

3.10 XACK

3.11 XPENDING

3.11 XCLAIM

3.13 XINFO

4、关于Stream优化内存的事情


1、简介

Stream弥补了Redis作为MQ(message queue)技术选型上的不足之处;Redis 5.0发布的Stream相比Pub/Sub模块,Stream支持消息持久化,结合sentinel或cluster使其成为了一个比较可靠的消息队列。尽管我认为它很难成为公司MQ的技术选型产品,但是关于Stream的使用和特性(消费组),仍值得一探究竟。

Stream对标消息队列,因此几乎具备了MQ所有的特性,以下列出Stream所具有的部分特性:

  • 消息顺序存储
  • 消息ID序列化规则生成
  • 消息的遍历
  • 消息阻塞/非阻塞式获取
  • 客户端分组消费消息
  • 消息确认机制
  • 消息异常机制
  • 消息队列监控

在文中也会说到Stream的这些特性。

2、Stream内部探索

2.1 Stream 结构

在探索Stream的内部结构之前,先看一张清晰的Stream结构图:

stream.png

如下是关于上图的名词解析:

  • Message Content:消息内容
  • Consumer group:消费组,通过XGROUP CREATE 命令创建,一个消费组可以有多个消费者
  • Last_delivered_id:游标,每个消费组有一个游标,任意消费者读取消息后,游标都会向前移动
  • Consumer:消费者,消费组中的消费者
  • Pending_ids:状态变量,每个消费者会有一个状态变量,用于记录被当前消费者读取,但是并未ack的消息id

2.2 四个唯一

Stream内部维护了一个消息链表,以此使得消息能够具有队列的特性。在Stream中有四个唯一需要了解:

  1. 每个Stream都具有唯一的名称
  2. 每个消息(Message)都具有一个由系统分配或者客户端指定唯一ID
  3. 每个Stream中的消费组(Consumer_Group)具有唯一名称
  4. 每个消费组(Consumer_Group)中的消费者(Consumer)具有唯一名称

2.3 消息ID

Stream的消息ID可以由服务端自动生成,也可以由客户端传入,如下图是自动生成的结构:

image.png


系统自动生成的规则

<millisecondsTime>-<sequenceNumber>

millisecondsTime指的是Redis节点服务器的本地时间,如果存在当前的毫秒时间戳比以前已经存在的数据的时间戳小的话(本地时间钟后跳),那么系统将会采用以前相同的毫秒创建新的ID。
sequenceNumber指的是序列号,在相同的millisecondsTime毫秒下,序列号从0开始递增,序列号是64位长度,理论上在统一毫秒内生成的数据量无法到达这个级别,因此不用担心sequenceNumber会不够用。

客户端显示传入规则
Redis对于ID有强制要求,格式必须是-,最小ID为0-1,并且后续ID不能小于前一个ID

2.4 消息内容

Stream的消息内容,也就是图中的Message Content它的结构类似Hash结构,以key-value的形式存在。

3、Stream指令

3.1 指令汇总

Stream的指令根据可以分为两类,分别是消息队列相关指令,消费组相关指令。
消息队列相关指令:

指令名称指令作用
XADD添加消息到队列末尾
XTRIM限制Stream的长度,如果已经超长会进行截取
XDEL删除消息
XLEN获取Stream中的消息长度
XRANGE获取消息列表(可以指定范围),忽略删除的消息
XREVRANGE和XRANGE相比区别在于反向获取,ID从大到小
XREAD获取消息(阻塞/非阻塞),返回大于指定ID的消息

消费组相关指令:

指令名称指令作用
XGROUP CREATE创建消费者组
XREADGROUP GROUP读取消费者组中的消息
XACKack消息,消息被标记为“已处理”
XGROUP SETID设置消费者组最后递送消息的ID
XGROUP DELCONSUMER删除消费者组
XPENDING打印待处理消息的详细信息
XCLAIM转移消息的归属权(长期未被处理/无法处理的消息,转交给其他消费者组进行处理)
XINFO打印Stream\Consumer\Group的详细信息
XINFO GROUPS打印消费者组的详细信息
XINFO STREAM打印Stream的详细信息

3.2 XADD

XADD 用于向Stream 队列中添加消息,如果指定的Stream 队列不存在,则该命令执行时会新建一个Stream 队列。

XADD的指令语法:

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]

如下通过XADD展示了定义ID的两种方式,具体可以看2.3。

image.png

3.2 XTRIM

XTRIM 用于对Stream的长度进行限定。

XTRIM 的指令语法:

XTRIM key MAXLEN|MINID [=|~] threshold [LIMIT count]

  • MAXLEN 允许的最大长度,如果长度超出则会抛弃队列前面的消息
  • MINID 允许的最小id,从某个id值开始保留,其余的将会被抛弃

image.png

3.3 XDEL

XDEL 用于删除消息。

XDEL 的指令语法:

XDEL key ID [ID …]

image.png

3.4 XLEN

XLEN 用于获取Stream 队列的消息的长度。

XLEN 的指令语法:

XLEN key

image.png

3.5 XRANGE

XRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。

XRANGE 的指令语法:

XRANGE key start end [COUNT count]

  • start 表示开始值,-代表最小值
  • end 表示结束值,+代表最大值
  • count 表示最多获取多少个值

image.png

3.6 XREVRANGE

XREVRANGE 用于获取消息列表(可以指定范围),忽略删除的消息。与XRANGE 的区别在于,获取消息列表元素的方向是相反的,end在前,start在后。

XREVRANGE 的指令语法:

XREVRANGE key end start [COUNT count]

image.png

3.7 XREAD

XREAD 用于获取消息(阻塞/非阻塞),只会返回大于指定ID的消息。

XREAD 的指令语法:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key …] ID [ID …]

  • COUNT 最多读取多少条消息
  • BLOCK 是否已阻塞的方式读取消息,默认不阻塞,如果milliseconds设置为0,表示永远阻塞

$代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil。

0-0代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用0(00/000也都是可以的……)。

image.png

 阻塞方式获取Stream中的指令,这里演示阻塞获取一条消息

image.png

image.png


3.8 XGROUP CREATE

XGROUP CREATE 用于创建消费者组。

XGROUP CREATE 的指令语法:

XGROUP [CREATE key groupname ID|$ [MKSTREAM]] [SETID key groupname ID|$] [DESTROY key groupname] [CREATECONSUMER key groupname consumername] [DELCONSUMER key groupname consumername]

XGROUP CREATE中的指令没什么复杂的,第一个中括号中的几个参数最为重要,如下图两种方式:

  • $表示从Stream尾部开始消费,会忽略Stream中目前已有的数据
  • 0表示从Stream头部开始消费

image.png

如果Stream不存在,XGROUP CREATE 语法将会报错,因此可以得出不允许在不存在的Stream上创建消费者组

image.png

3.9 XREADGROUP GROUP

XREADGROUP GROUP 用于读取消费者组中的消息。

XREADGROUP GROUP 的指令语法:

XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key …] ID [ID …]

注意,这里有一个比较重要的知识点,刚开始的时候可能容易搞错:
>这个特殊符号表示消息到目前为止,从未传递给其他消费者的消息
0表示指定消息ID,因为ID均大于0-0(0代指0-0),因此代表从Stream 的队列头部开始获取消息

在如下截图中,为何第一次 mystream 0 获取消息返回empty,在执行完 mystream > 之后,第二 mystream 0 却成功的获取到了消息,但是很明显mystream中刚添加了两条消息,第一次不应该失败才对呀?
这是因为,当指定ID进行消息获取时,命令将会让我们访问我们的历史待处理消息(曾被获取,但是未ack)。即传递给这个指定消费者(由提供的名称标识)的消息集,并且到目前为止从未使用XACK进行确认。

image.png

XREADGROUP GROUP 也可以像XREAD 一样使用阻塞的方式获取消息

image.png

当向mystream中添加消息后,阻塞读返回

image.png

3.10 XACK

XACK 用于标记为“已处理”。

XACK 的指令语法:

XACK key group ID [ID …]

结合 XREADGROUP GROUP 中指定ID的方式只能获取未ack的未处理消息的特性,测试XACK指令。从如下的测试示例中可以得出两个结论:

  • 消息首次ack成功,返回1,ack失败返回0
  • 3.9中的结论是正确的

image.png

3.11 XPENDING

XPENDING 用于打印待处理消息的详细信息。

XPENDING 指令是非常有用的,因为它可以打印待处理消息的信息。如果在一个消费者组中存在多个消费者,如果存在部分消费者永久的故障,无法再处理消息了,我们就可以通过XPENDING 指令来查看指定消费者组中的消费者未ack的消息,然后转移给其他消费者进行处理。

XPENDING 的指令语法:

XPENDING key group [[IDLE min-idle-time] start end count [consumer]]

XPENDING 返回值解析:

  1. 第一个参数表示当前消费者中待处理消息的总数
  2. 第二个参数表示待处理消息的最小ID
  3. 第三个参数表示待处理消息的最大ID
  4. 第四个参数表示消费者列表和未处理的消息数量

image.png

image.png


3.11 XCLAIM

XCLAIM 用于转移消息的归属权。

XCLAIM 的指令语法:

XCLAIM key group consumer min-idle-time ID [ID …] [IDLE ms] [TIME ms-unix-time] [RET

指令参数解析:

  • key 表示Stream的名称
  • group 表示需要转移消息的归属权的消费者组名称
  • consumer 表示接收消息的消费者名称
  • min-idle-time 表示最小空闲时间,只有后续指定ID的消息空闲时间大于指定的空闲时间,消息归属权转移指令才会生效
  • ID [] 需要转移归属权的消息ID,数组,可以是多个

示例中,将consumer-1中ID为1631719560149-0的未处理的消息的归属权转移到consumer-2下:

image.png

3.13 XINFO

XINFO 用于打印Stream\Consumer\Group的详细信息。

XINFO 的指令语法:

XINFO [CONSUMERS key groupname] [GROUPS key] [STREAM key] [HELP]

示例打印指定STREAM的详细消息

image.png

4、关于Stream优化内存的事情

使用Stream有两个点需要注意,如果使用不当都会导致内存消耗增大。

  1. 待处理消息过多,消息未及时ack
  2. Stream消息持续持久化,使用XDEL删除消息

关于第一点,待处理消息过多,消息未及时ack,其导致内存增加的原因是,Stream会为每个消费者维护一个PEL列表,PEL列表用于存储处理完但未及时ack的消息ID。我们在实际使用过程中,处理完的消息一定要及时ack,也有定时检查是否有消费者不可用导致消息堆积的情况。
XPENDING能查询出消费者中待处理的消息,就是因为有PEL的存在。

image.png

关于第二点,使用XDEL删除Stream中不在需要的消息,其导致内存增加的原因是,Stream的XDEL删除消息的指令,并不会从内存上删除消息,它只是给消息打上标记位,下次通过XRANGE指令忽略这些消息而已。因此我们可以设置Stream的最大长度,来解决这个问题,在XADD中使用MAXLEN指定Stream队列的长度,当消息超出长度就会将队列头消息清除掉。(不过这种处理方式一定要做到及时处理消息,避免消息的丢失。)

XADD key [NOMKSTREAM] [MAXLEN|MINID [=|~] threshold [LIMIT count]] *|ID field value [field value …]

 


点击全文阅读


本文链接:http://zhangshiyu.com/post/30773.html

消息  指令  消费者  
<< 上一篇 下一篇 >>

  • 评论(0)
  • 赞助本站

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

关于我们 | 我要投稿 | 免责申明

Copyright © 2020-2022 ZhangShiYu.com Rights Reserved.豫ICP备2022013469号-1