网易首页 > 网易号 > 正文 申请入驻

别再用 Redis List 实现消息队列了,Stream 专为队列而生

0
分享至









使用 Redis 的 List 实现消息队列有很多局限性,比如:

  • 没有良好的 ACK 机制;

  • 没有 ConsumerGroup 消费组概念;

  • 消息堆积。

  • List 是线性结构,想要查询指定数据需要遍历整个列表;

Stream 是 Redis 5.0 引入的一种专门为消息队列设计的数据类型,Stream 是一个包含 0 个或者多个元素的有序队列,这些元素根据 ID 的大小进行有序排列。

它实现了大部分消息队列的功能:

  • 消息 ID 系列化生成;

  • 消息遍历;

  • 消息的阻塞和非阻塞读;

  • Consumer Groups消费组;

  • ACK 确认机制。

  • 支持多播。

提供了很多消息队列操作命令,并且借鉴 Kafka 的Consumer Groups的概念,提供了消费组功能。

同时提供了消息的持久化和主从复制机制,客户端可以访问任何时刻的数据,并且能记住每一个客户端的访问位置,从而保证消息不丢失。

废话少说,先来看下如何使用,官网文档详见:

https://redis.io/topics/streams-intro

「云岚宗众弟子听命,击杀萧炎!」

当云山最后一字落下,那弥漫的紧绷气氛,顿时宣告破碎,悬浮半空的众多云岚宗长老背后双翼一振,便是咻咻的划过天际,追杀萧炎。

云山使用以下指令向队列中插入「追杀萧炎」命令,让长老带领子弟去执行。

XADD 云岚宗 * task kill name 萧炎
"1645936602161-0"

Stream 中的每个元素由键值对的形式组成,不同元素可以包含不同数量的键值对

该命令的语法如下:

XADD streamName id field value [field value ...]

消息队列名称后面的 「*」 ,表示让 Redis 为插入的消息自动生成唯一 ID,当然也可以自己定义。

消息 ID 由两部分组成:

  • 当前毫秒内的时间戳;

  • 顺序编号。从 0 为起始值,用于区分同一时间内产生的多个命令。

❝ 通过将元素 ID 与时间进行关联,并强制要求新元素的 ID 必须大于旧元素的 ID, Redis 从逻辑上将流变成了一种只执行追加操作(append only)的数据结构。 这种特性对于使用流实现消息队列和事件系统的用户来说是非常重要的: 用户可以确信,新的消息和事件只会出现在已有消息和事件之后,就像现实世界里新事件总是发生在已有事件之后一样,一切都是有序进行的。
基于 Spring Boot + MyBatis Plus + Vue & Element 实现的后台管理系统 + 用户小程序,支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能。 项目地址:https://github.com/YunaiV/ruoyi-vue-pro

云凌老狗使用如下指令接收云山的命令:

XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 0-0
1) 1) "\xe4\xba\x91\xe5\xb2\x9a\xe5\xae\x97"
2) 1) 1) "1645936602161-0"
2) 1) "task"
2) "kill"
3) "name"
4) "萧炎" # 萧炎
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]

该指令可以同时对多个流进行读取,每个心法对应含义如下:

  • COUNT:表示每个流中最多读取的元素个数;

  • BLOCK:阻塞读取,当消息队列没有消息的时候,则阻塞等待, 0 表示无限等待,单位是毫秒。

  • ID:消息 ID,在读取消息的时候可以指定 ID,并从这个 ID 的下一条消息开始读取,0-0 则表示从第一个元素开始读取

如果想使用 XREAD 进行顺序消费,每次读取后要记住返回的消息 ID,下次调用 XREAD 就将上一次返回的消息 ID 作为参数传递到下一次调用就可以继续消费后续的消息了。

❝ 云韵宗主,我今天刚到云岚宗,历史的消息就不接了,只想接收我使用 XREAD 阻塞等待的那一刻开始通过 XADD 发布的消息要咋整?

运行「」心法即可,心法的最后「」符号表示读取最新的阻塞消息,读取不到则一直死等。

等待过程中,其他长老向队列追加消息,则会立即读取到。

XREAD COUNT 1 BLOCK 0 STREAMS 云岚宗 $

❝ 这么容易就实现消息队列了么?说好的 ACK 机制呢?

