第9篇 消息不丢:三端协同防丢失方案

第9篇 消息不丢:三端协同防丢失方案 第9篇消息不丢 —— 三端协同构建零丢失的消息通道系列Kafka × Spring Boot参数精讲与生产落地实战本篇关键词消息丢失 ·acksall·min.insync.replicas· 手动提交 · 发送失败补偿 本篇导读Kafka 会不会丢消息几乎是每个用 Kafka 的工程师都被问到过的问题。标准答案是看你怎么配置。默认配置下消息可能在三个环节丢失Producer 端、Broker 端、Consumer 端。本篇逐一拆解给出完整的防丢方案并在最后画出一张防丢失全景图。一、先搞清楚Kafka 的消息投递语义语义含义如何实现At Most Once最多一次可能丢消息绝不重复自动提交 acks0At Least Once至少一次不丢消息可能重复手动提交 acksallExactly Once恰好一次不丢不重事务消息高成本复杂生产最佳实践At Least Once 消费者幂等 ≈ 事实上的 Exactly Once二、Producer 端三种丢消息场景与解法场景一acks1默认Leader 宕机时间线 T1: Producer → Leader 接收并写入 → 返回 ACKProducer 认为成功 T2: Follower 还在同步中还没有这条消息 T3: Leader 突然宕机 T4: Follower 成为新 Leader但它没有 T1 的那条消息 结果消息永久丢失Producer 毫不知情 ❌ 解法 props.put(ProducerConfig.ACKS_CONFIG, all); // 等所有 ISR 副本确认任何副本宕机都有其他副本保存数据 ✓场景二重试耗尽彻底放弃网络抖动 → 发送失败 → 触发重试 重试次数耗尽 / delivery.timeout.ms 到期 → 彻底失败 但 whenComplete 中没有处理失败情况 → 消息悄无声息地丢失 ❌ 解法发送失败必须有补偿机制场景三缓冲区打满抛异常生产速度 Broker 处理速度 → RecordAccumulator 缓冲区满了 → max.block.ms 超时后抛出 BufferExhaustedException → 调用方未捕获 → 消息丢失 ❌ 解法 1. 调大 buffer.memory 2. 捕获异常并补偿Producer 端的完整防丢配置BeanpublicProducerFactoryString,StringreliableProducerFactory(){MapString,ObjectpropsnewHashMap();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);// ① 等所有 ISR 副本确认关键props.put(ProducerConfig.ACKS_CONFIG,all);// ② 幂等防止重试导致重复写入开启后自动设置 retries 为合理值props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,true);// ③ 总超时 5 分钟内无限重试超时后彻底失败props.put(ProducerConfig.RETRIES_CONFIG,Integer.MAX_VALUE);props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG,300000);returnnewDefaultKafkaProducerFactory(props);}发送失败必须有补偿ServiceSlf4jpublicclassReliableProducer{AutowiredprivateKafkaTemplateString,StringkafkaTemplate;AutowiredprivateFailedMessageRepositoryfailedMessageRepo;publicvoidsend(Stringtopic,Stringkey,Stringvalue){kafkaTemplate.send(topic,key,value).whenComplete((result,ex)-{if(exnull){log.debug(发送成功: topic{}, key{}, offset{},topic,key,result.getRecordMetadata().offset());}else{// delivery.timeout.ms 内无限重试后最终失败log.error(【告警】消息发送最终失败topic{}, key{}, error{},topic,key,ex.getMessage());// 补偿1持久化到数据库供定时任务重发failedMessageRepo.save(FailedMessage.builder().topic(topic).key(key).value(value).failedAt(LocalDateTime.now()).errorMsg(ex.getMessage()).retryCount(0).build());// 补偿2告警通知钉钉/企业微信/短信alertService.sendCritical(Kafka消息发送失败\ntopictopic\nkeykey);}});}}定时补发失败消息ComponentSlf4jpublicclassFailedMessageRetryTask{AutowiredprivateFailedMessageRepositoryrepo;AutowiredprivateKafkaTemplateString,StringkafkaTemplate;Scheduled(fixedDelay300000)// 每5分钟执行一次publicvoidretry(){ListFailedMessagefailedListrepo.findByRetryCountLessThan(5).stream().filter(m-m.getNextRetryAt().isBefore(LocalDateTime.now())).collect(Collectors.toList());for(FailedMessagemsg:failedList){try{kafkaTemplate.send(msg.getTopic(),msg.getKey(),msg.getValue()).get(30,TimeUnit.SECONDS);// 同步等待确认成功repo.delete(msg);log.info(补发成功: key{},msg.getKey());}catch(Exceptione){msg.setRetryCount(msg.getRetryCount()1);msg.setNextRetryAt(LocalDateTime.now().plusMinutes((long)Math.pow(2,msg.getRetryCount())));// 指数退避repo.save(msg);log.warn(补发失败: key{}, retryCount{},msg.getKey(),msg.getRetryCount());}}}}三、Broker 端数据存进去了还是可能丢场景一单副本Broker 宕机Topic 只有 1 个副本replication.factor1 该 Broker 磁盘损坏 → 数据永久丢失 ❌ 解法生产环境副本数必须 3场景二min.insync.replicas配置不合理acksall但 min.insync.replicas1默认值 含义ISR 中至少有 1 个副本确认即可 当 ISR 只剩 Leader 一个副本时acksall acks1 ❌ 解法Broker 端配置 min.insync.replicas2 含义ISR 中至少 2 个副本确认才响应 Producer 若 ISR 不足 2 个 → 拒绝写入返回 NOT_ENOUGH_REPLICAS Producer 重试不会丢消息 ✓代价暂时不可写场景三unclean.leader.election.enabletrueISR 中所有副本都下线了但有 ISR 外的落后副本还活着 若 unclean.leader.election.enabletrue 允许这个落后副本成为 Leader 代价该副本没有同步的那部分消息 → 永久丢失 ❌ Kafka 默认值false安全 生产环境绝对不要改成 trueBroker 端防丢失配置# server.properties需要 Kafka 运维配合 # 副本数建议3副本 default.replication.factor3 # 至少2个ISR副本确认才响应 min.insync.replicas2 # 禁止不在ISR中的落后副本成为Leader宁可停服也不丢数据 unclean.leader.election.enablefalse # 可选强化刷盘减少 Page Cache 丢失的概率 # 注意过于频繁会严重影响性能谨慎评估 # log.flush.interval.messages10000 # log.flush.interval.ms1000副本数与min.insync.replicas搭配原则推荐搭配3副本容忍1个宕机 replication.factor 3 min.insync.replicas 2 acks all 效果任意1个副本宕机系统仍可读写 ✓ 2个副本同时宕机拒绝写入不丢数据 过于严格3副本任意宕机即停服 min.insync.replicas 3 效果任意1个副本宕机 → 写入失败可用性太低不推荐 过于宽松等同于acks1 min.insync.replicas 1 效果无额外保障acksall 形同虚设不推荐四、Consumer 端拉到了还是可能丢场景一自动提交已在第6篇详细讲解此处简要回顾 T0: poll() 拉取 msg1、msg2、msg3 T5: 自动提交触发Offset 提交到 msg4 T6: 处理 msg2 时服务崩溃 T7: 重启后从 msg4 开始 → msg2、msg3 丢失 ❌ 解法关闭自动提交手动提交场景二先提交后处理// ❌ 错误顺序先提交后处理KafkaListener(topicsorder-events)publicvoidwrong(ConsumerRecordString,Stringrecord,Acknowledgmentack){ack.acknowledge();// ← 先提交orderService.process(...);// ← 后处理若此处异常 → 消息已提交但未处理 → 丢失}// ✅ 正确顺序先处理后提交KafkaListener(topicsorder-events)publicvoidcorrect(ConsumerRecordString,Stringrecord,Acknowledgmentack){orderService.process(record.value());// ← 先处理ack.acknowledge();// ← 成功后提交}五、防丢失全景图┌─────────────────────────────────────────────────────────────────────────┐ │ Kafka 消息防丢失全景图 │ ├──────────────────┬──────────────────────┬───────────────────────────────┤ │ Producer 端 │ Broker 端 │ Consumer 端 │ ├──────────────────┼──────────────────────┼───────────────────────────────┤ │ acksall │ replication.factor3 │ enable.auto.commitfalse │ │ │ │ │ │ enable.idempotence│ min.insync.replicas2│ ack-modeMANUAL_IMMEDIATE │ │ true │ │ │ │ │ unclean.leader. │ 先业务后 ack.acknowledge() │ │ delivery.timeout │ election.enablefalse│ │ │ .ms300000 │ │ 异常时不 ack触发重投递 │ │ │ │ │ │ whenComplete 捕获 │ │ 幂等消费设计防止重复执行 │ │ 异常 存库补偿 │ │ │ └──────────────────┴──────────────────────┴───────────────────────────────┘ 三端协同缺任何一端消息都可能丢失六、踩坑记录❌ 坑1acksall 配置了但还是发现消息丢失原因没有配置 Broker 端的 min.insync.replicas ISR 只有 Leader 一个副本 acksall Leader 自己确认自己 acks1 解决Broker 端配置 min.insync.replicas2 确认方法 kafka-configs.sh --describe --entity-type topics \ --entity-name your-topic \ --bootstrap-server localhost:9092❌ 坑2单节点 Kafka 配置min.insync.replicas2导致发送一直失败本地开发单 Broker单副本 配置了 min.insync.replicas2 → ISR 中只有1个副本 2 → 所有写入失败 解决本地开发环境用宽松配置 spring.kafka.producer.acks1 # 或在 Broker 配置 min.insync.replicas1❌ 坑3补偿任务重发消息下游出现重复处理原因Producer 重发了原来那条消息幂等 Producer 的 ProducerID 已变化 Broker 当作新消息写入Consumer 重复消费 解决 1. 重发时在消息中携带原始消息 IDmessageId 2. Consumer 端基于 messageId 做幂等判断 3. 数据库唯一键 / Redis 去重见第6篇 本篇小结环节主要风险解决方案Produceracks1 Leader 宕机acksallmin.insync.replicas2Producer发送失败无感知whenComplete捕获异常 存库补偿Broker单副本replication.factor3Broker不洁选举unclean.leader.election.enablefalseConsumer自动提交enable.auto.commitfalse 手动 ackConsumer先提交后处理严格保证先处理后 ack的顺序核心记忆消息不丢 三端协同。Producer 确认 Broker 多副本 Consumer 手动提交任何一端缺失都是定时炸弹。下篇预告第10篇《消息不重 不乱——幂等消费、顺序性保证与死信队列》。