Kafaka安装和使用以及整和一、 安装docker1创建docker-compose.yml文件2测试二、 kafaka基础知识1kafaka核心架构2) 工作流程三、Spring Boot 整合Kafka1. 导入依赖 配置yml文件2. API讲解2.1 KafkaListener2.2 KafkaTemplate2.3 实现手动提交偏移量一、 安装docker1创建docker-compose.yml文件mkdirkafka-democdkafka-demotouchdocker-compose.ymldockercompose up-ddocker-compose.ymlversion:3services:kafka:image:apache/kafka:latest# 镜像container_name:kafka# 容器名ports:# 映射端口-9092:9092environment:KAFKA_NODE_ID:1# 当钱节点# KRaft 模式KAFKA_PROCESS_ROLES:broker,controllerKAFKA_LISTENERS:PLAINTEXT://:9092,CONTROLLER://:9093# 监听端口# controller Kafka集群节点之间通信# plaintext 普通客户端端口KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092# 客户端如访问kafakaKAFKA_CONTROLLER_LISTENER_NAMES:CONTROLLERKAFKA_LISTENER_SECURITY_PROTOCOL_MAP:CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXTKAFKA_CONTROLLER_QUORUM_VOTERS:1kafka:9093# nodeIdhost:portKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:02测试dockerexec-itkafkabash# 进入容器# 创建topic/opt/kafka/bin/kafka-topics.sh\--create\--topictest-topic\--bootstrap-server localhost:9092# 查看topic/opt/kafka/bin/kafka-topics.sh\--list\--bootstrap-server localhost:9092# 启动生产者/opt/kafka/bin/kafka-console-producer.sh\--topictest-topic\--bootstrap-server localhost:9092# 输入hello kafka启动一个新终端dockerexec-itkafkabash/opt/kafka/bin/kafka-console-consumer.sh\--topictest-topic\--from-beginning\--bootstrap-server localhost:9092可以看到二、 kafaka基础知识1kafaka核心架构Producer(生产者)负责发送消息可以是订单系统 、 日志系统 、webAppConsumer(消费者)从主题订阅新消息的Kafka 客户端。消费者通过检查消息偏移量来区分消息是否已读。Topic(主题)Kafka 消息通过主题进行分类。类似 数据库的表例如order-topic log-topic user-topicPartition分区一个 Topic 可以拆成多个 Partition。分区后可以可以并行写、并行读、横向扩展本质就是多个日志文件Offset偏移量Kafka 中消息长这样Partition-00-hello1-world2-kafka这个编号就是offset0、1、2作用标识消息位置记录消费进度BrokerKafka 集群中的一台服务器。组成 Kafka 集群提供高可用提供分布式能力Broker-1 Broker-2 Broker-3Consumer Group消费者组多个消费者构成的消费者组同时消费多个分区以实现高并发。每一个消费者属于一个特定的消费者组。消费者组中一个消费者可以消费多个分区。一个分区只能被指定给一个消费者。Replica(副本)Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余这些地方就是所谓的副本。副本还分为领导者副本和追随者副本各自有不同的角色划分。副本是在分区层级下的即每个分区可配置多个副本实现高可用。2) 工作流程# 1.生产者发送消息Producer ↓ Topic# 2.Kafka写入PartitionTopic ├── P0 ├── P1 └── P2# 3.Consumer 拉取消息Consumer -主动拉取三、Spring Boot 整合Kafka1. 导入依赖 配置yml文件dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencyspring:kafka:bootstrap-servers:192.168.59.128:9092# kafka集群地址producer:#生产者配置retries:3acks:all# 要求所有副本都确认收到消息之后才算发送成功batch-size:16384# 批量发送大小 16KB累积到该大小后批零发送提升吞吐量buffer-memory:33554432# 生产者缓冲区总大小 32MB用于缓存待发送消息key-serializer:org.apache.kafka.common.serialization.StringSerializer# KEY的序列化器value-serializer:org.apache.kafka.common.serialization.StringSerializer# 消息的序列化器properties:linger.ms:1# 消息在发送前最多等待1ms,配和batch-size 实现微批量consumer:# 消费者配置group-id:pet-life-consumer-group# 消费者组ID同一组的消费者回分摊消费分区auto-offset-reset:earliest# 无初始偏移量时从最早的消息开始消费另一个常用值是 latestenable-auto-commit:true# 自动提交消费位移简化消费端逻辑auto-commit-interval:1000# 自动提交的间隔为 1000ms即 1 秒#消息 Key 和 Value 的反序列化器与生产者对应key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializermax-poll-records:500# 单次 poll() 调用最多拉取 500 条消息listener:missing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误避免启动失败concurrency:3# 消费监听器并发线程数为 3即同时启动 3 个消费者实例消费分区2. API讲解2.1KafkaListenerpublicinterfaceKafkaListener{Stringid()default;//Listener 唯一 ID。StringcontainerFactory()default;//指定监听容器工厂String[]topics()default{};//指定监听的topicStringtopicPattern()default;//正则匹配topicTopicPartition[]topicPartitions()default{};//精确撇脂topic及其分区StringcontainerGroup()default;StringerrorHandler()default;//异常处理器StringgroupId()default;//指定消费者组booleanidIsGroup()defaulttrue;// 只id 和groupId是否相同StringclientIdPrefix()default;//Kafka Client ID 前缀StringbeanRef()default__listener;Stringconcurrency()default;// 并发消费线程数量StringautoStartup()default;// 是否自动消费String[]properties()default{};//额外 Kafka 配置。booleansplitIterables()defaulttrue;StringcontentTypeConverter()default;Stringbatch()default;Stringfilter()default;Stringinfo()default;StringcontainerPostProcessor()default;}示例KafkaListener(topicstest)publicvoidlisten(Stringmessage){System.out.println(Received message: message);}2.2KafkaTemplateKafkaTemplate本质上就是对 Kafka Producer的高级封装发送消息的API如下方法作用send(topic, data)普通发送send(topic, key, data)带 keysend(topic, partition, key, data)指定分区send(record)完整 ProducerRecordsend(message)Spring MessagesendDefault()默认 TopicexecuteInTransaction()事务消息其中带 key 和普通发送的区别 Kafka会对key进行哈希运算对于同一个key会进入同一分区能够保证**局部顺序性**完整ProducerRecordpublicclassProducerRecordK,V{privatefinalStringtopic;// 主题privatefinalIntegerpartition;//分区privatefinalHeadersheaders;// 元数据头部用于在消息体之外传递额外信息privatefinalKkey;// keyprivatefinalVvalue;//消息privatefinalLongtimestamp;// 时间戳}场景示例链路追踪传入 traceId、spanId实现分布式追踪消息去重传入唯一 messageId消费端幂等判断内容标识标记消息的 contentType、encoding路由标记标记消息来源 source、目标 target自定义标记任意业务标识如 priority、retryCount接收header的方法 // 方式1用 Header 注解取单个值KafkaListener(topicstest-topic)publicvoidlisten(Stringmessage,Header(traceId)StringtraceId,Header(source)Stringsource,Acknowledgmentack){System.out.println(traceIdtraceId, sourcesource, bodymessage);ack.acknowledge();}// 方式2接收完整 Headers 对象KafkaListener(topicstest-topic)publicvoidlisten(Stringmessage,Headersheaders,Acknowledgmentack){headers.forEach(h-System.out.println(keyh.key(), valuenewString(h.value())));ack.acknowledge();}// 方式3接收 ConsumerRecord最完整包含分区、偏移量等所有信息KafkaListener(topicstest-topic)publicvoidlisten(ConsumerRecordString,Stringrecord,Acknowledgmentack){System.out.println(topicrecord.topic(), partitionrecord.partition(), offsetrecord.offset(), headersrecord.headers(), valuerecord.value());ack.acknowledge();}2.3 实现手动提交偏移量避免业务还没有跑完就提交的偏移量。如果执行过程中出现故障但是这条消息已经消费过了造成数据丢失。修改yml文件spring:kafka:consumer:# 消费者配置enable-auto-commit:false# 关闭自动提交配合 ack-mode: manual 手动提交偏移量listener:ack-mode:manualmissing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误避免启动失败KafkaListener(topicPatterntest-.*,groupIdtest-group)publicvoidlisten(Stringmessage,Acknowledgmentack){try{System.out.println(Received message: message);// 业务逻辑处理...}finally{ack.acknowledge();// 手动提交偏移量}}
Kafka基础篇
Kafaka安装和使用以及整和一、 安装docker1创建docker-compose.yml文件2测试二、 kafaka基础知识1kafaka核心架构2) 工作流程三、Spring Boot 整合Kafka1. 导入依赖 配置yml文件2. API讲解2.1 KafkaListener2.2 KafkaTemplate2.3 实现手动提交偏移量一、 安装docker1创建docker-compose.yml文件mkdirkafka-democdkafka-demotouchdocker-compose.ymldockercompose up-ddocker-compose.ymlversion:3services:kafka:image:apache/kafka:latest# 镜像container_name:kafka# 容器名ports:# 映射端口-9092:9092environment:KAFKA_NODE_ID:1# 当钱节点# KRaft 模式KAFKA_PROCESS_ROLES:broker,controllerKAFKA_LISTENERS:PLAINTEXT://:9092,CONTROLLER://:9093# 监听端口# controller Kafka集群节点之间通信# plaintext 普通客户端端口KAFKA_ADVERTISED_LISTENERS:PLAINTEXT://localhost:9092# 客户端如访问kafakaKAFKA_CONTROLLER_LISTENER_NAMES:CONTROLLERKAFKA_LISTENER_SECURITY_PROTOCOL_MAP:CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXTKAFKA_CONTROLLER_QUORUM_VOTERS:1kafka:9093# nodeIdhost:portKAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR:1KAFKA_TRANSACTION_STATE_LOG_MIN_ISR:1KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS:02测试dockerexec-itkafkabash# 进入容器# 创建topic/opt/kafka/bin/kafka-topics.sh\--create\--topictest-topic\--bootstrap-server localhost:9092# 查看topic/opt/kafka/bin/kafka-topics.sh\--list\--bootstrap-server localhost:9092# 启动生产者/opt/kafka/bin/kafka-console-producer.sh\--topictest-topic\--bootstrap-server localhost:9092# 输入hello kafka启动一个新终端dockerexec-itkafkabash/opt/kafka/bin/kafka-console-consumer.sh\--topictest-topic\--from-beginning\--bootstrap-server localhost:9092可以看到二、 kafaka基础知识1kafaka核心架构Producer(生产者)负责发送消息可以是订单系统 、 日志系统 、webAppConsumer(消费者)从主题订阅新消息的Kafka 客户端。消费者通过检查消息偏移量来区分消息是否已读。Topic(主题)Kafka 消息通过主题进行分类。类似 数据库的表例如order-topic log-topic user-topicPartition分区一个 Topic 可以拆成多个 Partition。分区后可以可以并行写、并行读、横向扩展本质就是多个日志文件Offset偏移量Kafka 中消息长这样Partition-00-hello1-world2-kafka这个编号就是offset0、1、2作用标识消息位置记录消费进度BrokerKafka 集群中的一台服务器。组成 Kafka 集群提供高可用提供分布式能力Broker-1 Broker-2 Broker-3Consumer Group消费者组多个消费者构成的消费者组同时消费多个分区以实现高并发。每一个消费者属于一个特定的消费者组。消费者组中一个消费者可以消费多个分区。一个分区只能被指定给一个消费者。Replica(副本)Kafka 中同一条消息能够被拷贝到多个地方以提供数据冗余这些地方就是所谓的副本。副本还分为领导者副本和追随者副本各自有不同的角色划分。副本是在分区层级下的即每个分区可配置多个副本实现高可用。2) 工作流程# 1.生产者发送消息Producer ↓ Topic# 2.Kafka写入PartitionTopic ├── P0 ├── P1 └── P2# 3.Consumer 拉取消息Consumer -主动拉取三、Spring Boot 整合Kafka1. 导入依赖 配置yml文件dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependencyspring:kafka:bootstrap-servers:192.168.59.128:9092# kafka集群地址producer:#生产者配置retries:3acks:all# 要求所有副本都确认收到消息之后才算发送成功batch-size:16384# 批量发送大小 16KB累积到该大小后批零发送提升吞吐量buffer-memory:33554432# 生产者缓冲区总大小 32MB用于缓存待发送消息key-serializer:org.apache.kafka.common.serialization.StringSerializer# KEY的序列化器value-serializer:org.apache.kafka.common.serialization.StringSerializer# 消息的序列化器properties:linger.ms:1# 消息在发送前最多等待1ms,配和batch-size 实现微批量consumer:# 消费者配置group-id:pet-life-consumer-group# 消费者组ID同一组的消费者回分摊消费分区auto-offset-reset:earliest# 无初始偏移量时从最早的消息开始消费另一个常用值是 latestenable-auto-commit:true# 自动提交消费位移简化消费端逻辑auto-commit-interval:1000# 自动提交的间隔为 1000ms即 1 秒#消息 Key 和 Value 的反序列化器与生产者对应key-deserializer:org.apache.kafka.common.serialization.StringDeserializervalue-deserializer:org.apache.kafka.common.serialization.StringDeserializermax-poll-records:500# 单次 poll() 调用最多拉取 500 条消息listener:missing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误避免启动失败concurrency:3# 消费监听器并发线程数为 3即同时启动 3 个消费者实例消费分区2. API讲解2.1KafkaListenerpublicinterfaceKafkaListener{Stringid()default;//Listener 唯一 ID。StringcontainerFactory()default;//指定监听容器工厂String[]topics()default{};//指定监听的topicStringtopicPattern()default;//正则匹配topicTopicPartition[]topicPartitions()default{};//精确撇脂topic及其分区StringcontainerGroup()default;StringerrorHandler()default;//异常处理器StringgroupId()default;//指定消费者组booleanidIsGroup()defaulttrue;// 只id 和groupId是否相同StringclientIdPrefix()default;//Kafka Client ID 前缀StringbeanRef()default__listener;Stringconcurrency()default;// 并发消费线程数量StringautoStartup()default;// 是否自动消费String[]properties()default{};//额外 Kafka 配置。booleansplitIterables()defaulttrue;StringcontentTypeConverter()default;Stringbatch()default;Stringfilter()default;Stringinfo()default;StringcontainerPostProcessor()default;}示例KafkaListener(topicstest)publicvoidlisten(Stringmessage){System.out.println(Received message: message);}2.2KafkaTemplateKafkaTemplate本质上就是对 Kafka Producer的高级封装发送消息的API如下方法作用send(topic, data)普通发送send(topic, key, data)带 keysend(topic, partition, key, data)指定分区send(record)完整 ProducerRecordsend(message)Spring MessagesendDefault()默认 TopicexecuteInTransaction()事务消息其中带 key 和普通发送的区别 Kafka会对key进行哈希运算对于同一个key会进入同一分区能够保证**局部顺序性**完整ProducerRecordpublicclassProducerRecordK,V{privatefinalStringtopic;// 主题privatefinalIntegerpartition;//分区privatefinalHeadersheaders;// 元数据头部用于在消息体之外传递额外信息privatefinalKkey;// keyprivatefinalVvalue;//消息privatefinalLongtimestamp;// 时间戳}场景示例链路追踪传入 traceId、spanId实现分布式追踪消息去重传入唯一 messageId消费端幂等判断内容标识标记消息的 contentType、encoding路由标记标记消息来源 source、目标 target自定义标记任意业务标识如 priority、retryCount接收header的方法 // 方式1用 Header 注解取单个值KafkaListener(topicstest-topic)publicvoidlisten(Stringmessage,Header(traceId)StringtraceId,Header(source)Stringsource,Acknowledgmentack){System.out.println(traceIdtraceId, sourcesource, bodymessage);ack.acknowledge();}// 方式2接收完整 Headers 对象KafkaListener(topicstest-topic)publicvoidlisten(Stringmessage,Headersheaders,Acknowledgmentack){headers.forEach(h-System.out.println(keyh.key(), valuenewString(h.value())));ack.acknowledge();}// 方式3接收 ConsumerRecord最完整包含分区、偏移量等所有信息KafkaListener(topicstest-topic)publicvoidlisten(ConsumerRecordString,Stringrecord,Acknowledgmentack){System.out.println(topicrecord.topic(), partitionrecord.partition(), offsetrecord.offset(), headersrecord.headers(), valuerecord.value());ack.acknowledge();}2.3 实现手动提交偏移量避免业务还没有跑完就提交的偏移量。如果执行过程中出现故障但是这条消息已经消费过了造成数据丢失。修改yml文件spring:kafka:consumer:# 消费者配置enable-auto-commit:false# 关闭自动提交配合 ack-mode: manual 手动提交偏移量listener:ack-mode:manualmissing-topics-fatal:false# 监听的 Topic 不存在时不抛致命错误避免启动失败KafkaListener(topicPatterntest-.*,groupIdtest-group)publicvoidlisten(Stringmessage,Acknowledgmentack){try{System.out.println(Received message: message);// 业务逻辑处理...}finally{ack.acknowledge();// 手动提交偏移量}}