这里只是开胃菜,通过 XREAD 读取的数据其实并没有被删除,当重新执行XREAD COUNT 2 BLOCK 0 STREAMS 云岚宗 0-0指令的时候又会重新读取到。

所以我们还需要 ACK 机制,

接下来,我们来一个真正的消息队列。

基于微服务的思想,构建在 B2C 电商场景下的项目实战。核心技术栈,是 Spring Boot + Dubbo 。未来,会重构成 Spring Cloud Alibaba 。 项目地址:https://github.com/YunaiV/onemall

Redis Stream 的 ConsumerGroup(消费者组)允许用户将一个流从逻辑上划分为多个不同的流,并让 ConsumerGroup 的消费者去处理。

它是一个强大的支持多播的可持久化的消息队列。Redis Stream 借鉴了 Kafka 的设计。

Stream 的高可用是建立主从复制基础上的,它和其它数据结构的复制机制没有区别,也就是说在 Sentinel 和 Cluster 集群环境下 Stream 是可以支持高可用的。

Redis-Stream

  • Redis Stream 的结构如上图所示。有一个消息链表,每个消息都有一个唯一的 ID 和对应的内容;

  • 消息持久化;

  • 每个消费组的状态是独立的,不不影响,同一份的 Stream 消息会被所有的消费组消费;

  • 一个消费组可以由多个消费者组成,消费者之间是竞争关系,任意一个消费者读取了消息都会使 last_deliverd_id 往前移动;

  • 每个消费者有一个 pending_ids 变量,用于记录当前消费者读取了但是还没 ack 的消息。它用来保证消息至少被客户端消费了一次。

消费组实现的消息队列主要涉及以下三个指令:

  • XGROUP用于创建、销毁和管理消费者组。

  • XREADGROUP通过消费组从流中读取数据。

  • XACK是允许消费者将待处理消息标记为已正确处理的命令。

Stream 通过XGROUP CREATE指令创建消费组 (Consumer Group),需要传递起始消息 ID 参数用来初始化last_delivered_id变量。

我们使用 XADD 往 bossStream 队列插入一些消息:

XADD bossStream * name zhangsan age 26
XADD bossStream * name lisi age 2
XADD bossStream * name bigold age 40

如下指令,为消息队列名为 bossStream 创建「青龙门」和「六扇门」两个消费组。

# 语法如下
# XGROUP CREATE stream group start_id
XGROUP CREATE bossStream 青龙门 0-0 MKSTREAM
XGROUP CREATE bossStream 六扇门 0-0 MKSTREAM

  • stream:指定队列的名字;

  • group:指定消费组名字;

  • start_id:指定消费组在 Stream 中的起始 ID,它决定了消费者组从哪个 ID 之后开始读取消息,0-0从第一条开始读取,$表示从最后一条向后开始读取,只接收新消息。

  • MKSTREAM:默认情况下,XGROUP CREATE命令在目标流不存在时返回错误。可以使用可选MKSTREAM子命令作为 之后的最后一个参数来自动创建流。

让「青龙门」消费组的consumer1bossStream阻塞读取一条消息:

XREADGROUP GROUP 青龙门 consumer1 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957821396-0"
2) 1) "name"
2) "zhangsan"
3) "age"
4) "26"

语法如下:

XREADGROUP GROUP groupName consumerName [COUNT n] [BLOCK ms] STREAMS streamName [stream ...] id [id ...]

[] 内的表示可选参数,该命令与XREAD大同小异,区别在于新增GROUP groupName consumerName选项。

该选项的两个参数分别用于指定被读取的消费者组以及负责处理消息的消费者。

其中:

  • >:命令的最后参数>,表示从尚未被消费的消息开始读取;

  • BLOCK:阻塞读取;

敲黑板了

如果消息队列中的消息被消费组的一个消费者消费了,这条消息就不会再被这个消费组的其他消费者读取到。

比如consumer2执行读取操作:

XREADGROUP GROUP 青龙门 consumer2 COUNT 1 BLOCK 0 STREAMS bossStream >
1) 1) "bossStream"
2) 1) 1) "1645957838700-0"
2) 1) "name"
2) "lisi"
3) "age"
4) "2"

consumer2不能再读取到zhangsan了,而是读取下一条lisi因为这条消息已经被consumer1读取了。

使用消费者的另一个目的可以让组内的多个消费者分担读取消息,也就是每个消费者读取部分消息,从而实现均衡负载。

