【RocketMQ合集-03】RocketMQ 常见问题梳理

【RocketMQ合集-03】RocketMQ 常见问题梳理 目录一、MQ 核心价值与选型对比1. 引入 MQ 的三个核心作用2. 主流 MQ 选型二、消息丢失问题面试必问1. 生产阶段丢失2. Broker 存储阶段丢失2.1 核心原理2.2 分层使用策略3. 消费阶段丢失三、消息重复消费与幂等性1. 重复消费的根本原因2. 三种主流幂等方案方案 1数据库唯一键约束方案 2Redis 去重方案 3状态机幂等四、消息顺序性问题1. 全局顺序 vs 局部顺序2. RocketMQ 局部顺序实现原理3. 代码4. 存在的问题五、消息积压问题1. 积压的危害与排查2. 紧急处理方案3. 存在的问题六、分布式事务消息1. 适用场景2. 能力边界澄清区分本地事务与分布式事务3. RocketMQ 事务消息原理半消息机制七、补充八、面试速记总结一、MQ 核心价值与选型对比1. 引入 MQ 的三个核心作用解耦上游系统只负责发消息下游系统按需订阅。比如订单系统不用直接调用库存、积分、物流接口新增下游业务时无需修改上游代码降低系统耦合度。异步将非核心流程从主链路剥离缩短接口响应时间。比如下单后发短信、发站内信不用等这些操作执行完再返回给用户。削峰填谷用 MQ 承接突发流量下游系统按自身能力消费避免直接打垮数据库或服务。典型场景是秒杀、大促活动。2. 主流 MQ 选型对比维度RocketMQKafkaRabbitMQ开发语言JavaScala/JavaErlang单机吞吐量网上看的10 万级10 万级 万级时效性毫秒级毫秒级微秒级核心优势事务消息、定时消息、消息重试、死信队列电商业务适配性强超高吞吐日志大数据场景生态完善功能极其丰富交换机模型灵活运维简单适用场景电商订单、支付、金融等业务型系统日志采集、大数据实时计算中小型项目、对功能复杂度要求高的业务二、消息丢失问题面试必问消息丢失只会发生在三个环节生产发送阶段、Broker 存储阶段、消费处理阶段对应三套解决方案。1. 生产阶段丢失原因网络抖动导致消息发出后 Broker 未收到或 Broker 已收到但响应包丢失生产者误以为发送失败。解决方案同步发送 失败自动重试且校验发送状态高并发场景用异步发送 回调处理失败核心支付、订单场景使用事务消息保证原子性代码示例RocketMQ 生产者可靠发送配置DefaultMQProducer producer new DefaultMQProducer(order_producer_group); producer.setNamesrvAddr(127.0.0.1:9876); // 发送失败内部重试次数生产环境建议3-5次 producer.setRetryTimesWhenSendFailed(3); // 存储失败时自动切换到其他Broker节点重试 producer.setRetryAnotherBrokerWhenNotStoreOK(true); producer.start(); // 构建消息 Message msg new Message( OrderTopic, ORDER_001, 订单创建:001.getBytes(StandardCharsets.UTF_8) ); try { SendResult result producer.send(msg); // 必须校验状态只有SEND_OK才代表Broker已成功接收 if (SendStatus.SEND_OK.equals(result.getSendStatus())) { System.out.println(发送成功消息ID result.getMsgId()); } else { // FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT 等异常状态 log.error(消息发送状态异常{}需人工补偿, result.getSendStatus()); } } catch (MQClientException e) { // 重试全部失败记录日志落库后续定时任务补偿 log.error(消息发送彻底失败订单号{}, ORDER_001, e); }踩坑之前我是默认send()不抛异常就是成功实际上 Broker 可能返回刷盘超时、从同步超时等状态此时消息可能已经丢失必须做状态校验。2. Broker 存储阶段丢失2.1 核心原理原因Broker 收到消息后还没刷到磁盘就宕机主从架构下主节点宕机从节点还没同步到消息。核心策略按业务等级分层配置刷盘与主从复制策略平衡可靠性与性能。2.2 分层使用策略业务等级典型场景刷盘策略主从复制策略性能损耗可靠性说明核心级支付、交易、资金流水SYNC_FLUSH同步刷盘SYNC_MASTER同步双写20%~30%极高单节点宕机、单机柜断电几乎零数据丢失业务级订单状态、库存变更、物流通知ASYNC_FLUSH异步刷盘SYNC_MASTER同步双写5%~10%高单节点宕机不丢数据整机房宕机有极大概率丢失日志级系统日志、埋点数据、监控指标ASYNC_FLUSH异步刷盘ASYNC_MASTER异步复制1%一般允许极少量数据丢失优先保障性能解决方案刷盘策略核心业务用同步刷盘SYNC_FLUSH消息落盘后才返回成功非核心日志用异步刷盘提升性能。主从复制核心业务用同步复制SYNC_MASTER主从双写成功再返回非核心用异步复制。Broker 配置文件broker.conf关键项# 核心业务配置 flushDiskTypeSYNC_FLUSH brokerRoleSYNC_MASTER # 日志业务配置 # flushDiskTypeASYNC_FLUSH # brokerRoleASYNC_MASTER3. 消费阶段丢失原因消费者收到消息后还没处理完业务就自动提交了消费确认随后服务宕机Broker 认为消息已消费不会再投递。解决方案关闭自动确认业务逻辑执行完成后再手动返回消费成功状态。消费者可靠消费DefaultMQPushConsumer consumer new DefaultMQPushConsumer(order_consumer_group); consumer.setNamesrvAddr(127.0.0.1:9876); consumer.subscribe(OrderTopic, *); // 注册并发消费监听器 consumer.registerMessageListener(new MessageListenerConcurrently() { Override public ConsumeConcurrentlyStatus consumeMessage( ListMessageExt msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { String body new String(msg.getBody(), StandardCharsets.UTF_8); // 执行业务逻辑扣库存、生成积分等 doOrderBusiness(body); } // 全部业务执行成功再返回消费成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (Exception e) { log.error(消费失败触发重试, e); // 返回稍后重试Broker会重新投递 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start();原理说明RocketMQ 默认重试 16 次间隔时间逐级拉长10s、30s、1min…2h多次失败后消息会进入死信队列。三、消息重复消费与幂等性1. 重复消费的根本原因网络不可靠 MQ 的 “至少投递一次At Least Once” 设计。常见场景生产端重试导致 Broker 收到多条相同消息消费成功但确认响应丢失Broker 重发消费者上下线触发重平衡已消费但未确认的消息被重新分配核心结论任何 MQ 都无法保证绝对不重复幂等性必须由业务层实现。2. 三种主流幂等方案方案 1数据库唯一键约束适合有唯一业务 ID 的场景订单号、支付流水号。在业务表或消费流水表建立唯一索引重复消息插入时直接报错自动忽略。Transactional public void processOrder(String orderId) { // 先查询做性能优化减少数据库冲突 if (orderMapper.existsByOrderId(orderId)) { return; } try { orderMapper.insert(new Order(orderId, ...)); } catch (DuplicateKeyException e) { // 唯一键冲突说明已消费直接返回成功 log.info(消息重复跳过{}, orderId); } }高并发下 “先查后插” 有线程安全问题数据库唯一索引是必须的兜底。缺陷依赖数据库高并发下有性能瓶颈唯一键冲突会抛出异常对接口友好度有影响不适用于无唯一业务 ID 的场景。兜底方案独立消费流水表记录所有消息处理状态定时对账扫描异常数据人工兜底。方案 2Redis 去重适合高并发、对性能要求高的场景。实现逻辑用SETNX命令业务 ID 作为 key消费成功则设置成功设置失败代表已消费。需设置过期时间配合事务保证一致性。缺陷Redis 宕机、主从切换或 key 过期时幂等控制失效属于弱一致极端场景下 SETNX 与过期时间非原子操作需 Lua 脚本保证可能出现重复无法彻底避免重复仅做前置拦截。兜底方案下游数据库唯一键作为最终防线Redis 仅用于降低冲突概率提升接口性能。方案 3状态机幂等实现逻辑适合有状态流转的业务订单待支付→已支付→已发货。每个状态只能流转一次重复消息过来时判断当前状态若已流转则直接跳过。缺陷仅适用于有明确状态流转的场景状态多了逻辑复杂度高并发场景下可能出现 ABA 问题状态错乱。兜底方案状态流转时增加版本号乐观锁更新时校验版本号防止并发下状态错乱。四、消息顺序性问题1. 全局顺序 vs 局部顺序全局顺序整个 Topic 所有消息严格按发送顺序消费需要单队列 单线程吞吐量极低几乎不用。局部顺序同一业务 ID 的消息有序不同业务 ID 之间无序99% 的业务场景都用这个。 例同一个订单的创建、支付、发货消息必须有序不同订单之间互不影响。2. RocketMQ 局部顺序实现原理生产端按业务 ID 哈希取模把同组消息固定发送到同一个 MessageQueue 中单个队列是 FIFO 有序的。消费端使用顺序消费监听器单个队列由单线程消费保证消费顺序与存储顺序一致。3. 代码顺序消息生产SendResult result producer.send(msg, new MessageQueueSelector() { Override public MessageQueue select(ListMessageQueue mqs, Message msg, Object arg) { String orderId (String) arg; // 同订单号永远路由到同一个队列 int index Math.abs(orderId.hashCode()) % mqs.size(); return mqs.get(index); } }, ORDER_001);顺序消息消费// 注意使用MessageListenerOrderly而非并发监听器 consumer.registerMessageListener(new MessageListenerOrderly() { Override public ConsumeOrderlyStatus consumeMessage( ListMessageExt msgs, ConsumeOrderlyContext context) { for (MessageExt msg : msgs) { // 单线程按顺序处理 processOrderStep(new String(msg.getBody())); } return ConsumeOrderlyStatus.SUCCESS; } });顺序消费的吞吐量远低于并发消费消费失败会阻塞当前队列直到消息处理成功不要滥用顺序消息。4. 存在的问题哈希倾斜业务 ID 分布不均例如大量测试订单 ID 哈希到同一个队列导致单队列压力过大整体吞吐量上不去。建议使用均匀的哈希算法避免热点队列。异常阻塞整个队列单条消息消费失败会阻塞当前队列的所有后续消息顺序消费的核心机制一条坏消息会导致整组业务停滞。对于不可恢复的异常参数非法不能无限重试需记录后人工跳过。队列扩容破坏顺序Topic 队列数扩容后哈希取模的分母变化同一业务 ID 会路由到新队列导致扩容前后同 ID 消息分散在不同队列破坏顺序性。扩容前需评估业务影响必要时通过业务 ID 前缀做兼容。多生产者发送乱序同一业务 ID 的消息从不同生产者实例发送网络延迟不同可能导致后发的消息先到 Broker。同一业务链路的顺序消息必须由同一生产者实例按序发送。五、消息积压问题1. 积压的危害与排查消息积压指生产速度远大于消费速度Broker 堆积大量未消费消息。会导致消费延迟飙升、Broker 磁盘占满触发消息过期删除。排查思路看监控Topic 的积压量Lag、生产 TPS、消费 TPS定位是生产突增还是消费变慢。消费侧检查消费者 CPU / 内存、是否有慢 SQL、是否大量异常重试拖慢速度。生产侧是否有大促、批量任务导致流量突增。2. 紧急处理方案扩容消费者先增加 Topic 的队列数再横向扩容消费者实例。注意一个队列最多对应一个消费线程消费者数量超过队列数无效。临时降级非核心消息日志、统计可临时丢弃或转存后续补数核心消息绝对不能丢。优化消费逻辑把消费逻辑中的慢 IO 操作改成批量、异步减少单条消息处理时间3. 存在的问题盲目扩容消费者消费者数量超过 Topic 队列数时多余实例完全空闲无法提升消费能力必须先扩容队列再扩容消费者。批量拉取过大单批次拉取消息量过多业务处理超时导致频繁重试反而拖慢整体消费速度。无差别重试占用资源所有异常都返回重试大量不可恢复的异常如参数非法反复重试占用消费线程导致正常消息积压。需区分可重试异常网络抖动、数据库超时和不可重试异常参数错误。消息过期丢失风险Broker 默认消息保留 3 天可配置积压超过保留时长会被自动删除。紧急处理前需先临时调大消息保留时间避免数据丢失。六、分布式事务消息1. 适用场景解决 “本地事务执行” 和 “发消息” 的原子性问题。例扣库存和发订单消息必须同时成功 / 同时失败不能出现库存扣了但消息没发出去的情况。2. 能力边界澄清区分本地事务与分布式事务RocketMQ 事务消息本质是基于本地事务状态的消息最终一致性方案属于柔性事务并非强一致分布式事务。能解决的问题保证「服务内本地数据库事务」与「MQ 消息发送」的原子性 —— 本地事务成功消息一定能被消费者看到本地事务失败消息一定不会投递。不能解决的问题无法保证跨服务、跨数据库的强一致事务例如同时扣 A 库库存和 B 库余额无法保证消费者端的业务一定成功消费者侧仍需自行保证幂等与重试。本质定位是 “消息发送侧” 的事务保障不是完整的分布式事务解决方案。完整的跨服务最终一致性需要生产者事务消息 消费者幂等重试共同配合。3. RocketMQ 事务消息原理半消息机制事务消息实现TransactionMQProducer producer new TransactionMQProducer(tx_order_group); producer.setNamesrvAddr(127.0.0.1:9876); producer.setExecutorService(Executors.newFixedThreadPool(4)); // 注册事务监听器 producer.setTransactionListener(new TransactionListener() { // 半消息成功后执行本地事务 Override public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { String orderId (String) arg; try { // 执行本地数据库事务扣减库存 stockService.deductStock(orderId); return LocalTransactionState.COMMIT_MESSAGE; } catch (Exception e) { log.error(本地事务失败, e); return LocalTransactionState.ROLLBACK_MESSAGE; } } // Broker回查接口网络异常导致确认丢失时调用 Override public LocalTransactionState checkLocalTransaction(MessageExt msg) { String orderId new String(msg.getBody()); boolean success stockService.checkDeductSuccess(orderId); return success ? LocalTransactionState.COMMIT_MESSAGE : LocalTransactionState.ROLLBACK_MESSAGE; } }); producer.start();七、补充死信队列DLQ消息重试达到最大次数仍失败会进入死信队列不再自动投递需要人工监控和处理。延时消息RocketMQ 原生支持 18 个级别的延时消息1s~2h适合订单超时取消、支付超时关闭等场景。底层通过定时调度实现到期后才将消息转为可消费。重平衡Rebalance消费者上下线时队列会重新分配给存活的消费者保证消费能力不下降过程中可能出现短暂的重复消费属于正常现象。八、面试速记总结消息丢失三板斧生产端重试 状态校验、Broker 同步刷盘 同步主从、消费端业务完成再返回成功。幂等性三方案数据库唯一键最稳妥、Redis 高性能、状态机适配业务流所有方案都要有数据库兜底。顺序消息核心局部有序够用全局有序不用生产哈希选队列消费单线程处理异常会阻塞整队列。事务消息边界只保证本地事务 发消息的原子性是柔性事务不是跨服务强一致分布式事务。消息积压处理先扩队列再扩消费者区分可重试 / 不可重试异常警惕消息过期删除。选型一句话业务系统选 RocketMQ大数据日志选 Kafka小项目轻量选 RabbitMQ。