上一篇【第42篇】Kafka日志存储源码解析一——消息是怎么被写入磁盘的下一篇【第44篇】Kafka日志存储源码解析三——OffsetIndex稀疏索引的秘密武器摘要上篇我们了解了消息是如何被顺序写入磁盘的。本文将深入LogSegment——Kafka日志存储的第二层核心抽象。每个Partition在磁盘上被切分成多个Segment每个Segment包含一对文件.log日志文件和.index偏移量索引文件。这种分段设计让Kafka能够在保持高性能的同时实现高效的日志清理、数据查找和崩溃恢复。本文将详解LogSegment的源码实现、Segment的滚动机制以及为什么Kafka选择这样的分段策略。一、为什么需要Segment分段在深入源码之前先理解为什么Kafka要把一个Partition的日志切分成多个Segment1.1 单文件的困境【如果只用单个日志文件】 partition-0/ └── 00000000000000000000.log (500GB!) 问题 ① 文件太大无法全部mmap到内存 ② 清理旧数据时只能遍历整个文件效率极低 ③ 崩溃恢复需要扫描整个文件 ④ 索引文件也会无限增大1.2 Segment分段的解法【Kafka的分段方案】 partition-0/ ├── 00000000000000000000.log (1GB) ├── 00000000000000000000.index ├── 00000000000001048576.log (1GB) ├── 00000000000001048576.index ├── 00000000000002097152.log (1GB) ├── 00000000000002097152.index └── ... 优势 ✓ 每个Segment大小可控可以mmap到内存 ✓ 清理旧数据只需删除整个Segment文件 ✓ 崩溃恢复只需扫描最新的几个Segment ✓ 索引文件大小也可控二、LogSegment核心源码解析2.1 类结构一览// LogSegment.scala (核心字段简化版)classLogSegment(vallog:FileMessageSet,// 对应的日志文件valindex:OffsetIndex,// 对应的偏移量索引valtimeIndex:TimeIndex,// 时间戳索引0.10valbaseOffset:Long,// 此Segment的起始offsetvalindexIntervalBytes:Int,// 两个索引项之间的最小字节间隔valbytesSinceLastIndexEntry:Int0){// 自上次建索引以来的字节数// 核心方法defappend(firstOffset:Long,messages:ByteBufferMessageSet):Unitdefread(startOffset:Long,maxOffset:Option[Long],maxSize:Int,maxPosition:LongInt.MaxValue):FetchDataInfodefrecover(maxMessageSize:Int):IntdeftruncateTo(offset:Long):IntdefreadNextOffset:Long}字段类型说明logFileMessageSet封装了.log文件的读写操作indexOffsetIndex封装了.index偏移量索引文件timeIndexTimeIndex封装了.timeindex时间戳索引文件0.10baseOffsetLong此Segment的起始offset也是文件名indexIntervalBytesInt每隔多少字节建一个索引项默认4096bytesSinceLastIndexEntryInt自上次建索引后累计写入的字节数2.2 append()——向Segment追加消息这是消息写入的核心路径每收到一批消息就会调用此方法// LogSegment.append() 源码解析defappend(firstOffset:Long,messages:ByteBufferMessageSet):Unit{// ① 检查当前Segment是否已有数据且第一条消息的offset必须连续if(log.sizeInBytes0){// 校验offset的连续性防止数据损坏require(lastOffset0firstOffsetlastOffset1,Attempt to append out of order messages)}// ② 判断是否需要在写入消息前先建立索引// 规则如果距上次建索引已写入超过 indexIntervalBytes 字节则建索引if(bytesSinceLastIndexEntryindexIntervalBytes){index.append(firstOffset,log.sizeInBytes)timeIndex.maybeAppend(messages.maxTimestamp,firstOffset)bytesSinceLastIndexEntry0// 重置计数器}// ③ 将消息追加到.log文件调用FileMessageSet.appendlog.append(messages)// ④ 更新字节计数器bytesSinceLastIndexEntrymessages.sizeInBytes// ⑤ 更新最大时间戳用于时间戳索引if(messages.maxTimestampmaxTimestamp)maxTimestampmessages.maxTimestamp}【append() 执行流程图】 调用 append(firstOffset100, messages) │ ▼ ┌──────────────────────────────────────────────┐ │ ① 校验offset连续性 │ │ lastOffset99, firstOffset100 ✓ │ └──────────────┬───────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ ② 判断是否需要建索引 │ │ bytesSinceLastIndexEntry5000 │ │ indexIntervalBytes4096 │ │ 5000 4096 → 需要建索引 ✓ │ │ → index.append(100, currentPosition) │ │ → bytesSinceLastIndexEntry 0 │ └──────────────┬───────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ ③ 追加到.log文件 │ │ log.append(messages) │ │ → FileChannel.write() │ └──────────────┬───────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ ④ 更新计数器 │ │ bytesSinceLastIndexEntry messages.size │ └──────────────────────────────────────────────┘2.3 read()——从Segment读取消息消费者拉取消息时最终会调用此方法// LogSegment.read() 源码解析defread(startOffset:Long,maxOffset:Option[Long],maxSize:Int,maxPosition:Long):FetchDataInfo{// ① 通过OffsetIndex查找startOffset对应的物理位置valstartPositionindex.lookup(startOffset)// startPosition OffsetPosition(physicalOffset, offset)// ② 从物理位置开始读取消息valfetchInfolog.read(startPosition.position,maxSize)// ③ 封装返回结果FetchDataInfo(offsetMetadata,fetchInfo)}三、Segment滚动机制——何时切新文件Segment滚动Rolling是指当前活跃的Segment写满了需要创建一个新的Segment来接收后续消息。3.1 滚动触发条件Kafka在每次append之后都会检查是否需要滚动触发条件有三个【Segment滚动触发条件】 条件1: 当前Segment大小 ≥ log.segment.bytes (默认1GB) │ ▼ 条件2: 当前Segment创建时间 ≥ log.roll.ms (默认7天) │ ▼ 条件3: 索引文件已满无法再写入新的索引项 │ ▼ 满足以上任一条件 → 触发滚动创建新的activeSegment3.2 滚动过程源码// Log.scala - maybeRoll() 方法简化版defmaybeRoll(messagesSize:Int):LogSegment{valsegmentactiveSegment// 当前活跃的Segment// 检查三个滚动条件if(segment.sizeconfig.segmentSize-messagesSize||segment.timeWaitedconfig.segmentMs||segment.index.isFull){// 执行滚动roll()}else{segment// 不需要滚动返回当前Segment}}defroll():LogSegment{// ① 获取新的baseOffset即当前LEOvalnewOffsetlogEndOffset// ② 创建新的.log和.index文件valnewSegmentnewLogSegment(FileMessageSet(newOffset,...),OffsetIndex(newOffset,...),...)// ③ 将新Segment加入segments映射表segments.put(newOffset,newSegment)// ④ 返回新的activeSegmentnewSegment}【Segment滚动时序图】 Producer发送消息 │ ▼ Log.append() │ ▼ Log.maybeRoll() │ ├── 检查条件1: size 1GB? │ ├── 检查条件2: age 7天? │ └── 检查条件3: index isFull? │ ▼ (任一条件满足) ┌──────────────────────────────────┐ │ 创建新Segment │ │ - 新文件: 00000...N.log │ │ - 新索引: 00000...N.index │ │ - baseOffset 当前LEO │ └──────────────┬───────────────────┘ │ ▼ 后续消息写入新的activeSegment四、recover()——崩溃恢复时重建索引当Broker崩溃重启后Kafka需要重建内存中的索引数据因为索引文件可能不是最新的。这就是recover()方法的作用。4.1 为什么需要recover【崩溃恢复场景】 正常情况 写入消息 → 更新.log文件 → 更新.index文件内存mmap → 定期msync()刷到磁盘 崩溃时 .log文件已写入Page Cache可能没有刷盘但大部分还在 .index文件可能没有完全刷盘 → 需要重新扫描.log重建.index4.2 recover()源码解析// LogSegment.recover() 源码解析defrecover(maxMessageSize:Int):Int{varvalidBytes0valiterlog.iterator(maxMessageSize)// 遍历.log文件中的所有消息try{while(iter.hasNext){valentryiter.next()valmessageentry.message// 校验消息完整性CRC32、size等message.ensureValid()// 判断是否需要为此消息建立索引if(validBytes-lastIndexEntryindexIntervalBytes){// 建索引记录此offset对应的物理位置index.append(entry.offset,validBytes)timeIndex.maybeAppend(message.timestamp,entry.offset)lastIndexEntryvalidBytes}validBytesmessage.sizeInBytes// 累计有效字节数}}catch{casee:InvalidMessageException// 遇到损坏的消息停止扫描后续消息也不可信}// 截断.log文件到validBytes位置丢弃损坏部分log.truncateTo(validBytes)index.trimToValidSize()validBytes// 返回有效字节数}五、分段设计的精妙之处5.1 稀疏索引 分段 高效查找【查找offset1030的消息】 步骤1: 在segments中二分查找 → 找到baseOffset0的Segment因为0 ≤ 1030 1024? 不对... → 找到baseOffset1024的Segment 步骤2: 在OffsetIndex中二分查找 → 找到offset ≤ 1030的最大索引项 → 假设找到: offset1028, position56000 步骤3: 从position56000开始顺序扫描 → 读取每条消息的offset直到找到offset10305.2 分段 vs 不分段的对比维度单文件方案Kafka分段方案文件大小可能无限增长每个Segment ≤ 1GB查找效率O(N) 全扫描O(log S) 稀疏索引清理效率需要compact整个文件直接删除整个旧Segment崩溃恢复扫描整个文件只扫描最新几个Segmentmmap支持大文件无法mmap每个Segment可mmap到内存六、核心参数总结参数默认值说明log.segment.bytes1073741824 (1GB)单个Segment的最大字节数log.roll.ms604800000 (7天)Segment滚动的时间间隔log.index.interval.bytes4096每隔多少字节建立一个索引项log.index.size.max.bytes10485760 (10MB)索引文件的最大字节数本篇小结本文深入解析了Kafka LogSegment的源码实现分段设计将Partition切分成多个Segment每个Segment包含.log.index两个文件append()追加消息时根据indexIntervalBytes间隔建立稀疏索引read()读取消息时先通过OffsetIndex查找物理位置再顺序扫描滚动机制满足大小/时间/索引满三个条件之一时创建新的activeSegmentrecover()崩溃恢复时重建索引保证索引与日志的一致性下一篇文章我们将深入OffsetIndex的源码实现看看Kafka的稀疏索引是如何在内存中使用mmap加速查找的。上一篇【第42篇】Kafka日志存储源码解析一——消息是怎么被写入磁盘的下一篇【第44篇】Kafka日志存储源码解析三——OffsetIndex稀疏索引的秘密武器
【Kafka源码解读和使用指南】第43篇:Kafka日志存储源码解析(二)——Segment分段存储的精妙设计
上一篇【第42篇】Kafka日志存储源码解析一——消息是怎么被写入磁盘的下一篇【第44篇】Kafka日志存储源码解析三——OffsetIndex稀疏索引的秘密武器摘要上篇我们了解了消息是如何被顺序写入磁盘的。本文将深入LogSegment——Kafka日志存储的第二层核心抽象。每个Partition在磁盘上被切分成多个Segment每个Segment包含一对文件.log日志文件和.index偏移量索引文件。这种分段设计让Kafka能够在保持高性能的同时实现高效的日志清理、数据查找和崩溃恢复。本文将详解LogSegment的源码实现、Segment的滚动机制以及为什么Kafka选择这样的分段策略。一、为什么需要Segment分段在深入源码之前先理解为什么Kafka要把一个Partition的日志切分成多个Segment1.1 单文件的困境【如果只用单个日志文件】 partition-0/ └── 00000000000000000000.log (500GB!) 问题 ① 文件太大无法全部mmap到内存 ② 清理旧数据时只能遍历整个文件效率极低 ③ 崩溃恢复需要扫描整个文件 ④ 索引文件也会无限增大1.2 Segment分段的解法【Kafka的分段方案】 partition-0/ ├── 00000000000000000000.log (1GB) ├── 00000000000000000000.index ├── 00000000000001048576.log (1GB) ├── 00000000000001048576.index ├── 00000000000002097152.log (1GB) ├── 00000000000002097152.index └── ... 优势 ✓ 每个Segment大小可控可以mmap到内存 ✓ 清理旧数据只需删除整个Segment文件 ✓ 崩溃恢复只需扫描最新的几个Segment ✓ 索引文件大小也可控二、LogSegment核心源码解析2.1 类结构一览// LogSegment.scala (核心字段简化版)classLogSegment(vallog:FileMessageSet,// 对应的日志文件valindex:OffsetIndex,// 对应的偏移量索引valtimeIndex:TimeIndex,// 时间戳索引0.10valbaseOffset:Long,// 此Segment的起始offsetvalindexIntervalBytes:Int,// 两个索引项之间的最小字节间隔valbytesSinceLastIndexEntry:Int0){// 自上次建索引以来的字节数// 核心方法defappend(firstOffset:Long,messages:ByteBufferMessageSet):Unitdefread(startOffset:Long,maxOffset:Option[Long],maxSize:Int,maxPosition:LongInt.MaxValue):FetchDataInfodefrecover(maxMessageSize:Int):IntdeftruncateTo(offset:Long):IntdefreadNextOffset:Long}字段类型说明logFileMessageSet封装了.log文件的读写操作indexOffsetIndex封装了.index偏移量索引文件timeIndexTimeIndex封装了.timeindex时间戳索引文件0.10baseOffsetLong此Segment的起始offset也是文件名indexIntervalBytesInt每隔多少字节建一个索引项默认4096bytesSinceLastIndexEntryInt自上次建索引后累计写入的字节数2.2 append()——向Segment追加消息这是消息写入的核心路径每收到一批消息就会调用此方法// LogSegment.append() 源码解析defappend(firstOffset:Long,messages:ByteBufferMessageSet):Unit{// ① 检查当前Segment是否已有数据且第一条消息的offset必须连续if(log.sizeInBytes0){// 校验offset的连续性防止数据损坏require(lastOffset0firstOffsetlastOffset1,Attempt to append out of order messages)}// ② 判断是否需要在写入消息前先建立索引// 规则如果距上次建索引已写入超过 indexIntervalBytes 字节则建索引if(bytesSinceLastIndexEntryindexIntervalBytes){index.append(firstOffset,log.sizeInBytes)timeIndex.maybeAppend(messages.maxTimestamp,firstOffset)bytesSinceLastIndexEntry0// 重置计数器}// ③ 将消息追加到.log文件调用FileMessageSet.appendlog.append(messages)// ④ 更新字节计数器bytesSinceLastIndexEntrymessages.sizeInBytes// ⑤ 更新最大时间戳用于时间戳索引if(messages.maxTimestampmaxTimestamp)maxTimestampmessages.maxTimestamp}【append() 执行流程图】 调用 append(firstOffset100, messages) │ ▼ ┌──────────────────────────────────────────────┐ │ ① 校验offset连续性 │ │ lastOffset99, firstOffset100 ✓ │ └──────────────┬───────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ ② 判断是否需要建索引 │ │ bytesSinceLastIndexEntry5000 │ │ indexIntervalBytes4096 │ │ 5000 4096 → 需要建索引 ✓ │ │ → index.append(100, currentPosition) │ │ → bytesSinceLastIndexEntry 0 │ └──────────────┬───────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ ③ 追加到.log文件 │ │ log.append(messages) │ │ → FileChannel.write() │ └──────────────┬───────────────────────────┘ │ ▼ ┌──────────────────────────────────────────────┐ │ ④ 更新计数器 │ │ bytesSinceLastIndexEntry messages.size │ └──────────────────────────────────────────────┘2.3 read()——从Segment读取消息消费者拉取消息时最终会调用此方法// LogSegment.read() 源码解析defread(startOffset:Long,maxOffset:Option[Long],maxSize:Int,maxPosition:Long):FetchDataInfo{// ① 通过OffsetIndex查找startOffset对应的物理位置valstartPositionindex.lookup(startOffset)// startPosition OffsetPosition(physicalOffset, offset)// ② 从物理位置开始读取消息valfetchInfolog.read(startPosition.position,maxSize)// ③ 封装返回结果FetchDataInfo(offsetMetadata,fetchInfo)}三、Segment滚动机制——何时切新文件Segment滚动Rolling是指当前活跃的Segment写满了需要创建一个新的Segment来接收后续消息。3.1 滚动触发条件Kafka在每次append之后都会检查是否需要滚动触发条件有三个【Segment滚动触发条件】 条件1: 当前Segment大小 ≥ log.segment.bytes (默认1GB) │ ▼ 条件2: 当前Segment创建时间 ≥ log.roll.ms (默认7天) │ ▼ 条件3: 索引文件已满无法再写入新的索引项 │ ▼ 满足以上任一条件 → 触发滚动创建新的activeSegment3.2 滚动过程源码// Log.scala - maybeRoll() 方法简化版defmaybeRoll(messagesSize:Int):LogSegment{valsegmentactiveSegment// 当前活跃的Segment// 检查三个滚动条件if(segment.sizeconfig.segmentSize-messagesSize||segment.timeWaitedconfig.segmentMs||segment.index.isFull){// 执行滚动roll()}else{segment// 不需要滚动返回当前Segment}}defroll():LogSegment{// ① 获取新的baseOffset即当前LEOvalnewOffsetlogEndOffset// ② 创建新的.log和.index文件valnewSegmentnewLogSegment(FileMessageSet(newOffset,...),OffsetIndex(newOffset,...),...)// ③ 将新Segment加入segments映射表segments.put(newOffset,newSegment)// ④ 返回新的activeSegmentnewSegment}【Segment滚动时序图】 Producer发送消息 │ ▼ Log.append() │ ▼ Log.maybeRoll() │ ├── 检查条件1: size 1GB? │ ├── 检查条件2: age 7天? │ └── 检查条件3: index isFull? │ ▼ (任一条件满足) ┌──────────────────────────────────┐ │ 创建新Segment │ │ - 新文件: 00000...N.log │ │ - 新索引: 00000...N.index │ │ - baseOffset 当前LEO │ └──────────────┬───────────────────┘ │ ▼ 后续消息写入新的activeSegment四、recover()——崩溃恢复时重建索引当Broker崩溃重启后Kafka需要重建内存中的索引数据因为索引文件可能不是最新的。这就是recover()方法的作用。4.1 为什么需要recover【崩溃恢复场景】 正常情况 写入消息 → 更新.log文件 → 更新.index文件内存mmap → 定期msync()刷到磁盘 崩溃时 .log文件已写入Page Cache可能没有刷盘但大部分还在 .index文件可能没有完全刷盘 → 需要重新扫描.log重建.index4.2 recover()源码解析// LogSegment.recover() 源码解析defrecover(maxMessageSize:Int):Int{varvalidBytes0valiterlog.iterator(maxMessageSize)// 遍历.log文件中的所有消息try{while(iter.hasNext){valentryiter.next()valmessageentry.message// 校验消息完整性CRC32、size等message.ensureValid()// 判断是否需要为此消息建立索引if(validBytes-lastIndexEntryindexIntervalBytes){// 建索引记录此offset对应的物理位置index.append(entry.offset,validBytes)timeIndex.maybeAppend(message.timestamp,entry.offset)lastIndexEntryvalidBytes}validBytesmessage.sizeInBytes// 累计有效字节数}}catch{casee:InvalidMessageException// 遇到损坏的消息停止扫描后续消息也不可信}// 截断.log文件到validBytes位置丢弃损坏部分log.truncateTo(validBytes)index.trimToValidSize()validBytes// 返回有效字节数}五、分段设计的精妙之处5.1 稀疏索引 分段 高效查找【查找offset1030的消息】 步骤1: 在segments中二分查找 → 找到baseOffset0的Segment因为0 ≤ 1030 1024? 不对... → 找到baseOffset1024的Segment 步骤2: 在OffsetIndex中二分查找 → 找到offset ≤ 1030的最大索引项 → 假设找到: offset1028, position56000 步骤3: 从position56000开始顺序扫描 → 读取每条消息的offset直到找到offset10305.2 分段 vs 不分段的对比维度单文件方案Kafka分段方案文件大小可能无限增长每个Segment ≤ 1GB查找效率O(N) 全扫描O(log S) 稀疏索引清理效率需要compact整个文件直接删除整个旧Segment崩溃恢复扫描整个文件只扫描最新几个Segmentmmap支持大文件无法mmap每个Segment可mmap到内存六、核心参数总结参数默认值说明log.segment.bytes1073741824 (1GB)单个Segment的最大字节数log.roll.ms604800000 (7天)Segment滚动的时间间隔log.index.interval.bytes4096每隔多少字节建立一个索引项log.index.size.max.bytes10485760 (10MB)索引文件的最大字节数本篇小结本文深入解析了Kafka LogSegment的源码实现分段设计将Partition切分成多个Segment每个Segment包含.log.index两个文件append()追加消息时根据indexIntervalBytes间隔建立稀疏索引read()读取消息时先通过OffsetIndex查找物理位置再顺序扫描滚动机制满足大小/时间/索引满三个条件之一时创建新的activeSegmentrecover()崩溃恢复时重建索引保证索引与日志的一致性下一篇文章我们将深入OffsetIndex的源码实现看看Kafka的稀疏索引是如何在内存中使用mmap加速查找的。上一篇【第42篇】Kafka日志存储源码解析一——消息是怎么被写入磁盘的下一篇【第44篇】Kafka日志存储源码解析三——OffsetIndex稀疏索引的秘密武器