比如一个消费组有三个消费者 C1、C2、C3 和一个包含消息 1、2、3、4、5、6、7 的流:

为了保证消费者在消费的时候发生故障或者宕机重启后依然可以读取消息,Stream 内部有一个队列(pending List)保存每个消费者读取但是还没有执行 ACK 的消息

如果消费者使用了XREADGROUP GROUP groupName consumerName读取消息,但是没有给 Stream 发送XACK命令,消息依然保留。

比如查看bossStream中的 消费组「青龙门」中各个消费者已读取未确认的消息信息:

XPENDING bossStream 青龙门
1) (integer) 2
2) "1645957821396-0"
3) "1645957838700-0"
4) 1) 1) "consumer1"
2) "1"
2) 1) "consumer2"
2) "1"

  1. 1)未确认消息条数;

  2. 2) ~ 3)青龙门中所有消费者读取的消息最小和最大 ID;

查看consumer1读取了哪些数据,使用以下命令:

XPENDING bossStream 青龙门 - + 10 consumer1
1) 1) "1645957821396-0"
2) "consumer1"
3) (integer) 3758384
4) (integer) 1

所以当接收到消息并且消费成功以后,我们需要手动 ACK 通知 Streams,这条消息就会被删除了。命令如下:

XACK bossStream 青龙门 1645957821396-0 1645957838700-0
(integer) 2

语法如下:

XACK key group-key ID [ID ...]

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:

Stream 整体流程

使用 maven 添加依赖


org.redisson
redisson-spring-boot-starter
3.16.7

添加 Redis 配置,码哥的 Redis 没有配置密码,大家根据实际情况配置即可。

