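Kafka Source Code Analysis (2): Log
The previous article covered how LogSegment and Log are initialized. This one looks at the main operations of Log.
Broadly, the common operations on Log fall into four groups:
- High watermark management
- Log segment management
- Key offset management
- Read and write operations
Key offset management mainly covers the Log Start Offset and the LEO (log end offset).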
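High watermark (HighWatermark)
Initializing the high watermark
The high watermark is defined through the LogOffsetMetadata class:
    @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
The initial value passed in is logStartOffset, so when the high watermark is first constructed it is set to the Log Start Offset.
Now look at the LogOffsetMetadata class: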
    case class LogOffsetMetadata(messageOffset: Long,
                                 segmentBaseOffset: Long = Log.UnknownOffset,
                                 relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {

      // check if this offset is already on an older segment compared with the given offset
      def onOlderSegment(that: LogOffsetMetadata): Boolean = {
        if (messageOffsetOnly)
          throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

        this.segmentBaseOffset < that.segmentBaseOffset
      }
      ...
    }
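LogOffsetMetadata has three fields:
messageOffset: the message offset;
segmentBaseOffset: the base offset of the segment that holds this message offset, used to tell whether two messages sit in the same segment;
relativePositionInSegment: the physical (byte) position of this message offset within its segment file;
As onOlderSegment above shows, to decide which segment is older it is enough to compare segmentBaseOffset.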
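To make the comparison concrete, here is a minimal, self-contained sketch. OffsetMeta is a hypothetical stand-in for LogOffsetMetadata, and the two -1 sentinels are assumptions standing in for Log.UnknownOffset and LogOffsetMetadata.UnknownFilePosition; it is not the Kafka class itself.

```scala
object OffsetMetadataSketch {
  // Assumed sentinel values standing in for Log.UnknownOffset and
  // LogOffsetMetadata.UnknownFilePosition.
  val UnknownOffset: Long = -1L
  val UnknownFilePosition: Int = -1

  // Hypothetical simplified version of LogOffsetMetadata.
  final case class OffsetMeta(messageOffset: Long,
                              segmentBaseOffset: Long = UnknownOffset,
                              relativePositionInSegment: Int = UnknownFilePosition) {

    // True when only the message offset is known and no segment information is attached.
    def messageOffsetOnly: Boolean =
      segmentBaseOffset == UnknownOffset && relativePositionInSegment == UnknownFilePosition

    // Segments are identified by their base offset, so a smaller base offset means an older segment.
    def onOlderSegment(that: OffsetMeta): Boolean = {
      require(!messageOffsetOnly, s"$this has no segment info to compare with $that")
      segmentBaseOffset < that.segmentBaseOffset
    }
  }
}
```

With this sketch, OffsetMeta(100L, 0L, 4096) is on an older segment than OffsetMeta(100L, 100L, 0) even though both carry the same message offset.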
Setting and updating the high watermark
    private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
      // The high watermark can never be negative
      if (newHighWatermark.messageOffset < 0)
        throw new IllegalArgumentException("High watermark offset should be non-negative")

      lock synchronized { // monitor lock guarding modifications of the Log object
        highWatermarkMetadata = newHighWatermark // assign the new high watermark
        // transaction-related, ignored for now
        producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset)
        // transaction-related, ignored for now
        maybeIncrementFirstUnstableOffset()
      }
      trace(s"Setting high watermark $newHighWatermark")
    }
Setting the high watermark is simple: check that the new value is not negative, then take the lock and assign it.
Two methods update the high watermark: updateHighWatermark and maybeIncrementHighWatermark. We look at each in turn.
updateHighWatermark
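    def updateHighWatermark(hw: Long): Long = {
      // Clamp the incoming value: below logStartOffset it becomes logStartOffset,
      // above logEndOffset it becomes logEndOffset
      val newHighWatermark = if (hw < logStartOffset)
        logStartOffset
      else if (hw > logEndOffset)
        logEndOffset
      else
        hw
      // Wrap newHighWatermark in a LogOffsetMetadata and store it as the high watermark
      updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
      // Return the new high watermark
      newHighWatermark
    }
The logic is again straightforward. The high watermark can never exceed the LEO or fall below logStartOffset, so the incoming hw is clamped into that range and then stored through the setter shown above.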
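The clamping rule can be tried in isolation. The following is a small sketch under the assumption that the three offsets are plain Long values; clampHighWatermark is a hypothetical helper name, not a Kafka method.

```scala
object ClampSketch {
  // Clamp a proposed high watermark into the range [logStartOffset, logEndOffset].
  def clampHighWatermark(hw: Long, logStartOffset: Long, logEndOffset: Long): Long =
    if (hw < logStartOffset) logStartOffset
    else if (hw > logEndOffset) logEndOffset
    else hw
}
```

For a log with logStartOffset 10 and LEO 20, a proposed value of 5 becomes 10, a proposed value of 25 becomes 20, and 15 is kept unchanged.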
maybeIncrementHighWatermark
    /**
     * Update the high watermark to a new value if and only if it is larger than the old value. It is
     * an error to update to a value which is larger than the log end offset.
     *
     * This method is intended to be used by the leader to update the high watermark after follower
     * fetch offsets have been updated.
     *
     * @return the old high watermark, if updated by the new value
     */
    // Update only when the new high watermark is larger than the old one; a value beyond the LEO is an error.
    // The leader calls this after follower fetch offsets have been updated.
    def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
      // A new high watermark beyond the LEO is an error
      if (newHighWatermark.messageOffset > logEndOffset)
        throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
          s"log end offset $logEndOffsetMetadata")

      lock.synchronized {
        // Read the old high watermark
        val oldHighWatermark = fetchHighWatermarkMetadata

        // Ensure that the high watermark increases monotonically. We also update the high watermark when the new
        // offset metadata is on a newer segment, which occurs whenever the log is rolled to a new segment.
        // Update only when the new value is larger (the high watermark must increase monotonically),
        // or when the two offsets are equal but the new metadata is on a newer segment
        if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
          (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
          updateHighWatermarkMetadata(newHighWatermark)
          Some(oldHighWatermark) // return the old high watermark
        } else {
          None
        }
      }
    }
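I kept the method's English doc comment and added my own notes to the code. The logic is clear, and the comments should be enough to follow it.
The main difference between the two methods: updateHighWatermark is mainly used by a Follower replica to update its high watermark after fetching messages from the Leader, while maybeIncrementHighWatermark is mainly used to update the high watermark of the Leader replica.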
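The monotonic update rule of maybeIncrementHighWatermark can be sketched on its own. This is a simplified illustration, not the Kafka code: Meta and HighWatermark are hypothetical types, the LEO is passed in as a parameter, and the transaction-related callbacks are left out.

```scala
object MonotonicHwSketch {
  // Hypothetical simplified metadata: a message offset plus the base offset of its segment.
  final case class Meta(messageOffset: Long, segmentBaseOffset: Long)

  final class HighWatermark(initial: Meta) {
    private val lock = new Object
    @volatile private var current: Meta = initial

    def get: Meta = current

    // Returns Some(old value) when the high watermark was replaced, None when the update was ignored.
    def maybeIncrement(candidate: Meta, logEndOffset: Long): Option[Meta] = {
      if (candidate.messageOffset > logEndOffset)
        throw new IllegalArgumentException(
          s"High watermark $candidate exceeds log end offset $logEndOffset")
      lock.synchronized {
        val old = current
        // Case 1: the offset strictly advances.
        val advances = old.messageOffset < candidate.messageOffset
        // Case 2: same offset, but the candidate sits on a newer segment (after a roll).
        val sameOffsetNewerSegment =
          old.messageOffset == candidate.messageOffset &&
            old.segmentBaseOffset < candidate.segmentBaseOffset
        if (advances || sameOffsetNewerSegment) {
          current = candidate
          Some(old)
        } else None
      }
    }
  }
}
```

Returning the old value in an Option lets the caller tell "updated" apart from "ignored" without a second read of the field.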
maybeIncrementHighWatermark reads the current high watermark by calling fetchHighWatermarkMetadata, so let's look at that method next:
fetchHighWatermarkMetadata
    private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
      // Make sure the log has not been closed while reading
      checkIfMemoryMappedBufferClosed()

      val offsetMetadata = highWatermarkMetadata
      if (offsetMetadata.messageOffsetOnly) { // the full high watermark metadata is not available yet
        lock.synchronized {
          // Read the log to recover the full high watermark metadata
          val fullOffset = convertToOffsetMetadataOrThrow(highWatermark)
          updateHighWatermarkMetadata(fullOffset)
          fullOffset
        }
      } else {
        offsetMetadata
      }
    }

    private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
      // Look up the given offset in the log to obtain its full metadata
      val fetchDataInfo = read(offset,
        maxLength = 1,
        isolation = FetchLogEnd,
        minOneMessage = false)
      fetchDataInfo.fetchOffsetMetadata
    }
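Since convertToOffsetMetadataOrThrow relies on it, let's look ahead at the log's read method and see how data is read for a given offset:
Log segment operations
Reading the log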
read
    def read(startOffset: Long,
             maxLength: Int,
             isolation: FetchIsolation,
             minOneMessage: Boolean): FetchDataInfo = {
      maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
        trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

        // convertToOffsetMetadataOrThrow passes FetchLogEnd, so this is false on that path
        val includeAbortedTxns = isolation == FetchTxnCommitted

        // Reads take no lock, so snapshot the current nextOffsetMetadata into a local variable
        val endOffsetMetadata = nextOffsetMetadata
        val endOffset = endOffsetMetadata.messageOffset
        // Find the segment with the greatest base offset that is <= startOffset
        var segmentEntry = segments.floorEntry(startOffset)

        // return error on attempt to read beyond the log end offset or read below log start offset
        // Error cases:
        // 1. startOffset is beyond the log end offset;
        // 2. no segment was found for startOffset;
        // 3. startOffset is below logStartOffset
        if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
          // (the exception message is missing from the source text; the wording below is a reconstruction)
          throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
            s"but we only have log segments in the range $logStartOffset to $endOffset.")

        // Pick the upper bound of the read according to the isolation level
        val maxOffsetMetadata = isolation match {
          case FetchLogEnd => endOffsetMetadata
          case FetchHighWatermark => fetchHighWatermarkMetadata
          case FetchTxnCommitted => fetchLastStableOffsetMetadata
        }

        // If startOffset equals the upper bound, return an empty result right away
        if (startOffset == maxOffsetMetadata.messageOffset) {
          return emptyFetchDataInfo(maxOffsetMetadata, includeAbortedTxns)
        // If startOffset is beyond the upper bound, return an empty record set: nothing can be read
        } else if (startOffset > maxOffsetMetadata.messageOffset) {
          val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
          return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
        }

        // Walk the segments until something is read or the end of the log is reached
        while (segmentEntry != null) {
          val segment = segmentEntry.getValue

          // Largest physical position this read may reach within the segment
          val maxPosition = {
            if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
              maxOffsetMetadata.relativePositionInSegment
            } else {
              segment.size
            }
          }

          // Read from the segment, starting at startOffset
          val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
          // Nothing found here: move on to the next segment with a higher base offset
          if (fetchInfo == null) {
            segmentEntry = segments.higherEntry(segmentEntry.getKey)
          } else {
            return if (includeAbortedTxns)
              addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
            else
              fetchInfo
          }
        }

        // No segment produced any data, possibly because every message above the requested
        // offset has been deleted; return an empty result
        FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
      }
    }
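The read method takes four parameters:
- startOffset: the offset to start reading from.
- maxLength: the maximum amount of data to read, in bytes.
- isolation: the isolation level, mostly relevant to Kafka transactions.
- minOneMessage: whether at least one message must be returned. If a message is larger than maxLength, read would normally never return it; with this parameter set to true, read guarantees that at least one message comes back.
The code uses segments to look up a log segment by offset:
    private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]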
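The two map lookups that read depends on, floorEntry and higherEntry, are plain java.util.concurrent.ConcurrentSkipListMap calls. Here is a small sketch with segment names as hypothetical stand-ins for LogSegment objects; the base offsets 0, 100 and 250 are made up for illustration.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object SegmentLookupSketch {
  // Keys are segment base offsets; values are hypothetical names instead of LogSegment objects.
  val segments = new ConcurrentSkipListMap[java.lang.Long, String]()
  segments.put(Long.box(0L), "segment-0")
  segments.put(Long.box(100L), "segment-100")
  segments.put(Long.box(250L), "segment-250")

  // The segment that should contain `offset`: the entry with the greatest base offset <= offset.
  def segmentFor(offset: Long): Option[String] =
    Option(segments.floorEntry(Long.box(offset))).map(_.getValue)

  // The segment that follows the one with the given base offset, if any.
  def nextSegment(baseOffset: Long): Option[String] =
    Option(segments.higherEntry(Long.box(baseOffset))).map(_.getValue)
}
```

Here segmentFor(150L) yields Some("segment-100"), and nextSegment(250L) yields None because 250 is the last base offset. Both Java methods return null when no entry matches, which is why the sketch wraps them in Option.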
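Here is what the read method does, step by step:
- Because no lock is taken, snapshot the current nextOffsetMetadata into a local variable and use it as the LEO;
- Look in the segment map for the segment whose base offset is less than or equal to startOffset;
- Check the error cases:
  - startOffset is beyond the LEO;
  - no segment in the map has a base offset less than or equal to startOffset;
  - startOffset is below the Log Start Offset;
- Determine the upper bound of the read (maxOffsetMetadata) from the isolation level: the LEO, the high watermark, or the last stable offset;
- If startOffset equals that upper bound, return an empty result;
- If startOffset is beyond that upper bound, return an empty record set, since nothing can be read;
- Walk the segments until something is read or the end of the log is reached:
  - first work out the largest position that may be read in the segment;
  - read from the segment at the given offset (this segment-level read method was covered in the previous article);
  - if nothing is found, move on to the next segment in the map;
- If no segment yields anything, possibly because every message above the requested offset has been deleted, return an empty result;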
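The steps above can be sketched with an in-memory model. This is a deliberately simplified illustration: Segment and MiniLog are hypothetical types, limits are counted in records rather than bytes, the upper bound is passed in directly instead of being derived from an isolation level, and IllegalArgumentException stands in for Kafka's out-of-range error.

```scala
import java.util.concurrent.ConcurrentSkipListMap

object ReadSketch {
  // Hypothetical in-memory segment: a base offset plus the offsets of the records it still holds.
  final case class Segment(baseOffset: Long, offsets: Vector[Long]) {
    // Records at or after startOffset and strictly below maxOffset, at most maxRecords of them.
    def read(startOffset: Long, maxOffset: Long, maxRecords: Int): Option[Vector[Long]] = {
      val found = offsets.filter(o => o >= startOffset && o < maxOffset).take(maxRecords)
      if (found.isEmpty) None else Some(found)
    }
  }

  final class MiniLog(val logStartOffset: Long, val logEndOffset: Long, segs: Seq[Segment]) {
    private val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]()
    segs.foreach(s => segments.put(Long.box(s.baseOffset), s))

    def read(startOffset: Long, maxOffset: Long, maxRecords: Int): Vector[Long] = {
      // Segment with the greatest base offset <= startOffset.
      var entry = segments.floorEntry(Long.box(startOffset))
      // Error cases: beyond the LEO, no matching segment, or below the log start offset.
      if (startOffset > logEndOffset || entry == null || startOffset < logStartOffset)
        throw new IllegalArgumentException(s"offset $startOffset is out of range")
      // At or beyond the upper bound there is nothing to read.
      if (startOffset >= maxOffset) return Vector.empty
      // Walk forward until a segment yields records or the segments run out.
      while (entry != null) {
        entry.getValue.read(startOffset, maxOffset, maxRecords) match {
          case Some(records) => return records
          case None => entry = segments.higherEntry(entry.getKey)
        }
      }
      Vector.empty
    }
  }
}
```

If the first segment holds offsets 0 to 4 and the second starts at 10, a read from offset 5 finds nothing in the first segment and falls through to the second, which is the "move on to the next segment" step.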
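The read operation above uses segments to locate log data. Among the operations on segments, we focus on deletion.
Deleting log segments
The entry point for deletion is deleteOldSegments: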
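    // If the delete policy is enabled (config.delete), delete segments that have expired by time
    // and segments that push the log beyond its configured retention size.
    // Whether or not the delete policy is enabled, delete segments that are before the log start offset.
    def deleteOldSegments(): Int = {
      if (config.delete) {
        deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
      } else {
        deleteLogStartOffsetBreachedSegments()
      }
    }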
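deleteOldSegments checks whether the delete policy is enabled. If it is, three methods are called:
deleteRetentionMsBreachedSegments deletes segments whose largest timestamp is older than the configured retention time;
deleteRetentionSizeBreachedSegments deletes the oldest segments for as long as the log exceeds the configured retention size;
deleteLogStartOffsetBreachedSegments deletes segments that lie entirely below logStartOffset, that is, segments whose next segment has a baseOffset less than or equal to logStartOffset. If the delete policy is disabled, only this third method runs.
Here is how the three methods are implemented: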
    private def deleteRetentionMsBreachedSegments(): Int = {
      if (config.retentionMs < 0) return 0
      val startMs = time.milliseconds
      deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
        reason = s"retention time ${config.retentionMs}ms breach")
    }

    private def deleteRetentionSizeBreachedSegments(): Int = {
      if (config.retentionSize < 0 || size < config.retentionSize) return 0
      var diff = size - config.retentionSize
      def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) = {
        if (diff - segment.size >= 0) {
          diff -= segment.size
          true
        } else {
          false
        }
      }

      deleteOldSegments(shouldDelete, reason = s"retention size in bytes ${config.retentionSize} breach")
    }

    private def deleteLogStartOffsetBreachedSegments(): Int = {
      // shouldDelete checks whether the next segment's baseOffset is <= logStartOffset,
      // i.e. whether the current segment lies entirely below logStartOffset
      def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]) =
        nextSegmentOpt.exists(_.baseOffset <= logStartOffset)

      deleteOldSegments(shouldDelete, reason = s"log start offset $logStartOffset breach")
    }
This style is flexible: each method supplies a different predicate function, and all of them reuse the same deleteOldSegments overload to do the actual deletion.
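The pattern of passing a predicate into one shared routine can be shown in a few lines. This is a sketch under simplifying assumptions: Segment is a hypothetical case class, the segment list is already ordered oldest-first, and the high watermark check of the real code is left out.

```scala
object RetentionSketch {
  // Hypothetical simplified segment.
  final case class Segment(baseOffset: Long, sizeInBytes: Long, largestTimestamp: Long)

  // Same shape as in the article: (segment, next segment if any) => may this segment be deleted?
  type Predicate = (Segment, Option[Segment]) => Boolean

  // Shared routine: take segments oldest-first for as long as the predicate accepts them.
  def deletable(segments: List[Segment], predicate: Predicate): List[Segment] = {
    val nexts: List[Option[Segment]] = segments.drop(1).map(Option(_)) :+ None
    segments.zip(nexts).takeWhile { case (seg, next) => predicate(seg, next) }.map(_._1)
  }

  // Time-based retention: the segment's newest record is older than retentionMs.
  def timeBreached(nowMs: Long, retentionMs: Long): Predicate =
    (seg, _) => nowMs - seg.largestTimestamp > retentionMs

  // Size-based retention: a stateful closure that spends the excess bytes segment by segment.
  def sizeBreached(totalSize: Long, retentionSize: Long): Predicate = {
    var diff = totalSize - retentionSize
    (seg, _) =>
      if (diff - seg.sizeInBytes >= 0) { diff -= seg.sizeInBytes; true } else false
  }

  // Log start offset: the whole segment lies below logStartOffset.
  def startOffsetBreached(logStartOffset: Long): Predicate =
    (_, next) => next.exists(_.baseOffset <= logStartOffset)
}
```

Note that the size predicate carries mutable state (diff), so a fresh predicate has to be created for every deletion pass, just as the article's method declares var diff on each call.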
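Now let's look at that deleteOldSegments overload:
deleteOldSegments
This deleteOldSegments takes different parameters from the entry-point method above: a predicate function that decides which segments may be deleted, and a reason string that records why they are deleted.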
    private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
      // Delete every segment matched by the predicate
      lock synchronized {
        val deletable = deletableSegments(predicate)
        if (deletable.nonEmpty)
          info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
        deleteSegments(deletable)
      }
    }
This method calls two others: deletableSegments collects the segments that may be deleted, and deleteSegments deletes them.
deletableSegments
    private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
      // No segments at all: return right away
      if (segments.isEmpty) {
        Seq.empty
      } else {
        val deletable = ArrayBuffer.empty[LogSegment]
        // Start from the first (oldest) segment
        var segmentEntry = segments.firstEntry
        while (segmentEntry != null) {
          val segment = segmentEntry.getValue
          // Look up the next segment
          val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
          val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) = if (nextSegmentEntry != null)
            (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
          else
            (null, logEndOffset, segment.size == 0)

          // Take the segment only if the high watermark has reached its upper bound offset
          // (the next segment's base offset, or the LEO for the last segment), the predicate
          // accepts it, and it is not an empty last segment; then move on to the next segment
          if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
            deletable += segment
            segmentEntry = nextSegmentEntry
          } else {
            segmentEntry = null
          }
        }
        deletable
      }
    }
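The logic of this method is clear. It does the following:
- If the segment map is empty, return an empty collection right away;
- Otherwise iterate starting from the first segment in the map;
- Decide whether the current segment may be deleted:
  - whether the high watermark has reached the segment's upper bound offset (the next segment's baseOffset, or the LEO for the last segment);
  - whether the segment passes the predicate function;
  - whether the segment is the last one and empty (such a segment is never deleted);
- Add each qualifying segment to deletable, stop at the first segment that fails the checks, and return the collection.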
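The walk described above, including the high watermark guard and the "last and empty" exception, can be sketched over an immutable list. Segment is a hypothetical type and the list is assumed to be ordered by base offset; the real method iterates the skip list map instead.

```scala
import scala.annotation.tailrec

object DeletableWalkSketch {
  // Hypothetical simplified segment.
  final case class Segment(baseOffset: Long, sizeInBytes: Long)

  def deletableSegments(segments: List[Segment],
                        highWatermark: Long,
                        logEndOffset: Long,
                        predicate: (Segment, Option[Segment]) => Boolean): List[Segment] = {
    @tailrec
    def loop(remaining: List[Segment], acc: List[Segment]): List[Segment] = remaining match {
      case Nil => acc.reverse
      case segment :: rest =>
        val next = rest.headOption
        // Upper bound of this segment: the next base offset, or the LEO for the last segment.
        val upperBoundOffset = next.map(_.baseOffset).getOrElse(logEndOffset)
        val isLastSegmentAndEmpty = next.isEmpty && segment.sizeInBytes == 0
        if (highWatermark >= upperBoundOffset && predicate(segment, next) && !isLastSegmentAndEmpty)
          loop(rest, segment :: acc)
        else
          acc.reverse // stop at the first segment that fails any check
    }
    loop(segments, Nil)
  }
}
```

With segments at base offsets 0, 100 and 200 and a high watermark of 150, only the first segment qualifies even under an always-true predicate, because the second segment's upper bound (200) has not been reached by the high watermark.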
The deleteSegments function is called next:
    private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
      maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
        val numToDelete = deletable.size
        if (numToDelete > 0) {
          // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
          // At least one segment must always exist, so when every segment is about to be deleted,
          // call roll() first to create a fresh active segment
          if (segments.size == numToDelete)
            roll()
          lock synchronized {
            // Make sure the Log object has not been closed
            checkIfMemoryMappedBufferClosed()
            // remove the segments for lookups
            // Remove the given segments and delete their underlying files
            removeAndDeleteSegments(deletable, asyncDelete = true)
            // Try to advance the Log Start Offset
            maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
          }
        }
        numToDelete
      }
    }
Writing to the log
There are two main methods for writing to the log:
appendAsLeader
    def appendAsLeader(records: MemoryRecords,
                       leaderEpoch: Int,
                       isFromClient: Boolean = true,
                       interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
      append(records, isFromClient, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
    }
appendAsFollower
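    def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
      append(records,
        isFromClient = false,
        interBrokerProtocolVersion = ApiVersion.latestVersion,
        assignOffsets = false,
        leaderEpoch = -1)
    }
appendAsLeader writes to the Leader replica and lets the log assign offsets (assignOffsets = true); appendAsFollower is used when a Follower replica syncs from the Leader and keeps the offsets it is given (assignOffsets = false). Both delegate to the append method.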
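The difference made by assignOffsets can be illustrated with a toy log that tracks only the LEO. MiniLog is a hypothetical class; records are reduced to a count (leader path) or a list of offsets (follower path), and IllegalStateException stands in for Kafka's offset-related exceptions.

```scala
object AppendOffsetsSketch {
  final class MiniLog {
    private var nextOffset: Long = 0L // plays the role of the LEO

    def logEndOffset: Long = synchronized { nextOffset }

    // Leader path: the log assigns consecutive offsets starting at the current LEO.
    // Returns (firstOffset, lastOffset) of the appended records.
    def appendAsLeader(recordCount: Int): (Long, Long) = synchronized {
      require(recordCount > 0, "nothing to append")
      val first = nextOffset
      val last = first + recordCount - 1
      nextOffset = last + 1 // the LEO is always the last offset + 1
      (first, last)
    }

    // Follower path: offsets were assigned by the leader and are only checked here.
    def appendAsFollower(offsets: Seq[Long]): (Long, Long) = synchronized {
      require(offsets.nonEmpty, "nothing to append")
      val monotonic = offsets.zip(offsets.drop(1)).forall { case (a, b) => a < b }
      if (!monotonic)
        throw new IllegalStateException(s"Out of order offsets: $offsets")
      if (offsets.head < nextOffset)
        throw new IllegalStateException(
          s"First offset ${offsets.head} is less than the next offset $nextOffset")
      nextOffset = offsets.last + 1
      (offsets.head, offsets.last)
    }
  }
}
```

On the follower path gaps are allowed (for example offsets 3, 4, 7), which is why the check is "strictly increasing and not below the LEO" rather than "consecutive".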
append
    private def append(records: MemoryRecords,
                       isFromClient: Boolean,
                       interBrokerProtocolVersion: ApiVersion,
                       assignOffsets: Boolean,
                       leaderEpoch: Int): LogAppendInfo = {
      maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
        // Step 1: analyze and validate the records to append, and return the validation result
        val appendInfo = analyzeAndValidateRecords(records, isFromClient = isFromClient)

        // return if we have no valid messages or if this is a duplicate of the last appended entry
        // Nothing to write at all: return immediately
        if (appendInfo.shallowCount == 0)
          return appendInfo

        // trim any invalid bytes or partial messages before appending it to the on-disk log
        // Step 2: normalize the records, i.e. drop invalid bytes or partial messages
        var validRecords = trimInvalidBytes(records, appendInfo)

        // they are valid, insert them in the log
        lock synchronized {
          // Make sure the Log object has not been closed
          checkIfMemoryMappedBufferClosed()
          // Offsets need to be assigned
          if (assignOffsets) {
            // assign offsets to the message set
            // Step 3: use the current LEO (nextOffsetMetadata) as the offset of the first message to append
            val offset = new LongRef(nextOffsetMetadata.messageOffset)
            appendInfo.firstOffset = Some(offset.value)
            val now = time.milliseconds
            val validateAndOffsetAssignResult = try {
              LogValidator.validateMessagesAndAssignOffsets(validRecords,
                topicPartition,
                offset,
                time,
                now,
                appendInfo.sourceCodec,
                appendInfo.targetCodec,
                config.compact,
                config.messageFormatVersion.recordVersion.value,
                config.messageTimestampType,
                config.messageTimestampDifferenceMaxMs,
                leaderEpoch,
                isFromClient,
                interBrokerProtocolVersion,
                brokerTopicStats)
            } catch {
              case e: IOException =>
                throw new KafkaException(s"Error validating messages while appending to log $name", e)
            }
            // Update the validation result object (LogAppendInfo)
            validRecords = validateAndOffsetAssignResult.validatedRecords
            appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
            appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
            appendInfo.lastOffset = offset.value - 1
            appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
            if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
              appendInfo.logAppendTime = now

            // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
            // format conversion)
            // Step 4: re-check that no batch exceeds the size limit
            if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
              for (batch <- validRecords.batches.asScala) {
                if (batch.sizeInBytes > config.maxMessageSize) {
                  // we record the original message set size instead of the trimmed size
                  // to be consistent with pre-compression bytesRejectedRate recording
                  brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                  brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                  throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                    s"partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
                }
              }
            }
          // Use the offsets as given; nothing to assign
          } else {
            // we are taking the offsets we are given
            if (!appendInfo.offsetsMonotonic) // offsets must increase monotonically
              throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                records.records.asScala.map(_.offset))

            if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
              val firstOffset = appendInfo.firstOffset match {
                case Some(offset) => offset
                case None => records.batches.asScala.head.baseOffset()
              }

              val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
              throw new UnexpectedAppendOffsetException(
                s"Unexpected offset in append to $topicPartition. $firstOrLast " +
                s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
                s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
                s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
                firstOffset, appendInfo.lastOffset)
            }
          }

          // update the epoch cache with the epoch stamped onto the message by the leader
          // Step 5: update the leader epoch cache
          validRecords.batches.asScala.foreach { batch =>
            if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
              maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
            } else {
              // In partial upgrade scenarios, we may get a temporary regression to the message format. In
              // order to ensure the safety of leader election, we clear the epoch cache so that we revert
              // to truncation by high watermark after the next leader election.
              leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
                warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
                cache.clearAndFlush()
              }
            }
          }

          // check messages set size may be exceed config.segmentSize
          // Step 6: make sure the record set does not exceed the configured segment size
          if (validRecords.sizeInBytes > config.segmentSize) {
            throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
              s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
          }

          // maybe roll the log if this segment is full
          // Step 7: roll the log if needed. The active segment may not have room for the new
          // records, in which case a new segment is created to hold them.
          // The log is rolled when:
          //   the segment is full;
          //   the segment's roll time, measured from its first message, has elapsed;
          //   the index is full
          val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)

          val logOffsetMetadata = LogOffsetMetadata(
            messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
            segmentBaseOffset = segment.baseOffset,
            relativePositionInSegment = segment.size)

          // now that we have valid records, offsets assigned, and timestamps updated, we need to
          // validate the idempotent/transactional state of the producers and collect some metadata
          // Step 8: validate the producer (idempotence/transaction) state
          val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
            logOffsetMetadata, validRecords, isFromClient)

          maybeDuplicate.foreach { duplicate =>
            appendInfo.firstOffset = Some(duplicate.firstOffset)
            appendInfo.lastOffset = duplicate.lastOffset
            appendInfo.logAppendTime = duplicate.timestamp
            appendInfo.logStartOffset = logStartOffset
            return appendInfo
          }

          // Step 9: perform the actual write, mainly by calling append on the segment object
          segment.append(largestOffset = appendInfo.lastOffset,
            largestTimestamp = appendInfo.maxTimestamp,
            shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
            records = validRecords)

          // Increment the log end offset. We do this immediately after the append because a
          // write to the transaction index below may fail and we want to ensure that the offsets
          // of future appends still grow monotonically. The resulting transaction index inconsistency
          // will be cleaned up after the log directory is recovered. Note that the end offset of the
          // ProducerStateManager will not be updated and the last stable offset will not advance
          // if the append to the transaction index fails.
          // Step 10: update the LEO, which is the offset of the last appended message + 1.
          // As noted earlier, the LEO always points at the next message, which does not exist yet
          updateLogEndOffset(appendInfo.lastOffset + 1)

          // update the producer state
          // Step 11: update the producer/transaction state
          for (producerAppendInfo <- updatedProducers.values) {
            producerStateManager.update(producerAppendInfo)
          }

          // update the transaction index with the true last stable offset. The last offset visible
          // to consumers using READ_COMMITTED will be limited by this value and the high watermark.
          for (completedTxn <- completedTxns) {
            // ... (the loop body and the statements up to the flush check are missing from the source text)
          }

          // (the left-hand operand is missing from the source text; unflushedMessages is a reconstruction)
          if (unflushedMessages >= config.flushInterval)
            flush()

          // Step 12: return the append result
          appendInfo
        }
      }
    }
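The main steps are the ones numbered in the comments of the code above: validate the records, trim invalid bytes, assign offsets (Leader only), re-check batch sizes, update the leader epoch cache, check the record set against the segment size, roll the segment if needed, validate producer state, append to the segment, update the LEO, update the producer and transaction state, and return the result.
Next, let's see how analyzeAndValidateRecords validates the messages: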
analyzeAndValidateRecords
    private def analyzeAndValidateRecords(records: MemoryRecords, isFromClient: Boolean): LogAppendInfo = {
      var shallowMessageCount = 0
      var validBytesCount = 0
      var firstOffset: Option[Long] = None
      var lastOffset = -1L
      var sourceCodec: CompressionCodec = NoCompressionCodec
      var monotonic = true
      var maxTimestamp = RecordBatch.NO_TIMESTAMP
      var offsetOfMaxTimestamp = -1L
      var readFirstMessage = false
      var lastOffsetOfFirstBatch = -1L

      for (batch <- records.batches.asScala) {
        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2 && isFromClient && batch.baseOffset != 0)
          throw new InvalidRecordException(s"The baseOffset of the record batch in the append to $topicPartition should " +
            s"be 0, but it is ${batch.baseOffset}")

        // update the first offset if on the first message. For magic versions older than 2, we use the last offset
        // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
        // For magic version 2, we can get the first offset directly from the batch header.
        // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
        // case, validation will be more lenient.
        // Also indicate whether we have the accurate first offset or not
        if (!readFirstMessage) {
          if (batch.magic >= RecordBatch.MAGIC_VALUE_V2)
            firstOffset = Some(batch.baseOffset) // update firstOffset
          lastOffsetOfFirstBatch = batch.lastOffset // update lastOffsetOfFirstBatch
          readFirstMessage = true
        }

        // check that offsets are monotonically increasing
        // If the lastOffset seen so far is not smaller than this batch's lastOffset, an earlier
        // batch holds a message with a larger offset than this one, which breaks monotonicity
        if (lastOffset >= batch.lastOffset)
          monotonic = false

        // update the last offset seen
        // Set lastOffset to the offset of the last message in the current batch
        lastOffset = batch.lastOffset

        // Check if the message sizes are valid.
        val batchSize = batch.sizeInBytes
        // Check whether the batch exceeds the size limit, i.e. the max.message.bytes setting
        // (topic-level; its broker-level default is message.max.bytes)
        if (batchSize > config.maxMessageSize) {
          brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
          brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
          throw new RecordTooLargeException(s"The record batch size in the append to $topicPartition is $batchSize bytes " +
            s"which exceeds the maximum configured value of ${config.maxMessageSize}.")
        }

        // check the validity of the message by checking CRC
        // Validate the batch, including its format and CRC
        if (!batch.isValid) {
          brokerTopicStats.allTopicsStats.invalidMessageCrcRecordsPerSec.mark()
          throw new CorruptRecordException(s"Record is corrupt (stored crc = ${batch.checksum()}) in topic partition $topicPartition.")
        }

        // Update maxTimestamp and offsetOfMaxTimestamp
        if (batch.maxTimestamp > maxTimestamp) {
          maxTimestamp = batch.maxTimestamp
          offsetOfMaxTimestamp = lastOffset
        }

        // Count the batch (shallowMessageCount) and add its size to the valid byte count
        shallowMessageCount += 1
        validBytesCount += batchSize

        // Read the compression codec from the batch
        val messageCodec = CompressionCodec.getCompressionCodec(batch.compressionType.id)
        if (messageCodec != NoCompressionCodec)
          sourceCodec = messageCodec
      }

      // Apply broker-side compression if any
      // Get the compression codec configured on the broker side, i.e. the compression.type setting.
      // Its default is producer, meaning targetCodec is whatever codec sourceCodec uses
      val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
      // Finally build the LogAppendInfo object and return it
      LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, RecordBatch.NO_TIMESTAMP, logStartOffset,
        RecordConversionStats.EMPTY, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic, lastOffsetOfFirstBatch)
    }
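The bookkeeping in the loop above (monotonicity, last offset, maximum timestamp, counts) can be reproduced without any Kafka types. In this sketch Batch and Summary are hypothetical case classes, -1 is assumed as the "no timestamp" sentinel, IllegalArgumentException stands in for RecordTooLargeException, and the CRC and compression handling are left out.

```scala
object AnalyzeSketch {
  // Hypothetical simplified record batch.
  final case class Batch(baseOffset: Long, lastOffset: Long, maxTimestamp: Long, sizeInBytes: Int)

  // Hypothetical subset of what LogAppendInfo carries.
  final case class Summary(lastOffset: Long,
                           maxTimestamp: Long,
                           offsetOfMaxTimestamp: Long,
                           batchCount: Int,
                           validBytes: Int,
                           offsetsMonotonic: Boolean)

  def analyze(batches: Seq[Batch], maxBatchSize: Int): Summary = {
    var lastOffset = -1L
    var maxTimestamp = -1L // assumed "no timestamp" sentinel
    var offsetOfMaxTimestamp = -1L
    var batchCount = 0
    var validBytes = 0
    var monotonic = true

    for (batch <- batches) {
      // A batch whose last offset does not exceed the previous one breaks monotonicity.
      if (lastOffset >= batch.lastOffset) monotonic = false
      lastOffset = batch.lastOffset

      // Reject oversized batches.
      if (batch.sizeInBytes > maxBatchSize)
        throw new IllegalArgumentException(
          s"Batch of ${batch.sizeInBytes} bytes exceeds the limit of $maxBatchSize bytes")

      // Track the largest timestamp and the offset at which it was seen.
      if (batch.maxTimestamp > maxTimestamp) {
        maxTimestamp = batch.maxTimestamp
        offsetOfMaxTimestamp = lastOffset
      }

      batchCount += 1
      validBytes += batch.sizeInBytes
    }
    Summary(lastOffset, maxTimestamp, offsetOfMaxTimestamp, batchCount, validBytes, monotonic)
  }
}
```

The monotonic flag is only recorded here, not enforced; as in the article's append method, it is the caller that decides to reject out-of-order offsets on the follower path.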