Skip to content

Commit

Permalink
KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)
Browse files Browse the repository at this point in the history
This PR is a precursor to the recovery logic refactor work (KAFKA-12553).

I have made a change to eliminate Log.isLogDirOffline attribute. This boolean also comes in the way of refactoring the recovery logic. This attribute was added in #9676. But it is redundant and can be eliminated in favor of looking up LogDirFailureChannel to check if the logDir is offline. The performance/latency implication of such a ConcurrentHashMap lookup inside LogDirFailureChannel should be very low given that ConcurrentHashMap reads are usually lock free.

Tests:
Relying on existing unit/integration tests.

Reviewers: Dhruvil Shah <dhruvil@confluent.io>, Jun Rao <junrao@gmail.com>
  • Loading branch information
kowshik authored Mar 31, 2021
1 parent 6d7a901 commit b6278ee
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 13 deletions.
17 changes: 4 additions & 13 deletions core/src/main/scala/kafka/log/Log.scala
Original file line number Diff line number Diff line change
Expand Up @@ -283,10 +283,6 @@ class Log(@volatile private var _dir: File,

@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

// Log dir failure is handled asynchronously we need to prevent threads
// from reading inconsistent state caused by a failure in another thread
@volatile private var logDirOffline = false

/* The earliest offset which is part of an incomplete transaction. This is used to compute the
* last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
* gets removed from the log (through record or segment deletion). In this case, the first unstable offset
Expand Down Expand Up @@ -1332,12 +1328,6 @@ class Log(@volatile private var _dir: File,
}
}

private def checkForLogDirFailure(): Unit = {
if (logDirOffline) {
throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
}
}

def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = {
leaderEpochCache.foreach { cache =>
cache.assign(leaderEpoch, startOffset)
Expand Down Expand Up @@ -2428,13 +2418,14 @@ class Log(@volatile private var _dir: File,
private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.add(segment)

private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
if (logDirFailureChannel.hasOfflineLogDir(parentDir)) {
throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
}
try {
checkForLogDirFailure()
fun
} catch {
case e: IOException =>
logDirOffline = true
logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
logDirFailureChannel.maybeAddOfflineLogDir(parentDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/scala/kafka/server/LogDirFailureChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
private val offlineLogDirs = new ConcurrentHashMap[String, String]
private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)

def hasOfflineLogDir(logDir: String): Boolean = {
offlineLogDirs.containsKey(logDir)
}

/*
* If the given logDir is not already offline, add it to the
* set of offline log dirs and enqueue it to the logDirFailureEvent queue
Expand Down

0 comments on commit b6278ee

Please sign in to comment.