Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MINOR: Add log identifier/prefix printing in Log layer static functions #10742

Merged
merged 4 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 31 additions & 22 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -557,7 +557,7 @@ class Log(@volatile private var _dir: File,
}

private def initializeLeaderEpochCache(): Unit = lock synchronized {
leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion)
leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, recordVersion, logIdent)
}

private def updateLogEndOffset(offset: Long): Unit = {
Expand Down Expand Up @@ -592,7 +592,7 @@ class Log(@volatile private var _dir: File,
producerStateManager: ProducerStateManager): Unit = lock synchronized {
checkIfMemoryMappedBufferClosed()
Log.rebuildProducerState(producerStateManager, segments, logStartOffset, lastOffset, recordVersion, time,
reloadFromCleanShutdown = false)
reloadFromCleanShutdown = false, logIdent)
}

def activeProducers: Seq[DescribeProducersResponseData.ProducerState] = {
Expand Down Expand Up @@ -1888,14 +1888,14 @@ class Log(@volatile private var _dir: File,

private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
Log.deleteSegmentFiles(segments, asyncDelete, deleteProducerStateSnapshots, dir, topicPartition,
config, scheduler, logDirFailureChannel, producerStateManager)
config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}

private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {
lock synchronized {
checkIfMemoryMappedBufferClosed()
Log.replaceSegments(segments, newSegments, oldSegments, isRecoveredSwapFile, dir, topicPartition,
config, scheduler, logDirFailureChannel, producerStateManager)
config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}
}

Expand Down Expand Up @@ -1937,7 +1937,7 @@ class Log(@volatile private var _dir: File,
}

private[log] def splitOverflowedSegment(segment: LogSegment): List[LogSegment] = lock synchronized {
Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
Log.splitOverflowedSegment(segment, segments, dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, this.logIdent)
}

}
Expand Down Expand Up @@ -2005,7 +2005,7 @@ object Log extends Logging {
Files.createDirectories(dir.toPath)
val topicPartition = Log.parseTopicPartitionName(dir)
val segments = new LogSegments(topicPartition)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion)
val leaderEpochCache = Log.maybeCreateLeaderEpochCache(dir, topicPartition, logDirFailureChannel, config.messageFormatVersion.recordVersion, this.logIdent)
kowshik marked this conversation as resolved.
Show resolved Hide resolved
val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
val offsets = LogLoader.load(LoadLogParams(
dir,
Expand Down Expand Up @@ -2225,13 +2225,15 @@ object Log extends Logging {
* @param dir The directory in which the log will reside
* @param topicPartition The topic partition
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param recordVersion The record version
* @param recordVersion The record
kowshik marked this conversation as resolved.
Show resolved Hide resolved
* @param logPrefix Logging prefix
* @return The new LeaderEpochFileCache instance (if created), none otherwise
*/
def maybeCreateLeaderEpochCache(dir: File,
topicPartition: TopicPartition,
logDirFailureChannel: LogDirFailureChannel,
recordVersion: RecordVersion): Option[LeaderEpochFileCache] = {
recordVersion: RecordVersion,
logPrefix: String): Option[LeaderEpochFileCache] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add the default logPrefix parameter value to empty string? So all the tests don't need to update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introducing a default value can lead to programming error, because we could forget to pass it when it is really needed to be passed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, good to me.

val leaderEpochFile = LeaderEpochCheckpointFile.newFile(dir)

def newLeaderEpochFileCache(): LeaderEpochFileCache = {
Expand All @@ -2246,7 +2248,7 @@ object Log extends Logging {
None

if (currentCache.exists(_.nonEmpty))
warn(s"Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")
warn(s"${logPrefix}Deleting non-empty leader epoch cache due to incompatible message format $recordVersion")

Files.deleteIfExists(leaderEpochFile.toPath)
None
Expand Down Expand Up @@ -2293,6 +2295,7 @@ object Log extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
* with the existingSegments
* @param logPrefix Logging prefix
*/
private[log] def replaceSegments(existingSegments: LogSegments,
newSegments: Seq[LogSegment],
Expand All @@ -2303,7 +2306,8 @@ object Log extends Logging {
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
producerStateManager: ProducerStateManager): Unit = {
producerStateManager: ProducerStateManager,
logPrefix: String): Unit = {
val sortedNewSegments = newSegments.sortBy(_.baseOffset)
// Some old segments may have been removed from index and scheduled for async deletion after the caller reads segments
// but before this method is executed. We want to filter out those segments to avoid calling asyncDeleteSegment()
Expand Down Expand Up @@ -2332,7 +2336,8 @@ object Log extends Logging {
config,
scheduler,
logDirFailureChannel,
producerStateManager)
producerStateManager,
logPrefix)
kowshik marked this conversation as resolved.
Show resolved Hide resolved
}
// okay we are safe now, remove the swap suffix
sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
Expand All @@ -2359,7 +2364,7 @@ object Log extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
* with the existingSegments
*
* @param logPrefix Logging prefix
* @throws IOException if the file can't be renamed and still exists
*/
private[log] def deleteSegmentFiles(segmentsToDelete: Iterable[LogSegment],
Expand All @@ -2370,11 +2375,12 @@ object Log extends Logging {
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
producerStateManager: ProducerStateManager): Unit = {
producerStateManager: ProducerStateManager,
logPrefix: String): Unit = {
segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))

def deleteSegments(): Unit = {
info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
info(s"${logPrefix}Deleting segment files ${segmentsToDelete.mkString(",")}")
val parentDir = dir.getParent
maybeHandleIOException(logDirFailureChannel, parentDir, s"Error while deleting segments for $topicPartition in dir $parentDir") {
segmentsToDelete.foreach { segment =>
Expand Down Expand Up @@ -2436,7 +2442,8 @@ object Log extends Logging {
lastOffset: Long,
recordVersion: RecordVersion,
time: Time,
reloadFromCleanShutdown: Boolean): Unit = {
reloadFromCleanShutdown: Boolean,
logPrefix: String): Unit = {
val allSegments = segments.values
val offsetsToSnapshot =
if (allSegments.nonEmpty) {
Expand All @@ -2445,7 +2452,7 @@ object Log extends Logging {
} else {
Seq(Some(lastOffset))
}
info(s"Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")
info(s"${logPrefix}Loading producer state till offset $lastOffset with message format version ${recordVersion.value}")

// We want to avoid unnecessary scanning of the log to build the producer state when the broker is being
// upgraded. The basic idea is to use the absence of producer snapshot files to detect the upgrade case,
Expand All @@ -2469,7 +2476,7 @@ object Log extends Logging {
producerStateManager.takeSnapshot()
}
} else {
info(s"Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
info(s"${logPrefix}Reloading from producer snapshot and rebuilding producer state from offset $lastOffset")
val isEmptyBeforeTruncation = producerStateManager.isEmpty && producerStateManager.mapEndOffset >= lastOffset
val producerStateLoadStart = time.milliseconds()
producerStateManager.truncateAndReload(logStartOffset, lastOffset, time.milliseconds())
Expand Down Expand Up @@ -2508,7 +2515,7 @@ object Log extends Logging {
}
producerStateManager.updateMapEndOffset(lastOffset)
producerStateManager.takeSnapshot()
info(s"Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
info(s"${logPrefix}Producer state recovery took ${segmentRecoveryStart - producerStateLoadStart}ms for snapshot load " +
s"and ${time.milliseconds() - segmentRecoveryStart}ms for segment recovery from offset $lastOffset")
}
}
Expand All @@ -2535,6 +2542,7 @@ object Log extends Logging {
* @param logDirFailureChannel The LogDirFailureChannel to asynchronously handle log dir failure
* @param producerStateManager The ProducerStateManager instance (if any) containing state associated
* with the existingSegments
* @param logPrefix Logging prefix
* @return List of new segments that replace the input segment
*/
private[log] def splitOverflowedSegment(segment: LogSegment,
Expand All @@ -2544,11 +2552,12 @@ object Log extends Logging {
config: LogConfig,
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
producerStateManager: ProducerStateManager): List[LogSegment] = {
producerStateManager: ProducerStateManager,
logPrefix: String): List[LogSegment] = {
require(Log.isLogFile(segment.log.file), s"Cannot split file ${segment.log.file.getAbsoluteFile}")
require(segment.hasOverflow, "Split operation is only permitted for segments with overflow")

info(s"Splitting overflowed segment $segment")
info(s"${logPrefix}Splitting overflowed segment $segment")

val newSegments = ListBuffer[LogSegment]()
try {
Expand Down Expand Up @@ -2581,9 +2590,9 @@ object Log extends Logging {
s" before: ${segment.log.sizeInBytes} after: $totalSizeOfNewSegments")

// replace old segment with new ones
info(s"Replacing overflowed segment $segment with split segments $newSegments")
info(s"${logPrefix}Replacing overflowed segment $segment with split segments $newSegments")
replaceSegments(existingSegments, newSegments.toList, List(segment), isRecoveredSwapFile = false,
dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager)
dir, topicPartition, config, scheduler, logDirFailureChannel, producerStateManager, logPrefix)
newSegments.toList
} catch {
case e: Exception =>
Expand Down
Loading