KAFKA-12554: Refactor Log layer (#10280)
TL;DR:

This PR implements the details of the Log layer refactor, as outlined in this document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit. A few details may differ from the doc, but it is more or less the same.

STRATEGY:

In this PR, I've extracted a new class called LocalLog out of Log. Currently, LocalLog is purely an implementation detail that is not exposed outside the Log class (except for tests). In terms of object encapsulation, each Log instance wraps around a LocalLog instance.

This new LocalLog class attempts to encompass most of the local-log responsibilities surrounding the segments map, which were previously present in Log. Note that not all local-log responsibilities have been moved over to this new class (yet). The criterion I used was to preserve (for now), in the existing Log class, any logic that is entangled in a complex manner with the logStartOffset, the LeaderEpochCache, or the ProducerStateManager. A simplified sketch of the encapsulation follows below.
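
To illustrate the encapsulation, here is a deliberately simplified, hypothetical sketch: the names, signatures, and in-memory "segment" below are placeholders, not the actual code in this PR.

```scala
// Hypothetical sketch of the Log/LocalLog split: LocalLog owns the purely
// local concerns, Log keeps higher-level state and delegates to it.
final case class Record(offset: Long, payload: String)

class LocalLog {
  private var records = Vector.empty[Record]

  // Append a record locally and return the offset it was assigned.
  def append(payload: String): Long = {
    val offset = records.size.toLong
    records = records :+ Record(offset, payload)
    offset
  }

  def read(fromOffset: Long): Seq[Record] = records.drop(fromOffset.toInt)
}

// Log keeps higher-level state (e.g. logStartOffset) and wraps a LocalLog.
class Log(private val localLog: LocalLog, private var logStartOffset: Long = 0L) {
  def appendAsLeader(payload: String): Long = localLog.append(payload)

  def read(fromOffset: Long): Seq[Record] =
    localLog.read(math.max(fromOffset, logStartOffset))
}

object LogSketch extends App {
  val log = new Log(new LocalLog)
  log.appendAsLeader("hello")
  log.appendAsLeader("world")
  println(log.read(0L)) // Vector(Record(0,hello), Record(1,world))
}
```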

Reviewers: Ismael Juma <ismael@juma.me.uk>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
kowshik authored Jul 14, 2021
1 parent 8134adc commit b80ff18
Showing 20 changed files with 2,202 additions and 1,308 deletions.
1,010 changes: 1,010 additions & 0 deletions core/src/main/scala/kafka/log/LocalLog.scala

Large diffs are not rendered by default.

1,212 changes: 282 additions & 930 deletions core/src/main/scala/kafka/log/Log.scala

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions core/src/main/scala/kafka/log/LogCleaner.scala
@@ -449,12 +449,6 @@ object LogCleaner {
enableCleaner = config.logCleanerEnable)

}

def createNewCleanedSegment(dir: File, logConfig: LogConfig, baseOffset: Long): LogSegment = {
LogSegment.deleteIfExists(dir, baseOffset, fileSuffix = Log.CleanedFileSuffix)
LogSegment.open(dir, baseOffset, logConfig, Time.SYSTEM,
fileSuffix = Log.CleanedFileSuffix, initFileSize = logConfig.initFileSize, preallocate = logConfig.preallocate)
}
}

/**
@@ -562,7 +556,7 @@ private[log] class Cleaner(val id: Int,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = LogCleaner.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
val cleaned = Log.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)

try {
34 changes: 26 additions & 8 deletions core/src/main/scala/kafka/log/LogLoader.scala
@@ -74,8 +74,18 @@ case class LoadLogParams(dir: File,
* This object is responsible for all activities related with recovery of log segments from disk.
*/
object LogLoader extends Logging {

/**
* Clean shutdown file that indicates the broker was cleanly shut down in 0.8 and higher.
* This is used to avoid unnecessary recovery after a clean shutdown. In theory this could be
* avoided by passing in the recovery point; however, finding the correct position to do this
* requires accessing the offset index, which may not be safe in an unclean shutdown.
* For more information see the discussion in PR#2104
*/
val CleanShutdownFile = ".kafka_cleanshutdown"

/**
* Load the log segments from the log files on disk, and return the components of the loaded log.
* Load the log segments from the log files on disk, and returns the components of the loaded log.
* Additionally, it also suitably updates the provided LeaderEpochFileCache and ProducerStateManager
* to reflect the contents of the loaded log.
*
@@ -90,7 +100,6 @@ object LogLoader extends Logging {
* overflow index offset
*/
def load(params: LoadLogParams): LoadedLogOffsets = {

// First pass: through the files in the log directory and remove any temporary files
// and find any interrupted swap operations
val swapFiles = removeTempFilesAndCollectSwapFiles(params)
@@ -141,7 +150,6 @@
}
}


// Fourth pass: load all the log and index files.
// We might encounter legacy log segments with offset overflow (KAFKA-6264). We need to split such segments. When
// this happens, restart loading segment files from scratch.
@@ -200,7 +208,6 @@
params.time,
reloadFromCleanShutdown = params.hadCleanShutdown,
params.logIdentifier)

val activeSegment = params.segments.lastSegment.get
LoadedLogOffsets(
newLogStartOffset,
@@ -274,16 +281,16 @@
} catch {
case e: LogSegmentOffsetOverflowException =>
info(s"${params.logIdentifier}Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
Log.splitOverflowedSegment(
val result = Log.splitOverflowedSegment(
e.segment,
params.segments,
params.dir,
params.topicPartition,
params.config,
params.scheduler,
params.logDirFailureChannel,
params.producerStateManager,
params.logIdentifier)
deleteProducerSnapshotsAsync(result.deletedSegments, params)
}
}
throw new IllegalStateException()
@@ -493,14 +500,25 @@ object LogLoader extends Logging {
Log.deleteSegmentFiles(
toDelete,
asyncDelete = true,
deleteProducerStateSnapshots = true,
params.dir,
params.topicPartition,
params.config,
params.scheduler,
params.logDirFailureChannel,
params.producerStateManager,
params.logIdentifier)
deleteProducerSnapshotsAsync(segmentsToDelete, params)
}
}

private def deleteProducerSnapshotsAsync(segments: Iterable[LogSegment],
params: LoadLogParams): Unit = {
Log.deleteProducerSnapshots(segments,
params.producerStateManager,
asyncDelete = true,
params.scheduler,
params.config,
params.logDirFailureChannel,
params.dir.getParent,
params.topicPartition)
}
}
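
For reference, the clean-shutdown marker pattern described in the LogLoader.CleanShutdownFile comment above can be sketched as follows. This is a self-contained illustration; the temp directory, object name, and flow are placeholders, not Kafka's actual startup/shutdown code.

```scala
import java.nio.file.Files

// Illustrative sketch of the clean-shutdown marker-file pattern.
object CleanShutdownSketch extends App {
  val logDir = Files.createTempDirectory("kafka-logs-example") // placeholder log dir
  val marker = logDir.resolve(".kafka_cleanshutdown")

  // On clean shutdown: create the marker so the next startup knows recovery is unnecessary.
  Files.createFile(marker)

  // On startup: a present marker means recovery can be skipped; delete it so a later
  // crash is not mistaken for a clean shutdown.
  val hadCleanShutdown = Files.deleteIfExists(marker)
  println(s"hadCleanShutdown = $hadCleanShutdown")
}
```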
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/log/LogManager.scala
@@ -315,7 +315,7 @@ class LogManager(logDirs: Seq[File],
val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
threadPools.append(pool)

val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile)
if (cleanShutdownFile.exists) {
info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
// Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
@@ -516,7 +516,7 @@ class LogManager(logDirs: Seq[File],

// mark that the shutdown was clean by creating marker file
debug(s"Writing clean shutdown marker at $dir")
CoreUtils.swallow(Files.createFile(new File(dir, Log.CleanShutdownFile).toPath), this)
CoreUtils.swallow(Files.createFile(new File(dir, LogLoader.CleanShutdownFile).toPath), this)
}
}
} finally {
11 changes: 11 additions & 0 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -494,6 +494,13 @@ class LogSegment private[log] (val log: FileRecords,
txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
}

def hasSuffix(suffix: String): Boolean = {
log.file.getName.endsWith(suffix) &&
lazyOffsetIndex.file.getName.endsWith(suffix) &&
lazyTimeIndex.file.getName.endsWith(suffix) &&
txnIndex.file.getName.endsWith(suffix)
}

/**
* Append the largest time index entry to the time index and trim the log and indexes.
*
@@ -624,6 +631,10 @@ class LogSegment private[log] (val log: FileRecords,
))
}

def deleted(): Boolean = {
!log.file.exists() && !lazyOffsetIndex.file.exists() && !lazyTimeIndex.file.exists() && !txnIndex.file.exists()
}

/**
* The last modified time of this log segment as a unix time stamp
*/
37 changes: 37 additions & 0 deletions core/src/main/scala/kafka/log/LogSegments.scala
@@ -205,6 +205,11 @@ class LogSegments(topicPartition: TopicPartition) {
@threadsafe
def firstSegment: Option[LogSegment] = firstEntry.map(_.getValue)

/**
* @return the base offset of the log segment associated with the smallest offset, if it exists
*/
private[log] def firstSegmentBaseOffset: Option[Long] = firstSegment.map(_.baseOffset)

/**
* @return the entry associated with the greatest offset, if it exists.
*/
@@ -228,4 +233,36 @@
}.getOrElse(collection.immutable.Map[Long, LogSegment]().asJava)
view.values.asScala
}

/**
* The active segment that is currently taking appends
*/
def activeSegment = lastSegment.get

def sizeInBytes: Long = LogSegments.sizeInBytes(values)

/**
* Returns an Iterable containing segments matching the provided predicate.
*
* @param predicate the predicate to be used for filtering segments.
*/
def filter(predicate: LogSegment => Boolean): Iterable[LogSegment] = values.filter(predicate)
}

object LogSegments {
/**
* Calculate a log's size (in bytes) from the provided log segments.
*
* @param segments The log segments to calculate the size of
* @return Sum of the log segments' sizes (in bytes)
*/
def sizeInBytes(segments: Iterable[LogSegment]): Long =
segments.map(_.size.toLong).sum

def getFirstBatchTimestampForSegments(segments: Iterable[LogSegment]): Iterable[Long] = {
segments.map {
segment =>
segment.getFirstBatchTimestamp()
}
}
}
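
For reference, here is a minimal stand-in showing the shape of the new LogSegments.sizeInBytes helper added above; SegmentStub is a hypothetical type used only for this illustration, not Kafka's LogSegment.

```scala
// Minimal stand-in mirroring the aggregate size calculation in LogSegments.sizeInBytes.
final case class SegmentStub(baseOffset: Long, size: Int)

object SegmentSizeSketch extends App {
  // Same shape as LogSegments.sizeInBytes: sum the per-segment sizes as Longs.
  def sizeInBytes(segments: Iterable[SegmentStub]): Long =
    segments.map(_.size.toLong).sum

  val segments = Seq(SegmentStub(baseOffset = 0L, size = 1024),
                     SegmentStub(baseOffset = 1000L, size = 2048))
  println(sizeInBytes(segments)) // 3072
}
```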
23 changes: 8 additions & 15 deletions core/src/test/scala/unit/kafka/cluster/PartitionLockTest.scala
@@ -301,7 +301,10 @@ class PartitionLockTest extends Logging {
maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
logDirFailureChannel)
new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
}
}
when(offsetCheckpoints.fetch(ArgumentMatchers.anyString, ArgumentMatchers.eq(topicPartition)))
@@ -365,28 +368,18 @@

private class SlowLog(
log: Log,
segments: LogSegments,
offsets: LoadedLogOffsets,
logStartOffset: Long,
localLog: LocalLog,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
mockTime: MockTime,
logDirFailureChannel: LogDirFailureChannel,
appendSemaphore: Semaphore
) extends Log(
log.dir,
log.config,
segments,
offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata,
mockTime.scheduler,
logStartOffset,
localLog,
new BrokerTopicStats,
mockTime,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
_topicId = None,
keepPartitionMetadataFile = true) {

23 changes: 8 additions & 15 deletions core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -257,7 +257,10 @@ class PartitionTest extends AbstractPartitionTest {
maxProducerIdExpirationMs,
leaderEpochCache,
producerStateManager))
new SlowLog(log, segments, offsets, leaderEpochCache, producerStateManager, mockTime, logDirFailureChannel, appendSemaphore)
val localLog = new LocalLog(log.dir, log.config, segments, offsets.recoveryPoint,
offsets.nextOffsetMetadata, mockTime.scheduler, mockTime, log.topicPartition,
logDirFailureChannel)
new SlowLog(log, offsets.logStartOffset, localLog, leaderEpochCache, producerStateManager, appendSemaphore)
}
}

@@ -2030,28 +2033,18 @@

private class SlowLog(
log: Log,
segments: LogSegments,
offsets: LoadedLogOffsets,
logStartOffset: Long,
localLog: LocalLog,
leaderEpochCache: Option[LeaderEpochFileCache],
producerStateManager: ProducerStateManager,
mockTime: MockTime,
logDirFailureChannel: LogDirFailureChannel,
appendSemaphore: Semaphore
) extends Log(
log.dir,
log.config,
segments,
offsets.logStartOffset,
offsets.recoveryPoint,
offsets.nextOffsetMetadata,
mockTime.scheduler,
logStartOffset,
localLog,
new BrokerTopicStats,
mockTime,
log.producerIdExpirationCheckIntervalMs,
log.topicPartition,
leaderEpochCache,
producerStateManager,
logDirFailureChannel,
_topicId = None,
keepPartitionMetadataFile = true) {