spring:
application:
name: redission
redis:
host: 127.0.0.1
port: 6379
ssl: false
@Slf4j
@Service
public class QueueService {

@Autowired
private RedissonClient redissonClient;

/**
* 发送消息到队列
*
* @param message
*/
public void sendMessage(String message) {
RStream stream = redissonClient.getStream("sensor#4921");
stream.add("speed", "19");
stream.add("velocity", "39%");
stream.add("temperature", "10C");
}

/**
* 消费者消费消息
*
* @param message
*/
public void consumerMessage(String message) {
RStream stream = redissonClient.getStream("sensor#4921");

stream.createGroup("sensors_data", StreamMessageId.ALL);

Map> messages = stream.readGroup("sensors_data", "consumer_1");
for (Map.Entry> entry : messages.entrySet()) {
Map msg = entry.getValue();
System.out.println(msg);

stream.ack("sensors_data", entry.getKey());
}

}

文章有帮助的话,在看,转发吧。

谢谢支持哟 (*^__^*)

特别声明:以上内容(如有图片或视频亦包括在内)为自媒体平台“网易号”用户上传并发布,本平台仅提供信息存储服务。

Notice: The content above (including the pictures and videos if any) is uploaded and posted by a user of NetEase Hao, which is a social media platform and only provides information storage services.

相关推荐
热点推荐
下半年首场寒潮下周抵达广西!最低气温将降至...

下半年首场寒潮下周抵达广西!最低气温将降至...

930老友记
2024-11-23 21:51:23
电视剧全网热度榜,《白夜破晓》无缘前三,第一热度高达78.90

电视剧全网热度榜,《白夜破晓》无缘前三,第一热度高达78.90

大头博士记
2024-11-22 12:36:15
造不出汽车的巴西,凭什么打败波音空客,成为世界第三航空强国?

造不出汽车的巴西,凭什么打败波音空客,成为世界第三航空强国?

小受谈历史
2024-11-22 09:58:14
公务员首次没发绩效奖金,在坐等;教师首次发放绩效奖金,但退半

公务员首次没发绩效奖金,在坐等;教师首次发放绩效奖金,但退半

郭爱华追问教育
2024-11-23 08:16:59
日本伦理电影大观:挑战传统观念的震撼之作

日本伦理电影大观:挑战传统观念的震撼之作

历史看阿敞
2024-11-14 09:14:12
瑞士一工人跌入720℃铝液熔炉,忍痛将自己拉出后,竟奇迹生还

瑞士一工人跌入720℃铝液熔炉,忍痛将自己拉出后,竟奇迹生还

寒士之言本尊
2024-11-15 21:57:34
WTT福冈总决赛男双:勒布伦兄弟3-2击败户上隼辅/篠塚大登夺冠

WTT福冈总决赛男双:勒布伦兄弟3-2击败户上隼辅/篠塚大登夺冠

直播吧
2024-11-23 21:28:39
尹恩惠晒自拍照:在中国街头游玩好自在,与糖葫芦合照很惬意!

尹恩惠晒自拍照:在中国街头游玩好自在,与糖葫芦合照很惬意!

易同学爱谈娱乐
2024-11-21 08:27:07
潘粤明和小女友上节目,50岁潘粤明对小女友一脸宠爱,感情甜蜜

潘粤明和小女友上节目,50岁潘粤明对小女友一脸宠爱,感情甜蜜

阿矗论古今
2024-11-22 09:08:53
一些女干部都如此不知廉耻吗?

一些女干部都如此不知廉耻吗?

梦马笔谈
2024-11-22 19:12:18
令人痛恨的俾路支解放军,为将其消灭,巴铁宣布发动全面军事行动

令人痛恨的俾路支解放军,为将其消灭,巴铁宣布发动全面军事行动

小遽历史
2024-11-22 17:20:25
正式官宣!江苏男篮超级外援突发离队,易立签下争议外援能否救火

正式官宣!江苏男篮超级外援突发离队,易立签下争议外援能否救火

老叶评球
2024-11-23 10:12:51
这张照片是不是江青拍的,有什么关系呢?

这张照片是不是江青拍的,有什么关系呢?

小刀99
2024-11-23 18:30:40
男篮再调12人名单!郭士强启用徐杰,辽篮2将被弃用,全力赢蒙古

男篮再调12人名单!郭士强启用徐杰,辽篮2将被弃用,全力赢蒙古

体坛大事记
2024-11-23 22:06:34
山西女教师出轨学生后续:完整对话公开,形象干练,曾上电视新闻

山西女教师出轨学生后续:完整对话公开,形象干练,曾上电视新闻

育学笔谈
2024-11-23 15:54:55
伊朗谴责在俄罗斯“暴力”逮捕伊朗学生

伊朗谴责在俄罗斯“暴力”逮捕伊朗学生

桂系007
2024-11-23 22:07:44
中美韩手机三季度营收断崖对比:苹果6761亿,三星4072亿,华为呢

中美韩手机三季度营收断崖对比:苹果6761亿,三星4072亿,华为呢

通文知史
2024-11-22 20:25:03
富人聚餐就是牛,筷子都要每个人两双!

富人聚餐就是牛,筷子都要每个人两双!

人情皆文史
2024-10-04 01:45:53
回顾:河北25岁女子虎园拍照,遭老虎一路尾随,半小时后被咬致死

回顾:河北25岁女子虎园拍照,遭老虎一路尾随,半小时后被咬致死

茶小姐说历史
2024-11-20 15:05:03
人民币汇率突发贬值!11月23日,今日面对的四大消息冲击市场!

人民币汇率突发贬值!11月23日,今日面对的四大消息冲击市场!

风口招财猪
2024-11-23 01:38:35
2024-11-24 01:00:49
程序员乔戈里
程序员乔戈里
本人百度java工程师
1633文章数 8899关注度
往期回顾 全部

科技要闻

华为徐直军首谈鸿蒙生态是什么

头条要闻

拜登政府常务副国务卿放话:我们在努力"拆散"中俄

头条要闻

拜登政府常务副国务卿放话:我们在努力"拆散"中俄

体育要闻

那个"最惨背景帝" 41岁还是五大联赛主力

娱乐要闻

德云社人事大变动!烧饼担任副总

财经要闻

钟睒睒的“愤怒”,谁能消解?

汽车要闻

对话张纯伟:80万!捷途立了一个新Flag

态度原创

本地
健康
旅游
房产
数码

本地新闻

云游中国 | 拒绝特种兵!北方也有“真江南”

花18万治疗阿尔茨海默病,值不值?

旅游要闻

吉林长春:机器狗上岗 服务雪场游客

房产要闻

丁村迎来大动作!首宗、百亩城更宅地挂出!楼面价2367元/㎡!

数码要闻

华擎推出 Mars RPL 系列迷你主机,板载12/13代英特尔酷睿处理器

无障碍浏览 进入关怀版