Redis 从入门到精通:Redis Stream —— 可靠消息队列

Redis 从入门到精通:Redis Stream —— 可靠消息队列 IT策士 10余年一线大厂经验专注 IT 思维、架构、职场进阶。我会在各个平台持续发布最新文章助你少走弯路。前面我们学了 List 做队列、Pub/Sub 做广播但它们都有一个硬伤消息可靠性不足。List 弹出的消息就没了客户端崩溃则消息丢失Pub/Sub 干脆不持久化订阅者不在线时消息直接蒸发。对于订单处理、异步任务、日志收集这类不能丢消息的场景需要的是可靠消息队列。Redis 5.0 推出的Stream正是为此而生。它像 Kafka 一样支持消息持久化、消费者组、ACK 确认和消息回溯又保持了 Redis 的简洁与高性能。本文将带你从命令到 Python 实战用 Stream 构建一个真正生产可用的消息队列。1. Stream 是什么为什么要用Stream 是 Redis 追加日志型数据结构用于存储时间序列的消息。每条消息有一个全局唯一的 ID 和若干键值对。它的核心能力消息持久化消息写入 Stream 后不会因消费者离线而丢失。消费者组同组消费者竞争消费同一条消息实现负载均衡。ACK 确认消费者处理完后发送XACK消息从“待确认”变为“已确认”。消息回溯可以按 ID 重新消费历史消息不会像 List 一样弹出即销毁。阻塞读取XREAD可阻塞等待新消息避免空轮询。对比其他队列方案一句话总结要可靠上 Stream。2. 核心命令速览2.1 添加消息XADD127.0.0.1:6379XADD orders * action create user_id1001amount99.91680000000000-0orders是 Stream 的 key。*表示让 Redis 自动生成消息 ID格式毫秒时间戳-序号。后面跟着若干 field-value 对构成了消息体。返回自动生成的 ID。可以手动指定 ID但强烈建议用*自动生成保证单调递增。2.2 读取消息XREAD# 读取所有消息从头开始127.0.0.1:6379XREAD STREAMS orders0-01)1)orders2)1)1)1680000000000-02)1)action2)create3)user_id4)10015)amount6)99.9# 阻塞等待新消息类似 BRPOP127.0.0.1:6379XREAD BLOCK5000STREAMS orders $(nil)# 5秒内没有新消息返回空0-0表示从头读$表示只读最新类似 tail -f。BLOCK毫秒数0 表示永久阻塞。2.3 消费者组与 XREADGROUPStream 可以创建多个消费者组每组独立维护消费进度。同组内的消费者竞争消费各自处理完后 ACK。# 创建消费者组从头部开始消费127.0.0.1:6379XGROUP CREATE orders group10-0 OK# 消费者 A 读取 group1 未确认的消息 表示从未消费过的新消息127.0.0.1:6379XREADGROUP GROUP group1 consumerA COUNT1STREAMS orders1)1)orders2)1)1)1680000000000-02)1)action2)create...# 确认消息处理完毕127.0.0.1:6379XACK orders group11680000000000-0(integer)1只返回从未投递给任何消费者组内成员的新消息。XACK将消息标记为已处理从待确认列表移除。2.4 查看待处理消息XPENDING如果有消费者拿到消息后崩溃消息会一直处于待确认状态XPENDING可以查看这些消息。# 查看待处理消息概要127.0.0.1:6379XPENDING orders group11)(integer)0# 待确认消息数# 如果消费者 A 挂掉消息会显示在 pending 列表中127.0.0.1:6379XPENDING orders group1 - 10(列出具体的待处理消息及其空闲时间)2.5 消息转移XCLAIM当一个消费者长时间未 ACK可能已死可以由另一个消费者通过XCLAIM将消息“抢”过来处理。127.0.0.1:6379XCLAIM orders group1 consumerB600001680000000000-0把空闲超过 60000 毫秒的消息转交给consumerB。3. Python 实战订单处理系统我们用 Stream 构建一个订单处理流水线一个生产者发布订单多个消费者组成消费组并行处理订单处理失败的消息重试或转移。3.1 环境准备确保 Redis 版本 ≥ 5.0Docker 镜像redis:7.2满足。3.2 生产者发布订单importredisimporttimeimportjsonimportrandom rredis.Redis(hostlocalhost,port6379,decode_responsesTrue)STREAM_KEYordersGROUP_NAMEorder_processors# 创建消费者组如果不存在try: r.xgroup_create(STREAM_KEY, GROUP_NAME,id0-0,mkstreamTrue)print(f消费者组 {GROUP_NAME} 已创建)except redis.exceptions.ResponseError as e:ifBUSYGROUPinstr(e): print(f消费者组 {GROUP_NAME} 已存在)else: raise def publish_order(order_id, user_id, amount):发布订单消息 msg{order_id:order_id,user_id:user_id,amount:amount,timestamp:time.time()}msg_idr.xadd(STREAM_KEY, msg)print(f[生产者] 发布订单 {order_id}: ID {msg_id})returnmsg_id# 模拟发布 10 个订单foriinrange(1,11): publish_order(fORD-{1000i}, random.randint(1,100), round(random.uniform(10,500),2))time.sleep(0.5)输出示例消费者组 order_processors 已存在[生产者]发布订单 ORD-1001: ID1680000001234-0[生产者]发布订单 ORD-1002: ID1680000001735-0...3.3 消费者处理订单并 ACK每个消费者从组中读取新消息模拟处理如扣减库存成功后 ACK。def process_order(msg_id, msg_data):模拟订单处理成功返回 True失败返回 False order_idmsg_data.get(order_id,unknown)amountfloat(msg_data.get(amount,0))print(f[消费者] 处理订单 {order_id} 金额 {amount})# 模拟处理随机成功或失败80% 成功successrandom.random()0.8ifsuccess: print(f[消费者] 订单 {order_id} 处理成功)else: print(f[消费者] 订单 {order_id} 处理失败)returnsuccess def start_consumer(consumer_name):启动一个消费者持续读取并处理消息 print(f[消费者 {consumer_name}] 启动)whileTrue: try:# 读取新消息每次最多 1 条阻塞 2 秒resultr.xreadgroup(GROUP_NAME, consumer_name,{STREAM_KEY:},count1,block2000)ifnot result:# 没有消息尝试处理本消费者的 pending 消息pendingr.xpending(STREAM_KEY, GROUP_NAME)ifpending[pending]0:# 读取自己的 pending 消息pending_msgsr.xpending_range(STREAM_KEY, GROUP_NAME,min-,max,count1,consumernameconsumer_name)ifpending_msgs:forpinpending_msgs: msg_idp[message_id]# 重新获取消息内容msgsr.xrange(STREAM_KEY,minmsg_id,maxmsg_id)ifmsgs: msg_datamsgs[0][1]print(f[消费者 {consumer_name}] 重试 pending 消息 {msg_id})ifprocess_order(msg_id, msg_data): r.xack(STREAM_KEY, GROUP_NAME, msg_id)continuestream_name, messagesresult[0]formsg_id, msg_datainmessages: print(f[消费者 {consumer_name}] 收到消息 {msg_id})ifprocess_order(msg_id, msg_data): r.xack(STREAM_KEY, GROUP_NAME, msg_id)else:# 处理失败不 ACK消息留在 pending 中# 后续可以由 XCLAIM 转移或手动重试pass except Exception as e: print(f[消费者 {consumer_name}] 异常: {e})time.sleep(1)# 启动消费者if__name____main__:importsys consumer_namesys.argv[1]iflen(sys.argv)1elseconsumer-1start_consumer(consumer_name)启动多个消费者终端python consumer.py consumer-Apython consumer.py consumer-B生产者发布消息后消费者输出示例[消费者 consumer-A]收到消息1680000001234-0[消费者]处理订单 ORD-1001 金额99.90[消费者]订单 ORD-1001 处理成功[消费者 consumer-B]收到消息1680000001735-0[消费者]处理订单 ORD-1002 金额250.00[消费者]订单 ORD-1002 处理失败注意消息不会重复消费A 和 B 竞争。3.4 处理失败消息死信队列与重试对于长时间 pending 的消息消费者崩溃或处理失败可以定时扫描用XCLAIM转移给健康消费者或超过最大重试次数后移入死信 Stream。def recover_pending(stream_key, group_name,idle_ms60000,max_retries3):恢复空闲消息将超时的 pending 消息转移给活跃消费者# 获取所有 pending 消息pendingr.xpending(stream_key, group_name)ifpending[pending]0:return# 获取空闲超过 idle_ms 的消息claimedr.xpending_range(stream_key, group_name,min-,max,count10)forpinclaimed: msg_idp[message_id]# 检查重试次数可存储在消息字段或额外 Redis keymsgsr.xrange(stream_key,minmsg_id,maxmsg_id)ifnot msgs:continuemsg_datamsgs[0][1]retry_countint(msg_data.get(retry,0))ifretry_countmax_retries:# 移入死信队列r.xadd(f{stream_key}:dead, msg_data)r.xack(stream_key, group_name, msg_id)r.xdel(stream_key, msg_id)print(f消息 {msg_id} 超过重试次数移入死信队列)elifp[time_since_delivered]idle_ms:# XCLAIM 转移给恢复消费者r.xclaim(stream_key, group_name,recovery_consumer, idle_ms, msg_id)print(f消息 {msg_id} 被 recovery_consumer 接管)# 定时任务调用whileTrue: recover_pending(orders,order_processors,idle_ms30000)time.sleep(10)3.5 异步消费者redis.asyncio在异步框架中Stream 同样适用。importasyncioimportredis.asyncio as aioredis async def async_consumer(consumer_name): rawait aioredis.from_url(redis://localhost,decode_responsesTrue)try: await r.xgroup_create(orders,async_group,id0-0,mkstreamTrue)except: passwhileTrue: resultawait r.xreadgroup(async_group, consumer_name,{orders:},count1,block2000)ifresult:formsg_id, msg_datainresult[0][1]: print(f[异步 {consumer_name}] 处理 {msg_id})await r.xack(orders,async_group, msg_id)await asyncio.sleep(0.1)asyncio.run(async_consumer(worker-1))4. Stream 高级特性消息裁剪XTRIM限制 Stream 长度避免无限膨胀。XADD ... MAXLEN ~ 1000近似裁剪。消息范围查询XRANGE/XREVRANGE按 ID 范围查询历史消息。消费组删除XGROUP DESTROY orders group1。监控XINFO STREAM orders查看 Stream 概览长度、最后 ID 等。5. 常见误区与最佳实践别忘了 ACK未 ACK 的消息会堆积在 pending 列表占内存且影响消费进度。合理设置 Stream 长度历史消息会一直保存用MAXLEN控制容量。消费者组名全局唯一不要把不同业务的消费组名重名。XREADGROUP 的 用法是读取新消息0-0是读取历史具体ID是读取未确认的。死信队列生产必须设计重试上限和死信转移避免无限重试阻塞队列。6. 动手试试搭建三消费者组一个 Stream 创建两个消费者组groupA和groupB每个组各有两个消费者验证同一条消息会被两个组独立消费组内竞争消费。模拟消费者崩溃消费者读到消息后不 ACK然后杀掉进程用XPENDING和XCLAIM将消息转移给另一个消费者。死信队列实现一个最多重试 2 次的处理逻辑超过后移入dead:ordersStream并定期巡检死信 Stream。性能测试生产者批量 XADD 10 万条消息观察消费者组吞吐量及XINFO STREAM长度变化。预期效果多组消费互不影响崩溃消息自动转移死信队列正确隔离失败消息批量处理稳定。7. 总结Redis Stream 把“可靠消息队列”集成到 Redis 内核中具备持久化、消费者组、ACK、消息回溯等专业 MQ 的核心特性且保持了 Redis 的简单与高性能。相比独立的 Kafka/RabbitMQ它更适合中轻量级异步任务、事件驱动架构且复用现有 Redis 基础设施大幅降低运维成本。掌握 Stream你就拥有了在 Redis 生态中构建可靠消息管道的能力。下一篇我们将进入性能调优用慢日志、基准测试、大 Key 优化等手段把 Redis 的性能彻底榨干。想了解更多还可以去各个平台搜索「IT策士」一起升级 IT 思维