Kafka + Spring Boot: A Hands-On Introduction

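1. Quick Setup with Docker

    # docker-compose.yml
    version: '3.8'
    services:
      zookeeper:
        image: confluentinc/cp-zookeeper:7.5.0
        ports:
          - "2181:2181"
        environment:
          ZOOKEEPER_CLIENT_PORT: 2181   # required by the cp-zookeeper image
      kafka:
        image: confluentinc/cp-kafka:7.5.0
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
          # Single broker: the offsets topic cannot use the default replication factor of 3
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

Start the containers:

    docker-compose up -d

2. Spring Boot Integration

2.1 Add the Dependency

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

2.2 Configuration

    spring:
      kafka:
        bootstrap-servers: localhost:9092
        producer:
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
        consumer:
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          group-id: my-group
          auto-offset-reset: earliest

3. Producer

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaProducerService {

        private static final Logger log = LoggerFactory.getLogger(KafkaProducerService.class);

        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;

        public void sendMessage(String topic, String message) {
            // addCallback is the spring-kafka 2.x API (send() returns a ListenableFuture).
            // In spring-kafka 3.x send() returns a CompletableFuture; use whenComplete there.
            kafkaTemplate.send(topic, message).addCallback(
                    result -> log.info("Send succeeded"),
                    ex -> log.error("Send failed", ex));
        }

        public void sendWithKey(String topic, String key, String message) {
            kafkaTemplate.send(topic, key, message);
        }
    }

4. Consumer

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Service;

    @Service
    public class KafkaConsumerService {

        private static final Logger log = LoggerFactory.getLogger(KafkaConsumerService.class);

        // Single-threaded listener
        @KafkaListener(topics = "my-topic", groupId = "my-group")
        public void consume(ConsumerRecord<String, String> record) {
            log.info("Received message: {}", record.value());
        }

        // Alternative: three consumer threads. This only helps if the topic
        // has at least three partitions; extra consumers stay idle otherwise.
        @KafkaListener(topics = "my-topic", groupId = "my-group", concurrency = "3")
        public void consumeMulti(ConsumerRecord<String, String> record) {
            log.info("Received message: {}", record.value());
        }
    }

5. Message Serialization

5.1 JSON Serialization

    spring:
      kafka:
        producer:
          value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
        consumer:
          value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
          properties:
            spring.json.trusted.packages: "com.example.*"

    // Send an object (the template's value type must allow it, e.g. KafkaTemplate<String, Object>)
    kafkaTemplate.send("order-topic", order);

    // Consume an object
    @KafkaListener(topics = "order-topic")
    public void consumeOrder(Order order) {
        log.info("Received order: {}", order);
    }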
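
The Order type used in the JSON example is not shown in this article. Below is a minimal sketch of what such a payload class might look like. The field names orderId and amount are assumptions made for illustration only. JsonSerializer and JsonDeserializer are built on Jackson, which in its default setup creates the object through a no-argument constructor and fills it through setters, so the sketch provides both. toString is overridden so that the log line in consumeOrder prints something readable.

```java
import java.math.BigDecimal;

// Hypothetical payload type; the fields are assumptions, not part of the original article.
class Order {
    private String orderId;
    private BigDecimal amount;

    // No-argument constructor, used by Jackson's default deserialization.
    public Order() {
    }

    public Order(String orderId, BigDecimal amount) {
        this.orderId = orderId;
        this.amount = amount;
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public BigDecimal getAmount() {
        return amount;
    }

    public void setAmount(BigDecimal amount) {
        this.amount = amount;
    }

    @Override
    public String toString() {
        return "Order{orderId='" + orderId + "', amount=" + amount + "}";
    }
}
```

In a real project the class would normally be public, live in its own file, and sit in a package covered by spring.json.trusted.packages.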
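
6. Common Problems

6.1 Message Loss

    spring:
      kafka:
        producer:
          acks: all                   # wait until all in-sync replicas have acknowledged
        consumer:
          enable-auto-commit: false   # commit offsets manually
        listener:
          ack-mode: manual            # required for the Acknowledgment parameter below

    @KafkaListener(topics = "my-topic")
    public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
        try {
            processMessage(record.value());
            ack.acknowledge();   // commit manually, only after processing succeeded
        } catch (Exception e) {
            // Not acknowledged, so no offset is committed for this record here
            log.error("Processing failed", e);
        }
    }

6.2 Duplicate Messages

Make processing idempotent at the business level:

    @KafkaListener(topics = "order-topic")
    public void consume(ConsumerRecord<String, String> record) {
        String id = record.key();
        // setIfAbsent succeeds only for the first message carrying this id
        if (redis.setIfAbsent(id, "1")) {
            processMessage(record.value());
        }
    }

6.3 Message Ordering

Messages with the same key are sent to the same partition:

    kafkaTemplate.send("order-topic", orderId, message);

7. Summary

- Docker: bring up a Kafka environment with a single command
- Producer: send messages with KafkaTemplate
- Consumer: listen for messages with @KafkaListener
- Serialization: String and JSON are supported
- Reliable delivery: acks=all on the producer plus manual offset commits on the consumer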
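
The idempotency check from section 6.2 can be tried out without Redis. The sketch below is an in-memory stand-in, not the article's implementation: IdempotentProcessor and processOnce are hypothetical names, and a concurrent set replaces the Redis call. It also covers one case the short snippet leaves open: if processing throws, the marker is released so that a redelivered copy of the message is not silently skipped.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// In-memory stand-in for the Redis check (assumption for illustration only).
// With several consumer instances the set would have to live in a shared store.
class IdempotentProcessor {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    /**
     * Runs the handler only the first time an id is seen.
     * Returns true if the handler ran, false if the message was skipped as a duplicate.
     */
    public boolean processOnce(String id, String payload, Consumer<String> handler) {
        // add() is atomic on this set and returns false when the id is already present,
        // which plays the role of setIfAbsent.
        if (!seen.add(id)) {
            return false;
        }
        try {
            handler.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Release the marker so a redelivery of the same message can be retried.
            seen.remove(id);
            throw e;
        }
    }
}
```

Inside a listener, the call would take the place of the if block in 6.2, with record.key() as the id and processMessage as the handler. Whether to release the marker on failure is a design choice: it favors retrying over strict at-most-once handling.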