KAFKA-13068: Rename Log to UnifiedLog (#11154)
In this PR, I've renamed kafka.log.Log to kafka.log.UnifiedLog. With the advent of KIP-405, the existing Log class will, going forward, present a unified view of local and tiered log segments, hence the rename to UnifiedLog. The motivation for this PR is the same as outlined in this design document: https://docs.google.com/document/d/1dQJL4MCwqQJSPmZkVmVzshFZKuFy_bCPtubav4wBfHQ/edit.
This PR is a follow-up to #10280, where we refactored the Log layer by introducing a new kafka.log.LocalLog class.
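As a rough conceptual sketch only (assuming nothing beyond the description above, with hypothetical class and field names), the idea is that UnifiedLog fronts the local on-disk segments (the kafka.log.LocalLog introduced in #10280) and, with KIP-405, tiered segments as well, presenting a single offset range to callers:

```scala
// Hypothetical sketch, not the PR's actual code: a "unified" log whose start offset
// may come from tiered (remote) storage while new data is always appended locally.
final case class TieredLogView(startOffset: Long, endOffset: Long)

final class LocalLogSketch(var logStartOffset: Long, var logEndOffset: Long)

final class UnifiedLogSketch(private val local: LocalLogSketch,
                             private val tiered: Option[TieredLogView]) {
  // The earliest readable offset may live in tiered storage rather than on local disk.
  def logStartOffset: Long = tiered.map(_.startOffset).getOrElse(local.logStartOffset)

  // The log end offset always comes from the local log, where appends land first.
  def logEndOffset: Long = local.logEndOffset
}
```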

Note: the Log class name had to be hardcoded to ensure metrics are defined under the Log class (for backwards compatibility). Please refer to the newly introduced UnifiedLog.metricName() method.
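For illustration only, here is a minimal sketch of the kind of helper that note describes; the object and method names below are hypothetical, and the actual UnifiedLog.metricName() in this PR may differ. The point it demonstrates is that the metric "type" stays hardcoded to "Log" rather than "UnifiedLog", so existing JMX dashboards and alerts keep working after the rename:

```scala
// Hypothetical illustration, not the PR's actual implementation.
object UnifiedLogMetricNameSketch {
  private val MetricGroup = "kafka.log"
  private val MetricType  = "Log" // deliberately NOT "UnifiedLog", for backwards compatibility

  /** Builds a JMX-style MBean name, e.g. "kafka.log:type=Log,name=Size,topic=foo,partition=0". */
  def metricName(name: String, tags: Map[String, String] = Map.empty): String = {
    val tagSuffix = tags.map { case (k, v) => s",$k=$v" }.mkString
    s"$MetricGroup:type=$MetricType,name=$name$tagSuffix"
  }
}

// Example:
//   UnifiedLogMetricNameSketch.metricName("Size", Map("topic" -> "foo", "partition" -> "0"))
//   returns "kafka.log:type=Log,name=Size,topic=foo,partition=0"
```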

Reviewers: Cong Ding <cong@ccding.com>, Satish Duggana <satishd@apache.org>, Jun Rao <junrao@gmail.com>
kowshik authored Aug 12, 2021
1 parent b1b872b commit db1f581
Showing 56 changed files with 486 additions and 477 deletions.
32 changes: 16 additions & 16 deletions core/src/main/scala/kafka/cluster/Partition.scala
@@ -243,9 +243,9 @@ class Partition(val topicPartition: TopicPartition,
// is getting changed (as a result of ReplicaAlterLogDirs command), we may have two logs until copy
// completes and a switch to new location is performed.
// log and futureLog variables defined below are used to capture this
@volatile var log: Option[Log] = None
@volatile var log: Option[UnifiedLog] = None
// If ReplicaAlterLogDir command is in progress, this is future location of the log
@volatile var futureLog: Option[Log] = None
@volatile var futureLog: Option[UnifiedLog] = None

/* Epoch of the controller that last changed the leader. This needs to be initialized correctly upon broker startup.
* One way of doing that is through the controller's start replica state change command. When a new broker starts up
@@ -313,10 +313,10 @@ class Partition(val topicPartition: TopicPartition,
}

def createLogIfNotExists(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Unit = {
def maybeCreate(logOpt: Option[Log]): Log = {
def maybeCreate(logOpt: Option[UnifiedLog]): UnifiedLog = {
logOpt match {
case Some(log) =>
trace(s"${if (isFutureReplica) "Future Log" else "Log"} already exists.")
trace(s"${if (isFutureReplica) "Future UnifiedLog" else "UnifiedLog"} already exists.")
if (log.topicId.isEmpty)
topicId.foreach(log.assignTopicId)
log
@@ -333,8 +333,8 @@ }
}

// Visible for testing
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): Log = {
def updateHighWatermark(log: Log) = {
private[cluster] def createLog(isNew: Boolean, isFutureReplica: Boolean, offsetCheckpoints: OffsetCheckpoints, topicId: Option[Uuid]): UnifiedLog = {
def updateHighWatermark(log: UnifiedLog) = {
val checkpointHighWatermark = offsetCheckpoints.fetch(log.parentDir, topicPartition).getOrElse {
info(s"No checkpointed highwatermark is found for partition $topicPartition")
0L
@@ -344,7 +344,7 @@ }
}

logManager.initializingLog(topicPartition)
var maybeLog: Option[Log] = None
var maybeLog: Option[UnifiedLog] = None
try {
val log = logManager.getOrCreateLog(topicPartition, isNew, isFutureReplica, topicId)
maybeLog = Some(log)
@@ -373,7 +373,7 @@ }
}

private def getLocalLog(currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Either[Log, Errors] = {
requireLeader: Boolean): Either[UnifiedLog, Errors] = {
checkCurrentLeaderEpoch(currentLeaderEpoch) match {
case Errors.NONE =>
if (requireLeader && !isLeader) {
@@ -391,17 +391,17 @@ }
}
}

def localLogOrException: Log = log.getOrElse {
def localLogOrException: UnifiedLog = log.getOrElse {
throw new NotLeaderOrFollowerException(s"Log for partition $topicPartition is not available " +
s"on broker $localBrokerId")
}

def futureLocalLogOrException: Log = futureLog.getOrElse {
def futureLocalLogOrException: UnifiedLog = futureLog.getOrElse {
throw new NotLeaderOrFollowerException(s"Future log for partition $topicPartition is not available " +
s"on broker $localBrokerId")
}

def leaderLogIfLocal: Option[Log] = {
def leaderLogIfLocal: Option[UnifiedLog] = {
log.filter(_ => isLeader)
}

@@ -411,7 +411,7 @@ class Partition(val topicPartition: TopicPartition,
def isLeader: Boolean = leaderReplicaIdOpt.contains(localBrokerId)

private def localLogWithEpochOrException(currentLeaderEpoch: Optional[Integer],
requireLeader: Boolean): Log = {
requireLeader: Boolean): UnifiedLog = {
getLocalLog(currentLeaderEpoch, requireLeader) match {
case Left(localLog) => localLog
case Right(error) =>
@@ -422,7 +422,7 @@ class Partition(val topicPartition: TopicPartition,
}

// Visible for testing -- Used by unit tests to set log for this partition
def setLog(log: Log, isFutureLog: Boolean): Unit = {
def setLog(log: UnifiedLog, isFutureLog: Boolean): Unit = {
if (isFutureLog)
futureLog = Some(log)
else
@@ -576,9 +576,9 @@ class Partition(val topicPartition: TopicPartition,
remoteReplicas.foreach { replica =>
replica.updateFetchState(
followerFetchOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata,
followerStartOffset = Log.UnknownOffset,
followerStartOffset = UnifiedLog.UnknownOffset,
followerFetchTimeMs = 0L,
leaderEndOffset = Log.UnknownOffset)
leaderEndOffset = UnifiedLog.UnknownOffset)
}
}
// we may need to increment high watermark since ISR could be down to 1
@@ -843,7 +843,7 @@ class Partition(val topicPartition: TopicPartition,
*
* @return true if the HW was incremented, and false otherwise.
*/
private def maybeIncrementLeaderHW(leaderLog: Log, curTime: Long = time.milliseconds): Boolean = {
private def maybeIncrementLeaderHW(leaderLog: UnifiedLog, curTime: Long = time.milliseconds): Boolean = {
// maybeIncrementLeaderHW is in the hot path, the following code is written to
// avoid unnecessary collection generation
var newHighWatermark = leaderLog.logEndOffsetMetadata
4 changes: 2 additions & 2 deletions core/src/main/scala/kafka/cluster/Replica.scala
@@ -17,7 +17,7 @@

package kafka.cluster

import kafka.log.Log
import kafka.log.UnifiedLog
import kafka.server.LogOffsetMetadata
import kafka.utils.Logging
import org.apache.kafka.common.TopicPartition
@@ -28,7 +28,7 @@ class Replica(val brokerId: Int, val topicPartition: TopicPartition) extends Log
@volatile private[this] var _logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
// the log start offset value, kept in all replicas;
// for local replica it is the log's start offset, for remote replicas its value is only updated by follower fetch
@volatile private[this] var _logStartOffset = Log.UnknownOffset
@volatile private[this] var _logStartOffset = UnifiedLog.UnknownOffset

// The log end offset value at the time the leader received the last FetchRequest from this follower
// This is used to determine the lastCaughtUpTimeMs of the follower
14 changes: 7 additions & 7 deletions core/src/main/scala/kafka/log/LogCleaner.scala
@@ -90,7 +90,7 @@ import scala.util.control.ControlThrowable
*/
class LogCleaner(initialConfig: CleanerConfig,
val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log],
val logs: Pool[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel,
time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
{
@@ -272,7 +272,7 @@ class LogCleaner(initialConfig: CleanerConfig,
* retention threads need to make this call to obtain:
* @return A list of log partitions that retention threads can safely work on
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
cleanerManager.pauseCleaningForNonCompactedPartitions()
}

@@ -356,7 +356,7 @@ class LogCleaner(initialConfig: CleanerConfig,
case e: Exception => throw new LogCleaningException(cleanable.log, e.getMessage, e)
}
}
val deletable: Iterable[(TopicPartition, Log)] = cleanerManager.deletableLogs()
val deletable: Iterable[(TopicPartition, UnifiedLog)] = cleanerManager.deletableLogs()
try {
deletable.foreach { case (_, log) =>
try {
@@ -549,14 +549,14 @@ private[log] class Cleaner(val id: Int,
* @param transactionMetadata State of ongoing transactions which is carried between the cleaning
* of the grouped segments
*/
private[log] def cleanSegments(log: Log,
private[log] def cleanSegments(log: UnifiedLog,
segments: Seq[LogSegment],
map: OffsetMap,
deleteHorizonMs: Long,
stats: CleanerStats,
transactionMetadata: CleanedTransactionMetadata): Unit = {
// create a new segment with a suffix appended to the name of the log and indexes
val cleaned = Log.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
val cleaned = UnifiedLog.createNewCleanedSegment(log.dir, log.config, segments.head.baseOffset)
transactionMetadata.cleanedIndex = Some(cleaned.txnIndex)

try {
@@ -876,7 +876,7 @@ private[log] class Cleaner(val id: Int,
* @param map The map in which to store the mappings
* @param stats Collector for cleaning statistics
*/
private[log] def buildOffsetMap(log: Log,
private[log] def buildOffsetMap(log: UnifiedLog,
start: Long,
end: Long,
map: OffsetMap,
@@ -1063,7 +1063,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
* and whether it needs compaction immediately.
*/
private case class LogToClean(topicPartition: TopicPartition,
log: Log,
log: UnifiedLog,
firstDirtyOffset: Long,
uncleanableOffset: Long,
needCompactionNow: Boolean = false) extends Ordered[LogToClean] {
18 changes: 9 additions & 9 deletions core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -38,7 +38,7 @@ private[log] case object LogCleaningInProgress extends LogCleaningState
private[log] case object LogCleaningAborted extends LogCleaningState
private[log] case class LogCleaningPaused(pausedCount: Int) extends LogCleaningState

private[log] class LogCleaningException(val log: Log,
private[log] class LogCleaningException(val log: UnifiedLog,
private val message: String,
private val cause: Throwable) extends KafkaException(message, cause)

Expand All @@ -59,7 +59,7 @@ private[log] class LogCleaningException(val log: Log,
* Valid previous state is LogCleaningPaused(i-1) or LogCleaningPaused(i+1).
*/
private[log] class LogCleanerManager(val logDirs: Seq[File],
val logs: Pool[TopicPartition, Log],
val logs: Pool[TopicPartition, UnifiedLog],
val logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
import LogCleanerManager._

@@ -216,7 +216,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* switch topic configuration between compacted and non-compacted topic.
* @return retention logs that have log cleaning successfully paused
*/
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)] = {
def pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, UnifiedLog)] = {
inLock(lock) {
val deletableLogs = logs.filter {
case (_, log) => !log.config.compact // pick non-compacted logs
@@ -236,7 +236,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
* Include logs without delete enabled, as they may have segments
* that precede the start offset.
*/
def deletableLogs(): Iterable[(TopicPartition, Log)] = {
def deletableLogs(): Iterable[(TopicPartition, UnifiedLog)] = {
inLock(lock) {
val toClean = logs.filter { case (topicPartition, log) =>
!inProgress.contains(topicPartition) && log.config.compact &&
@@ -506,7 +506,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}

private def isUncleanablePartition(log: Log, topicPartition: TopicPartition): Boolean = {
private def isUncleanablePartition(log: UnifiedLog, topicPartition: TopicPartition): Boolean = {
inLock(lock) {
uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
}
@@ -529,15 +529,15 @@ private case class OffsetsToClean(firstDirtyOffset: Long,

private[log] object LogCleanerManager extends Logging {

def isCompactAndDelete(log: Log): Boolean = {
def isCompactAndDelete(log: UnifiedLog): Boolean = {
log.config.compact && log.config.delete
}

/**
* get max delay between the time when log is required to be compacted as determined
* by maxCompactionLagMs and the current time.
*/
def maxCompactionDelay(log: Log, firstDirtyOffset: Long, now: Long) : Long = {
def maxCompactionDelay(log: UnifiedLog, firstDirtyOffset: Long, now: Long) : Long = {
val dirtyNonActiveSegments = log.nonActiveLogSegmentsFrom(firstDirtyOffset)
val firstBatchTimestamps = log.getFirstBatchTimestampForSegments(dirtyNonActiveSegments).filter(_ > 0)

@@ -564,7 +564,7 @@ private[log] object LogCleanerManager extends Logging {
* @param now the current time in milliseconds of the cleaning operation
* @return OffsetsToClean containing offsets for cleanable portion of log and whether the log checkpoint needs updating
*/
def cleanableOffsets(log: Log, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
def cleanableOffsets(log: UnifiedLog, lastCleanOffset: Option[Long], now: Long): OffsetsToClean = {
// If the log segments are abnormally truncated and hence the checkpointed offset is no longer valid;
// reset to the log starting offset and log the error
val (firstDirtyOffset, forceUpdateCheckpoint) = {
@@ -626,7 +626,7 @@ private[log] object LogCleanerManager extends Logging {
* Given the first dirty offset and an uncleanable offset, calculates the total cleanable bytes for this log
* @return the biggest uncleanable offset and the total amount of cleanable bytes
*/
def calculateCleanableBytes(log: Log, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
def calculateCleanableBytes(log: UnifiedLog, firstDirtyOffset: Long, uncleanableOffset: Long): (Long, Long) = {
val firstUncleanableSegment = log.nonActiveLogSegmentsFrom(uncleanableOffset).headOption.getOrElse(log.activeSegment)
val firstUncleanableOffset = firstUncleanableSegment.baseOffset
val cleanableBytes = log.logSegments(math.min(firstDirtyOffset, firstUncleanableOffset), firstUncleanableOffset).map(_.size.toLong).sum