上一篇【第67篇】Kafka请求处理机制深度解析——生产请求与获取请求的完整链路下一篇【第69篇】Kafka页缓存与零拷贝——多副本下的高性能读写魔法摘要Kafka的存储层设计是它高性能的根本原因之一。从分区在磁盘上的目录布局到消息格式的版本演进再到Log Compaction的清理算法——每一层设计都经过精心权衡。很多人知道Kafka快但不知道它为什么这么能装、这么能清。本文将深入物理存储的四个核心领域磁盘目录布局、消息格式V2详解、Log Compaction算法、基于时间和大小的保留策略。读完这篇你会对Kafka怎么存、怎么清、怎么又快又省有透彻理解。一、分区在磁盘上的布局——一切从目录开始1.1 目录结构【Kafka 数据目录布局】 $KAFKA_LOG_DIRS可配置多个目录 │ ├── order-events-0/ ← Topic: order-events, Partition: 0 │ ├── 00000000000000000000.log ← 当前活跃Segment的数据文件 │ ├── 00000000000000000000.index ← 对应的偏移量稀疏索引 │ ├── 00000000000000000000.timeindex ← 对应的时间戳稀疏索引 │ ├── 00000000000000001000.log ← 下一个Segmentoffset从1000开始 │ ├── 00000000000000001000.index │ ├── 00000000000000001000.timeindex │ ├── 00000000000000002000.log │ ├── 00000000000000002000.index │ └── ... │ └── leader-epoch-checkpoint ← Leader Epoch持久化文件 │ ├── order-events-1/ │ └── ... │ ├── __consumer_offsets-0/ ← 内部Topic存储offset │ └── ... │ └── cleaner-offset-checkpoint ← Log Compaction进度记录 关键规则 ① 每个 Partition 对应一个目录topic-partition ② 目录分布在 log.dirs 配置的多个磁盘上JBOD模式 ③ 每个 Partition 的日志由多个 Segment 组成1.2 分区分配到磁盘的策略// Kafka 的分区分配磁盘策略Kafka 1.1// 目标均匀分配 Leader 分区到不同磁盘publicclassPartitionedAssignor{// 分配新分区时选择分区数最少的目录publicFileallocatePartition(Stringtopic,intpartition){// 1. 统计每个 log.dir 上该 Topic 的 Leader 分区数MapString,IntegerleaderCountPerDirnewHashMap();for(Stringdir:logDirs){intcountcountLeaderPartitions(dir,topic);leaderCountPerDir.put(dir,count);}// 2. 选择 Leader 分区数最少的目录returnleaderCountPerDir.entrySet().stream().min(Comparator.comparingInt(Map.Entry::getValue)).map(Map.Entry::getKey).orElse(logDirs[0]);}}多个log.dirs的作用不是RAID而是JBODJust a Bunch Of Disks——每个分区只存在一个磁盘上不同分区分散在不同磁盘实现I/O并行。# server.properties log.dirs/data/kafka-logs-1,/data/kafka-logs-2,/data/kafka-logs-3 # 三个目录分别对应三块物理磁盘 # Kafka 会自动将不同 Partition 分散到不同磁盘 # 实现 I/O 并行化二、消息格式V2详解——Kafka 0.11的里程碑2.1 V0/V1/V2 演进史【消息格式版本演进】 V0 (Kafka 0.8~0.10): ┌─────────────────────────────────────┐ │ Record Header: │ │ • crc32 (4 bytes) │ │ • magic (1 byte) 0 │ │ • attributes (1 byte) │ │ • key length key │ │ • value length value │ │ │ │ 问题每条消息都有完整header浪费空间│ └─────────────────────────────────────┘ V1 (Kafka 0.11): ┌─────────────────────────────────────┐ │ 在 V0 基础上增加 │ │ • timestamp (8 bytes) │ │ │ │ 用途支持按时间戳索引和保留 │ └─────────────────────────────────────┘ V2 (Kafka 0.11, 当前主流): ┌─────────────────────────────────────┐ │ RecordBatch Header共享header: │ │ • baseOffset (8 bytes) │ │ • batchLength (4 bytes) │ │ • partitionLeaderEpoch (4 bytes) │ │ • magic (1 byte) 2 │ │ • crc32 (4 bytes) │ │ • attributes (2 bytes) │ │ • lastOffsetDelta (4 bytes) │ │ • baseTimestamp (8 bytes) │ │ • maxTimestamp (8 bytes) │ │ • producerId / producerEpoch │ │ • firstSequence / baseSequence │ │ │ │ Record Payload (变长编码): │ │ • attributes (1 byte, 省空间) │ │ • timestampDelta (变长) │ │ • offsetDelta (变长) │ │ • key length key (变长) │ │ • value length value (变长) │ │ │ │ 优势 │ │ ① 批量共享header → 省空间 │ │ ② 变长整数编码 → 小数字节数更少 │ │ ③ 支持事务和幂等性 │ └─────────────────────────────────────┘2.2 V2格式的变长编码Varint【Varint 编码原理】 传统 int32: 固定 4 bytes V2 Varint: 1~5 bytes数值越小字节数越少 示例 ┌─────────────────────────────────────┐ │ 数值 │ 传统编码 │ Varint编码 │ │ ─────────┼─────────┼──────────┤ │ 0 │ 4 bytes │ 1 byte │ │ 127 │ 4 bytes │ 1 byte │ │ 128 │ 4 bytes │ 2 bytes │ │ 16383 │ 4 bytes │ 2 bytes │ │ 16384 │ 4 bytes │ 3 bytes │ │ 2147483647│ 4 bytes │ 5 bytes │ └─────────────────────────────────────┘ Kafka 消息的 offsetDelta、timestampDelta 通常很小 → Varint 编码平均节省 30~40% 空间2.3 RecordBatch 结构V2核心// V2 RecordBatch 的 on-disk 格式简化┌─────────────────────────────────────────────────────┐ │RecordBatchHeader(固定49~61bytes)│ │ │ │ baseOffset:int64(基准offset,第一条)│ │ batchLength:int32(batch总长度)│ │ partitionLeaderEpoch:int32(防止HW截断问题)│ │ magic:int8(2)│ │ crc:int32(headerpayload的CRC)│ │ attributes:int16 │ │ • bit0~2:压缩类型(0none,1GZIP,...)│ │ • bit3:timestamp类型(0创建,1追加)│ │ • bit4:事务标记(0非事务,1事务)│ │ lastOffsetDelta:int32(最后一条的offset增量)│ │ baseTimestamp:int64(基准时间戳)│ │ maxTimestamp:int64(本batch最大时间戳)│ │ producerId:int64(幂等性ProducerID)│ │ producerEpoch:int16(Producerepoch)│ │ baseSequence:int32(基准序列号)│ │ recordsCount:int32(本batch消息条数)│ │ deletedCount:int32(已删除消息数,事务用)│ ├─────────────────────────────────────────────────────┤ │RecordPayload(每条消息变长编码)│ │ │ │Record1:│ │ attributes:int8(0无,1控制消息)│ │ timestampDelta:varint(相对baseTimestamp的增量)│ │ offsetDelta:varint(相对baseOffset的增量)│ │ keyLength:varint │ │ key:byte[]│ │ valueLength:varint │ │ value:byte[]│ │ headersCount:varint │ │ headers:[key,value]pairs │ │ │ │Record2:...│ │RecordN:...│ └─────────────────────────────────────────────────────┘三、Log Compaction日志压实—— 只保留最新值3.1 为什么需要 Compaction【日志保留的两种策略】 策略A基于时间/大小的删除默认 ┌─────────────────────────────────────┐ │ Topic: user-clicks用户点击日志 │ │ Key: 用户ID │ │ Value: 点击次数 │ │ │ │ 保留7天 → 7天前的所有消息被删除 │ │ 适合事件日志、审计日志 │ └─────────────────────────────────────┘ 策略BLog Compaction压实 ┌─────────────────────────────────────┐ │ Topic: user-profile用户资料 │ │ Key: 用户ID │ │ Value: 用户资料JSON │ │ │ │ 只保留每个 Key 的最新值 │ │ → 类似数据库里的 latest value │ │ 适合K/V存储、状态存储、CDC │ └─────────────────────────────────────┘3.2 Compaction 算法图解【Log Compaction 执行过程】 压实前Log Segment ┌─────────────────────────────────────┐ │ Offset │ Key │ Value │ Timestamp │ │ ───────┼───────┼───────┼───────────┤ │ 0 │ user1 │ v1 │ T1 │ │ 1 │ user2 │ v1 │ T2 │ │ 2 │ user1 │ v2 │ T3 │ ← user1的最新值 │ 3 │ user3 │ v1 │ T4 │ │ 4 │ user2 │ v2 │ T5 │ ← user2的最新值 │ 5 │ user1 │ v3 │ T6 │ ← user1的再最新值 └─────────────────────────────────────┘ 压实后保留每个Key的最新值 ┌─────────────────────────────────────┐ │ Offset │ Key │ Value │ Timestamp │ │ ───────┼───────┼───────┼───────────┤ │ 3 │ user3 │ v1 │ T4 │ │ 4 │ user2 │ v2 │ T5 │ │ 5 │ user1 │ v3 │ T6 │ └─────────────────────────────────────┘ Offset 0,1,2 被删除user1、user2的老版本 每个 Key 只保留最新的一条消息3.3 Cleaner 线程工作原理【Log Cleaner 线程工作流程】 ┌─────────────────────────────────────────────────────┐ │ Step 1: 选择需要清理的 TopicPartition │ │ │ │ • 从 cleaner-offset-checkpoint 读取上次清理位置 │ │ • 选择脏数据比例最高的分区优先清理 │ │ • 脏数据比例 (日志总大小 - 压实后大小) / 总大小│ └─────────────────┬─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Step 2: 构建 Key → Offset 映射Sketch │ │ │ │ • 读取日志尾部未被清理过的最近日志 │ │ • 为每个 Key 记录最大的 Offset │ │ • 使用高效哈希表类似RoaringBitmap │ │ • 哈希表放入堆外内存避免GC压力 │ └─────────────────┬─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Step 3: 滑动清理Sliding Window │ │ │ │ • 从头读取日志 Segment │ │ • 对每条消息查哈希表 │ │ → 如果 message.offset hashtable[key] │ │ → 这条消息不是最新值标记为删除 │ │ → 否则保留 │ │ • 将保留的消息写入新的 Cleaned Segment │ │ • 用 Cleaned Segment 替换原 Segment │ └─────────────────┬─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Step 4: 更新清理进度 │ │ │ │ • 将本次清理到的 offset 写入 checkpoint 文件 │ │ • 下次从 checkpoint 继续 │ └─────────────────────────────────────────────────────┘# Log Compaction 相关配置 # 开启 CompactionTopic级别配置 kafka-topics.sh --create \ --topic user-profile \ --config cleanup.policycompact \ --bootstrap-server localhost:9092 # 同时保留和压实Kafka 0.10 支持 kafka-topics.sh --alter \ --topic user-profile \ --config cleanup.policycompact,delete \ --bootstrap-server localhost:9092 # Cleaner 线程数默认 1调大可加速压实 log.cleaner.threads2 # 压实触发阈值脏数据比例默认 0.5 50% log.cleaner.min.cleanable.ratio0.5 # 单个 Cleaner 缓冲区大小 log.cleaner.io.buffer.size524288 # 512KB四、基于时间和大小的日志保留4.1 保留策略对比【日志保留策略矩阵】 策略 │ 配置项 │ 触发条件 ────────────────┼─────────────────────────┼────────────────────── 基于时间删除 │ log.retention.hours │ 日志Segment的 │ │ log.retention.ms │ 最后修改时间超过阈值 │ │ (默认 168小时 7天) │ │ ────────────────┼─────────────────────────┼────────────────────── 基于大小删除 │ log.retention.bytes │ 所有Segment总大小 │ │ (默认 -1 不限制) │ 超过阈值时删除最老的 │ │ │ Segment │ ────────────────┼─────────────────────────┼────────────────────── Log Compaction │ cleanup.policycompact │ 每个Key只保留最新值 │ │ │ (不按时间/大小删除) │ ────────────────┼─────────────────────────┼────────────────────── 混合模式 │ cleanup.policy │ 先压实再按时间删除 │ │ compact,delete │ (Compaction保留最新 │ │ │ 过期的最新值也删除) │4.2 保留策略执行流程【基于时间的保留策略执行】 ┌─────────────────────────────────────────────────────┐ │ LogManager 后台线程每 5 分钟触发一次 │ │ │ │ for each Partition: │ │ 1. 获取所有 Segment │ │ 2. 从最老的 Segment 开始检查 │ │ if (now - segment.lastModifiedTime │ │ log.retention.ms): │ │ → 标记该 Segment 为 待删除 │ │ 3. 批量删除被标记的 Segment │ │ 不能删除当前活跃的 Segment │ │ │ │ 注意 │ │ • 时间计算基于 Segment 文件的最后修改时间 │ │ • 不是消息里的时间戳 │ │ • 默认保留 7 天168小时 │ └─────────────────────────────────────────────────────┘五、性能影响与最佳实践5.1 存储参数对性能的影响【存储相关参数调优指南】 参数 │ 默认值 │ 建议值 │ 说明 ────────────────────────────┼──────────┼────────────┼──────────────────── log.segment.bytes │ 1GB │ 512MB~1GB │ Segment太大│ (单个Segment大小上限) │ │ │ Compaction慢│ ────────────────────────────┼──────────┼────────────┼──────────────────── log.roll.hours │ 168h │ 24~72h │ 按时间滚动 │ (Segment按时间滚动) │ │ │ Segment │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.index.interval.bytes │ 4096 │ 4096 │ 每4KB消息 │ (索引条目间隔) │ │ │ 建一条索引 │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.retention.hours │ 168 │ 72~168 │ 保留3~7天 │ (基于时间保留) │ │ │ 视业务而定 │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.retention.bytes │ -1 │ -1 │ 不限制总大小 │ (基于大小保留) │ (不限制) │ 或磁盘80% │ 除非磁盘紧张 │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.cleaner.threads │ 1 │ 2~4 │ 压实加速 │ (Compaction线程数) │ │ │ 仅compact策略用│ ────────────────────────────┼──────────┼────────────┼──────────────────── num.recovery.threads.per.data.dir│ 1 │ 2~4 │ 启动时恢复 │ (每个数据目录的恢复线程数) │ │ │ 加速启动 │5.2 磁盘空间规划# 磁盘空间计算公式所需磁盘空间日均消息量 × 每条消息平均大小 × 保留天数 ×(1 副本数)×1.2示例 日均消息量1亿条/天 每条消息平均大小1KB 保留天数7天 副本数3 冗余系数1.2索引Segment碎片 计算1亿 × 1KB ×7×3×1.2≈2.52TB 建议磁盘配置3TB/每块 ×3块9TB3块盘JBOD本篇小结今天我们深入了Kafka的物理存储层目录布局每个Partition一个目录多log.dirs实现JBOD并行I/O消息格式V2批量共享Header 变长编码节省30~40%空间支持事务和幂等性Log Compaction为每个Key保留最新值适合K/V存储场景Cleaner线程用Sketch滑动窗口高效清理保留策略基于时间默认7天、基于大小、Compaction三者可组合使用核心要点Kafka的存储设计哲学是顺序写、批量清、按需压——充分发挥磁盘顺序写性能用后台线程异步清理用Compaction兼顾存储效率和最新值查询。下一篇我们将聚焦页缓存与零拷贝——这才是Kafka快得离谱的真正秘密武器。上一篇【第67篇】Kafka请求处理机制深度解析——生产请求与获取请求的完整链路下一篇【第69篇】Kafka页缓存与零拷贝——多副本下的高性能读写魔法
【Kafka源码解读和使用指南】第68篇:Kafka物理存储深度解析——分区分配、文件格式、日志清理全解析
上一篇【第67篇】Kafka请求处理机制深度解析——生产请求与获取请求的完整链路下一篇【第69篇】Kafka页缓存与零拷贝——多副本下的高性能读写魔法摘要Kafka的存储层设计是它高性能的根本原因之一。从分区在磁盘上的目录布局到消息格式的版本演进再到Log Compaction的清理算法——每一层设计都经过精心权衡。很多人知道Kafka快但不知道它为什么这么能装、这么能清。本文将深入物理存储的四个核心领域磁盘目录布局、消息格式V2详解、Log Compaction算法、基于时间和大小的保留策略。读完这篇你会对Kafka怎么存、怎么清、怎么又快又省有透彻理解。一、分区在磁盘上的布局——一切从目录开始1.1 目录结构【Kafka 数据目录布局】 $KAFKA_LOG_DIRS可配置多个目录 │ ├── order-events-0/ ← Topic: order-events, Partition: 0 │ ├── 00000000000000000000.log ← 当前活跃Segment的数据文件 │ ├── 00000000000000000000.index ← 对应的偏移量稀疏索引 │ ├── 00000000000000000000.timeindex ← 对应的时间戳稀疏索引 │ ├── 00000000000000001000.log ← 下一个Segmentoffset从1000开始 │ ├── 00000000000000001000.index │ ├── 00000000000000001000.timeindex │ ├── 00000000000000002000.log │ ├── 00000000000000002000.index │ └── ... │ └── leader-epoch-checkpoint ← Leader Epoch持久化文件 │ ├── order-events-1/ │ └── ... │ ├── __consumer_offsets-0/ ← 内部Topic存储offset │ └── ... │ └── cleaner-offset-checkpoint ← Log Compaction进度记录 关键规则 ① 每个 Partition 对应一个目录topic-partition ② 目录分布在 log.dirs 配置的多个磁盘上JBOD模式 ③ 每个 Partition 的日志由多个 Segment 组成1.2 分区分配到磁盘的策略// Kafka 的分区分配磁盘策略Kafka 1.1// 目标均匀分配 Leader 分区到不同磁盘publicclassPartitionedAssignor{// 分配新分区时选择分区数最少的目录publicFileallocatePartition(Stringtopic,intpartition){// 1. 统计每个 log.dir 上该 Topic 的 Leader 分区数MapString,IntegerleaderCountPerDirnewHashMap();for(Stringdir:logDirs){intcountcountLeaderPartitions(dir,topic);leaderCountPerDir.put(dir,count);}// 2. 选择 Leader 分区数最少的目录returnleaderCountPerDir.entrySet().stream().min(Comparator.comparingInt(Map.Entry::getValue)).map(Map.Entry::getKey).orElse(logDirs[0]);}}多个log.dirs的作用不是RAID而是JBODJust a Bunch Of Disks——每个分区只存在一个磁盘上不同分区分散在不同磁盘实现I/O并行。# server.properties log.dirs/data/kafka-logs-1,/data/kafka-logs-2,/data/kafka-logs-3 # 三个目录分别对应三块物理磁盘 # Kafka 会自动将不同 Partition 分散到不同磁盘 # 实现 I/O 并行化二、消息格式V2详解——Kafka 0.11的里程碑2.1 V0/V1/V2 演进史【消息格式版本演进】 V0 (Kafka 0.8~0.10): ┌─────────────────────────────────────┐ │ Record Header: │ │ • crc32 (4 bytes) │ │ • magic (1 byte) 0 │ │ • attributes (1 byte) │ │ • key length key │ │ • value length value │ │ │ │ 问题每条消息都有完整header浪费空间│ └─────────────────────────────────────┘ V1 (Kafka 0.11): ┌─────────────────────────────────────┐ │ 在 V0 基础上增加 │ │ • timestamp (8 bytes) │ │ │ │ 用途支持按时间戳索引和保留 │ └─────────────────────────────────────┘ V2 (Kafka 0.11, 当前主流): ┌─────────────────────────────────────┐ │ RecordBatch Header共享header: │ │ • baseOffset (8 bytes) │ │ • batchLength (4 bytes) │ │ • partitionLeaderEpoch (4 bytes) │ │ • magic (1 byte) 2 │ │ • crc32 (4 bytes) │ │ • attributes (2 bytes) │ │ • lastOffsetDelta (4 bytes) │ │ • baseTimestamp (8 bytes) │ │ • maxTimestamp (8 bytes) │ │ • producerId / producerEpoch │ │ • firstSequence / baseSequence │ │ │ │ Record Payload (变长编码): │ │ • attributes (1 byte, 省空间) │ │ • timestampDelta (变长) │ │ • offsetDelta (变长) │ │ • key length key (变长) │ │ • value length value (变长) │ │ │ │ 优势 │ │ ① 批量共享header → 省空间 │ │ ② 变长整数编码 → 小数字节数更少 │ │ ③ 支持事务和幂等性 │ └─────────────────────────────────────┘2.2 V2格式的变长编码Varint【Varint 编码原理】 传统 int32: 固定 4 bytes V2 Varint: 1~5 bytes数值越小字节数越少 示例 ┌─────────────────────────────────────┐ │ 数值 │ 传统编码 │ Varint编码 │ │ ─────────┼─────────┼──────────┤ │ 0 │ 4 bytes │ 1 byte │ │ 127 │ 4 bytes │ 1 byte │ │ 128 │ 4 bytes │ 2 bytes │ │ 16383 │ 4 bytes │ 2 bytes │ │ 16384 │ 4 bytes │ 3 bytes │ │ 2147483647│ 4 bytes │ 5 bytes │ └─────────────────────────────────────┘ Kafka 消息的 offsetDelta、timestampDelta 通常很小 → Varint 编码平均节省 30~40% 空间2.3 RecordBatch 结构V2核心// V2 RecordBatch 的 on-disk 格式简化┌─────────────────────────────────────────────────────┐ │RecordBatchHeader(固定49~61bytes)│ │ │ │ baseOffset:int64(基准offset,第一条)│ │ batchLength:int32(batch总长度)│ │ partitionLeaderEpoch:int32(防止HW截断问题)│ │ magic:int8(2)│ │ crc:int32(headerpayload的CRC)│ │ attributes:int16 │ │ • bit0~2:压缩类型(0none,1GZIP,...)│ │ • bit3:timestamp类型(0创建,1追加)│ │ • bit4:事务标记(0非事务,1事务)│ │ lastOffsetDelta:int32(最后一条的offset增量)│ │ baseTimestamp:int64(基准时间戳)│ │ maxTimestamp:int64(本batch最大时间戳)│ │ producerId:int64(幂等性ProducerID)│ │ producerEpoch:int16(Producerepoch)│ │ baseSequence:int32(基准序列号)│ │ recordsCount:int32(本batch消息条数)│ │ deletedCount:int32(已删除消息数,事务用)│ ├─────────────────────────────────────────────────────┤ │RecordPayload(每条消息变长编码)│ │ │ │Record1:│ │ attributes:int8(0无,1控制消息)│ │ timestampDelta:varint(相对baseTimestamp的增量)│ │ offsetDelta:varint(相对baseOffset的增量)│ │ keyLength:varint │ │ key:byte[]│ │ valueLength:varint │ │ value:byte[]│ │ headersCount:varint │ │ headers:[key,value]pairs │ │ │ │Record2:...│ │RecordN:...│ └─────────────────────────────────────────────────────┘三、Log Compaction日志压实—— 只保留最新值3.1 为什么需要 Compaction【日志保留的两种策略】 策略A基于时间/大小的删除默认 ┌─────────────────────────────────────┐ │ Topic: user-clicks用户点击日志 │ │ Key: 用户ID │ │ Value: 点击次数 │ │ │ │ 保留7天 → 7天前的所有消息被删除 │ │ 适合事件日志、审计日志 │ └─────────────────────────────────────┘ 策略BLog Compaction压实 ┌─────────────────────────────────────┐ │ Topic: user-profile用户资料 │ │ Key: 用户ID │ │ Value: 用户资料JSON │ │ │ │ 只保留每个 Key 的最新值 │ │ → 类似数据库里的 latest value │ │ 适合K/V存储、状态存储、CDC │ └─────────────────────────────────────┘3.2 Compaction 算法图解【Log Compaction 执行过程】 压实前Log Segment ┌─────────────────────────────────────┐ │ Offset │ Key │ Value │ Timestamp │ │ ───────┼───────┼───────┼───────────┤ │ 0 │ user1 │ v1 │ T1 │ │ 1 │ user2 │ v1 │ T2 │ │ 2 │ user1 │ v2 │ T3 │ ← user1的最新值 │ 3 │ user3 │ v1 │ T4 │ │ 4 │ user2 │ v2 │ T5 │ ← user2的最新值 │ 5 │ user1 │ v3 │ T6 │ ← user1的再最新值 └─────────────────────────────────────┘ 压实后保留每个Key的最新值 ┌─────────────────────────────────────┐ │ Offset │ Key │ Value │ Timestamp │ │ ───────┼───────┼───────┼───────────┤ │ 3 │ user3 │ v1 │ T4 │ │ 4 │ user2 │ v2 │ T5 │ │ 5 │ user1 │ v3 │ T6 │ └─────────────────────────────────────┘ Offset 0,1,2 被删除user1、user2的老版本 每个 Key 只保留最新的一条消息3.3 Cleaner 线程工作原理【Log Cleaner 线程工作流程】 ┌─────────────────────────────────────────────────────┐ │ Step 1: 选择需要清理的 TopicPartition │ │ │ │ • 从 cleaner-offset-checkpoint 读取上次清理位置 │ │ • 选择脏数据比例最高的分区优先清理 │ │ • 脏数据比例 (日志总大小 - 压实后大小) / 总大小│ └─────────────────┬─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Step 2: 构建 Key → Offset 映射Sketch │ │ │ │ • 读取日志尾部未被清理过的最近日志 │ │ • 为每个 Key 记录最大的 Offset │ │ • 使用高效哈希表类似RoaringBitmap │ │ • 哈希表放入堆外内存避免GC压力 │ └─────────────────┬─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Step 3: 滑动清理Sliding Window │ │ │ │ • 从头读取日志 Segment │ │ • 对每条消息查哈希表 │ │ → 如果 message.offset hashtable[key] │ │ → 这条消息不是最新值标记为删除 │ │ → 否则保留 │ │ • 将保留的消息写入新的 Cleaned Segment │ │ • 用 Cleaned Segment 替换原 Segment │ └─────────────────┬─────────────────────────────────┘ │ ▼ ┌─────────────────────────────────────────────────────┐ │ Step 4: 更新清理进度 │ │ │ │ • 将本次清理到的 offset 写入 checkpoint 文件 │ │ • 下次从 checkpoint 继续 │ └─────────────────────────────────────────────────────┘# Log Compaction 相关配置 # 开启 CompactionTopic级别配置 kafka-topics.sh --create \ --topic user-profile \ --config cleanup.policycompact \ --bootstrap-server localhost:9092 # 同时保留和压实Kafka 0.10 支持 kafka-topics.sh --alter \ --topic user-profile \ --config cleanup.policycompact,delete \ --bootstrap-server localhost:9092 # Cleaner 线程数默认 1调大可加速压实 log.cleaner.threads2 # 压实触发阈值脏数据比例默认 0.5 50% log.cleaner.min.cleanable.ratio0.5 # 单个 Cleaner 缓冲区大小 log.cleaner.io.buffer.size524288 # 512KB四、基于时间和大小的日志保留4.1 保留策略对比【日志保留策略矩阵】 策略 │ 配置项 │ 触发条件 ────────────────┼─────────────────────────┼────────────────────── 基于时间删除 │ log.retention.hours │ 日志Segment的 │ │ log.retention.ms │ 最后修改时间超过阈值 │ │ (默认 168小时 7天) │ │ ────────────────┼─────────────────────────┼────────────────────── 基于大小删除 │ log.retention.bytes │ 所有Segment总大小 │ │ (默认 -1 不限制) │ 超过阈值时删除最老的 │ │ │ Segment │ ────────────────┼─────────────────────────┼────────────────────── Log Compaction │ cleanup.policycompact │ 每个Key只保留最新值 │ │ │ (不按时间/大小删除) │ ────────────────┼─────────────────────────┼────────────────────── 混合模式 │ cleanup.policy │ 先压实再按时间删除 │ │ compact,delete │ (Compaction保留最新 │ │ │ 过期的最新值也删除) │4.2 保留策略执行流程【基于时间的保留策略执行】 ┌─────────────────────────────────────────────────────┐ │ LogManager 后台线程每 5 分钟触发一次 │ │ │ │ for each Partition: │ │ 1. 获取所有 Segment │ │ 2. 从最老的 Segment 开始检查 │ │ if (now - segment.lastModifiedTime │ │ log.retention.ms): │ │ → 标记该 Segment 为 待删除 │ │ 3. 批量删除被标记的 Segment │ │ 不能删除当前活跃的 Segment │ │ │ │ 注意 │ │ • 时间计算基于 Segment 文件的最后修改时间 │ │ • 不是消息里的时间戳 │ │ • 默认保留 7 天168小时 │ └─────────────────────────────────────────────────────┘五、性能影响与最佳实践5.1 存储参数对性能的影响【存储相关参数调优指南】 参数 │ 默认值 │ 建议值 │ 说明 ────────────────────────────┼──────────┼────────────┼──────────────────── log.segment.bytes │ 1GB │ 512MB~1GB │ Segment太大│ (单个Segment大小上限) │ │ │ Compaction慢│ ────────────────────────────┼──────────┼────────────┼──────────────────── log.roll.hours │ 168h │ 24~72h │ 按时间滚动 │ (Segment按时间滚动) │ │ │ Segment │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.index.interval.bytes │ 4096 │ 4096 │ 每4KB消息 │ (索引条目间隔) │ │ │ 建一条索引 │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.retention.hours │ 168 │ 72~168 │ 保留3~7天 │ (基于时间保留) │ │ │ 视业务而定 │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.retention.bytes │ -1 │ -1 │ 不限制总大小 │ (基于大小保留) │ (不限制) │ 或磁盘80% │ 除非磁盘紧张 │ ────────────────────────────┼──────────┼────────────┼──────────────────── log.cleaner.threads │ 1 │ 2~4 │ 压实加速 │ (Compaction线程数) │ │ │ 仅compact策略用│ ────────────────────────────┼──────────┼────────────┼──────────────────── num.recovery.threads.per.data.dir│ 1 │ 2~4 │ 启动时恢复 │ (每个数据目录的恢复线程数) │ │ │ 加速启动 │5.2 磁盘空间规划# 磁盘空间计算公式所需磁盘空间日均消息量 × 每条消息平均大小 × 保留天数 ×(1 副本数)×1.2示例 日均消息量1亿条/天 每条消息平均大小1KB 保留天数7天 副本数3 冗余系数1.2索引Segment碎片 计算1亿 × 1KB ×7×3×1.2≈2.52TB 建议磁盘配置3TB/每块 ×3块9TB3块盘JBOD本篇小结今天我们深入了Kafka的物理存储层目录布局每个Partition一个目录多log.dirs实现JBOD并行I/O消息格式V2批量共享Header 变长编码节省30~40%空间支持事务和幂等性Log Compaction为每个Key保留最新值适合K/V存储场景Cleaner线程用Sketch滑动窗口高效清理保留策略基于时间默认7天、基于大小、Compaction三者可组合使用核心要点Kafka的存储设计哲学是顺序写、批量清、按需压——充分发挥磁盘顺序写性能用后台线程异步清理用Compaction兼顾存储效率和最新值查询。下一篇我们将聚焦页缓存与零拷贝——这才是Kafka快得离谱的真正秘密武器。上一篇【第67篇】Kafka请求处理机制深度解析——生产请求与获取请求的完整链路下一篇【第69篇】Kafka页缓存与零拷贝——多副本下的高性能读写魔法