这是一个或许对你有用的社群 一对一交流/面试小册/简历优化/求职解惑欢迎加入「芋道快速开发平台」知识星球。下面是星球提供的部分资料《项目实战视频》从书中学往事上“练”《互联网高频面试题》面朝简历学习春暖花开《架构 x 系统设计》摧枯拉朽掌控面试高频场景题《精进 Java 学习指南》系统学习互联网主流技术栈《必读 Java 源码专栏》知其然知其所以然这是一个或许对你有用的开源项目国产Star破10w的开源项目前端包括管理后台、微信小程序后端支持单体、微服务架构RBAC权限、数据权限、SaaS多租户、商城、支付、工作流、大屏报表、ERP、CRM、AI大模型、IoT物联网等功能多模块https://gitee.com/zhijiantianya/ruoyi-vue-pro微服务https://gitee.com/zhijiantianya/yudao-cloud视频教程https://doc.iocoder.cn【国内首批】支持 JDK17/21SpringBoot3、JDK8/11Spring Boot2双版本来源苏三说技术前言1 数据一致性问题的原因2 消息不丢的方案2.1 事务消息的两阶段提交2.2 持久化配置2.3 副本配置3 应对重复消费的方案3.1 唯一ID3.2 幂等设计3.3 死信队列4 系统架构设计4.1 生产者端4.2 消费者端4.3 终极方案5 血泪经验十条总结前言上个月我们有个电商系统出了个灵异事件用户支付成功了但订单状态死活不改成“已发货”。折腾了半天才定位到问题订单服务的MQ消息像人间蒸发一样消失了。这个Bug让我明白MQ消息队列的数据一致性设计绝对能排进分布式系统三大噩梦之一今天这篇文章跟大家一起聊聊MQ如何保证数据一致性希望对你会有所帮助。基于 Spring Boot MyBatis Plus Vue Element 实现的后台管理系统 用户小程序支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能项目地址https://github.com/YunaiV/ruoyi-vue-pro视频教程https://doc.iocoder.cn/video/1 数据一致性问题的原因这些年在Kafka、RabbitMQ、RocketMQ踩过的坑总结成四类致命原因生产者悲剧消息成功进Broker却没写入磁盘就断电。消费者悲剧消息消费成功但业务执行失败。轮盘赌局网络抖动导致消息重复投递。数据孤岛数据库和消息状态割裂下完单没发券这些情况都会导致MQ产生数据不一致的问题。那么如何解决这些问题呢基于 Spring Cloud Alibaba Gateway Nacos RocketMQ Vue Element 实现的后台管理系统 用户小程序支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能项目地址https://github.com/YunaiV/yudao-cloud视频教程https://doc.iocoder.cn/video/2 消息不丢的方案我们首先需要解决消息丢失的问题。2.1 事务消息的两阶段提交以RocketMQ的事务消息为例工作原理就像双11的预售定金伪代码如下// 发送事务消息核心代码 TransactionMQProducer producer new TransactionMQProducer(group); producer.setTransactionListener(new TransactionListener() { // 执行本地事务比如扣库存 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return doBiz() ? LocalTransactionState.COMMIT : LocalTransactionState.ROLLBACK; } // Broker回调检查本地事务状态 public LocalTransactionState checkLocalTransaction(MessageExt msg) { return checkDB(msg.getTransactionId()) ? COMMIT : ROLLBACK; } });真实场景中别忘了在checkLocalTransaction里做好妥协查询查流水表或分布式事务日志。去年在物流系统救火就遇到过事务超时的坑——本地事务成功了但因网络问题没收到Commit导致Broker不断回查。2.2 持久化配置RabbitMQ的坑都在配置表里配置项例子作用队列持久化durabletrue队列元数据不丢消息持久化deliveryMode2消息存入磁盘Lazy Queuex-queue-modelazy消息直接写盘不读取进内存Confirm机制publisher-confirm-type生产者确认消息投递成功RabbitMQ本地存储备份交换机双重保护代码如下channel.queueDeclare(order_queue, true, false, false, new HashMapString, Object(){{ put(x-dead-letter-exchange, dlx_exchange); // 死信交换机 }});去年双十一订单系统就靠这个组合拳硬刚流量峰值主队列消息积压触发阈值时自动转移消息到备份队列给应急服务处理。2.3 副本配置消息队列保命绝招Kafkaacksall 副本数≥3RocketMQ同步刷盘 主从同步策略PulsarBookKeeper多副本存储上周帮一个金融系统迁移到Kafka为了数据安全启用了最高配置。server.properties配置如下acksall min.insync.replicas2 unclean.leader.election.enablefalse结果发现吞吐量只剩原来的三分之一但客户说“钱比速度重要”——这一行哪有银弹全是取舍。不同的业务场景情况不一样。3 应对重复消费的方案接下来需要解决消息的重复消费问题。3.1 唯一ID订单系统的架构课代表代码// 雪花算法生成全局唯一ID Snowflake snowflake new Snowflake(datacenterId, machineId); String bizId ORDER_ snowflake.nextId(); // 查重逻辑Redis原子操作 String key msg: bizId; if(redis.setnx(key, 1)) { redis.expire(key, 72 * 3600); processMsg(); }先使用雪花算法生成全局唯一ID然后使用Redis的setnx命令加分布式锁来保证请求的唯一性。某次促销活动因Redis集群抖动导致重复扣款。后来改用本地布隆过滤器分布式Redis 双校验总算解决这个世纪难题。3.2 幂等设计针对不同业务场景的三种对策场景代码示例关键点强一致性SELECT FOR UPDATE先查后更新数据库行锁最终一致性版本号控制类似CAS乐观锁重试3次补偿型事务设计反向操作如退款、库存回滚操作日志必须落库去年重构用户积分系统时就靠着这个三板斧把错误率从0.1%降到了0.001%积分变更幂等示例如下public void addPoints(String userId, String orderId, Long points) { if (pointLogDao.exists(orderId)) return; User user userDao.selectForUpdate(userId); // 悲观锁 user.setPoints(user.getPoints() points); userDao.update(user); pointLogDao.insert(new PointLog(orderId)); // 幂等日志 }这里使用了数据库行锁实现的幂等性。3.3 死信队列RabbitMQ的终极保命配置如下// 消费者设置手动ACK channel.basicConsume(queue, false, deliverCallback, cancelCallback); // 达到重试上限后进入死信队列 public void process(Message msg) { try { doBiz(); channel.basicAck(deliveryTag); } catch(Exception e) { if(retryCount 3) { channel.basicNack(deliveryTag, false, true); } else { channel.basicNack(deliveryTag, false, false); // 进入DLX } } }消费者端手动ACK消息。在消费者端消费消息时如果消费失败次数达到重试上限后进入死信队列。这个方案救了社交系统的推送服务——通过DLX收集全部异常消息凌晨用补偿Job重跑。4 系统架构设计接下来从系统架构设计的角度聊聊MQ要如何保证数据一致性4.1 生产者端对于实效性要求不太高的业务场景可以使用本地事务表定时任务扫描的补偿方案。流程图如下4.2 消费者端消费者端为了防止消息风暴要设置合理的并发消费线程数。流程图如下4.3 终极方案对于实时性要求比较高的业务场景可以使用 事务消息本地事件表 的黄金组合.流程图如下5 血泪经验十条消息必加唯一业务ID别用MQ自带的ID消费逻辑一定要幂等重复消费是必然事件数据库事务和消息发送必须二选一或者用事务消息消费者线程数不要超过分区数*2Kafka的教训死信队列必须加监控报警别等客服找你测试环境一定要模拟网络抖动chaos engineering消息体要兼容版本号血的教训警告不要用消息队列做业务主流程它只配当辅助消费者offset定时存库防止重平衡丢消息业务指标和MQ监控要联动比如订单量和消息量的波动要同步总结MQ消息队列像金融系统的SWIFT结算网络看似简单实则处处杀机。真正的高手不仅要会调参更要设计出能兼容可靠性与性能的架构。记住分布式系统的数据一致性不是银弹而是通过层层防御达成的动态平衡。就像当年我在做资金结算系统时老板说的那句震耳发聩的话“宁可慢十秒不可错一分”。欢迎加入我的知识星球全面提升技术能力。 加入方式“长按”或“扫描”下方二维码噢星球的内容包括项目实战、面试招聘、源码解析、学习路线。文章有帮助的话在看转发吧。 谢谢支持哟 (*^__^*
MQ的数据一致性,如何保证?
这是一个或许对你有用的社群 一对一交流/面试小册/简历优化/求职解惑欢迎加入「芋道快速开发平台」知识星球。下面是星球提供的部分资料《项目实战视频》从书中学往事上“练”《互联网高频面试题》面朝简历学习春暖花开《架构 x 系统设计》摧枯拉朽掌控面试高频场景题《精进 Java 学习指南》系统学习互联网主流技术栈《必读 Java 源码专栏》知其然知其所以然这是一个或许对你有用的开源项目国产Star破10w的开源项目前端包括管理后台、微信小程序后端支持单体、微服务架构RBAC权限、数据权限、SaaS多租户、商城、支付、工作流、大屏报表、ERP、CRM、AI大模型、IoT物联网等功能多模块https://gitee.com/zhijiantianya/ruoyi-vue-pro微服务https://gitee.com/zhijiantianya/yudao-cloud视频教程https://doc.iocoder.cn【国内首批】支持 JDK17/21SpringBoot3、JDK8/11Spring Boot2双版本来源苏三说技术前言1 数据一致性问题的原因2 消息不丢的方案2.1 事务消息的两阶段提交2.2 持久化配置2.3 副本配置3 应对重复消费的方案3.1 唯一ID3.2 幂等设计3.3 死信队列4 系统架构设计4.1 生产者端4.2 消费者端4.3 终极方案5 血泪经验十条总结前言上个月我们有个电商系统出了个灵异事件用户支付成功了但订单状态死活不改成“已发货”。折腾了半天才定位到问题订单服务的MQ消息像人间蒸发一样消失了。这个Bug让我明白MQ消息队列的数据一致性设计绝对能排进分布式系统三大噩梦之一今天这篇文章跟大家一起聊聊MQ如何保证数据一致性希望对你会有所帮助。基于 Spring Boot MyBatis Plus Vue Element 实现的后台管理系统 用户小程序支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能项目地址https://github.com/YunaiV/ruoyi-vue-pro视频教程https://doc.iocoder.cn/video/1 数据一致性问题的原因这些年在Kafka、RabbitMQ、RocketMQ踩过的坑总结成四类致命原因生产者悲剧消息成功进Broker却没写入磁盘就断电。消费者悲剧消息消费成功但业务执行失败。轮盘赌局网络抖动导致消息重复投递。数据孤岛数据库和消息状态割裂下完单没发券这些情况都会导致MQ产生数据不一致的问题。那么如何解决这些问题呢基于 Spring Cloud Alibaba Gateway Nacos RocketMQ Vue Element 实现的后台管理系统 用户小程序支持 RBAC 动态权限、多租户、数据权限、工作流、三方登录、支付、短信、商城等功能项目地址https://github.com/YunaiV/yudao-cloud视频教程https://doc.iocoder.cn/video/2 消息不丢的方案我们首先需要解决消息丢失的问题。2.1 事务消息的两阶段提交以RocketMQ的事务消息为例工作原理就像双11的预售定金伪代码如下// 发送事务消息核心代码 TransactionMQProducer producer new TransactionMQProducer(group); producer.setTransactionListener(new TransactionListener() { // 执行本地事务比如扣库存 public LocalTransactionState executeLocalTransaction(Message msg, Object arg) { return doBiz() ? LocalTransactionState.COMMIT : LocalTransactionState.ROLLBACK; } // Broker回调检查本地事务状态 public LocalTransactionState checkLocalTransaction(MessageExt msg) { return checkDB(msg.getTransactionId()) ? COMMIT : ROLLBACK; } });真实场景中别忘了在checkLocalTransaction里做好妥协查询查流水表或分布式事务日志。去年在物流系统救火就遇到过事务超时的坑——本地事务成功了但因网络问题没收到Commit导致Broker不断回查。2.2 持久化配置RabbitMQ的坑都在配置表里配置项例子作用队列持久化durabletrue队列元数据不丢消息持久化deliveryMode2消息存入磁盘Lazy Queuex-queue-modelazy消息直接写盘不读取进内存Confirm机制publisher-confirm-type生产者确认消息投递成功RabbitMQ本地存储备份交换机双重保护代码如下channel.queueDeclare(order_queue, true, false, false, new HashMapString, Object(){{ put(x-dead-letter-exchange, dlx_exchange); // 死信交换机 }});去年双十一订单系统就靠这个组合拳硬刚流量峰值主队列消息积压触发阈值时自动转移消息到备份队列给应急服务处理。2.3 副本配置消息队列保命绝招Kafkaacksall 副本数≥3RocketMQ同步刷盘 主从同步策略PulsarBookKeeper多副本存储上周帮一个金融系统迁移到Kafka为了数据安全启用了最高配置。server.properties配置如下acksall min.insync.replicas2 unclean.leader.election.enablefalse结果发现吞吐量只剩原来的三分之一但客户说“钱比速度重要”——这一行哪有银弹全是取舍。不同的业务场景情况不一样。3 应对重复消费的方案接下来需要解决消息的重复消费问题。3.1 唯一ID订单系统的架构课代表代码// 雪花算法生成全局唯一ID Snowflake snowflake new Snowflake(datacenterId, machineId); String bizId ORDER_ snowflake.nextId(); // 查重逻辑Redis原子操作 String key msg: bizId; if(redis.setnx(key, 1)) { redis.expire(key, 72 * 3600); processMsg(); }先使用雪花算法生成全局唯一ID然后使用Redis的setnx命令加分布式锁来保证请求的唯一性。某次促销活动因Redis集群抖动导致重复扣款。后来改用本地布隆过滤器分布式Redis 双校验总算解决这个世纪难题。3.2 幂等设计针对不同业务场景的三种对策场景代码示例关键点强一致性SELECT FOR UPDATE先查后更新数据库行锁最终一致性版本号控制类似CAS乐观锁重试3次补偿型事务设计反向操作如退款、库存回滚操作日志必须落库去年重构用户积分系统时就靠着这个三板斧把错误率从0.1%降到了0.001%积分变更幂等示例如下public void addPoints(String userId, String orderId, Long points) { if (pointLogDao.exists(orderId)) return; User user userDao.selectForUpdate(userId); // 悲观锁 user.setPoints(user.getPoints() points); userDao.update(user); pointLogDao.insert(new PointLog(orderId)); // 幂等日志 }这里使用了数据库行锁实现的幂等性。3.3 死信队列RabbitMQ的终极保命配置如下// 消费者设置手动ACK channel.basicConsume(queue, false, deliverCallback, cancelCallback); // 达到重试上限后进入死信队列 public void process(Message msg) { try { doBiz(); channel.basicAck(deliveryTag); } catch(Exception e) { if(retryCount 3) { channel.basicNack(deliveryTag, false, true); } else { channel.basicNack(deliveryTag, false, false); // 进入DLX } } }消费者端手动ACK消息。在消费者端消费消息时如果消费失败次数达到重试上限后进入死信队列。这个方案救了社交系统的推送服务——通过DLX收集全部异常消息凌晨用补偿Job重跑。4 系统架构设计接下来从系统架构设计的角度聊聊MQ要如何保证数据一致性4.1 生产者端对于实效性要求不太高的业务场景可以使用本地事务表定时任务扫描的补偿方案。流程图如下4.2 消费者端消费者端为了防止消息风暴要设置合理的并发消费线程数。流程图如下4.3 终极方案对于实时性要求比较高的业务场景可以使用 事务消息本地事件表 的黄金组合.流程图如下5 血泪经验十条消息必加唯一业务ID别用MQ自带的ID消费逻辑一定要幂等重复消费是必然事件数据库事务和消息发送必须二选一或者用事务消息消费者线程数不要超过分区数*2Kafka的教训死信队列必须加监控报警别等客服找你测试环境一定要模拟网络抖动chaos engineering消息体要兼容版本号血的教训警告不要用消息队列做业务主流程它只配当辅助消费者offset定时存库防止重平衡丢消息业务指标和MQ监控要联动比如订单量和消息量的波动要同步总结MQ消息队列像金融系统的SWIFT结算网络看似简单实则处处杀机。真正的高手不仅要会调参更要设计出能兼容可靠性与性能的架构。记住分布式系统的数据一致性不是银弹而是通过层层防御达成的动态平衡。就像当年我在做资金结算系统时老板说的那句震耳发聩的话“宁可慢十秒不可错一分”。欢迎加入我的知识星球全面提升技术能力。 加入方式“长按”或“扫描”下方二维码噢星球的内容包括项目实战、面试招聘、源码解析、学习路线。文章有帮助的话在看转发吧。 谢谢支持哟 (*^__^*