Commit
Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created (#10680)

This change reverts #10405 and reworks the fix. #10405 had several issues, for example:

It fails to create a topic with 9000 partitions.
It flushes in several unnecessary places.
If multiple segments of the same partition are flushed at roughly the same time, we may end up doing multiple unnecessary flushes: the logic for handling the flush in LogSegment.scala is convoluted.
The underlying issue (KAFKA-3968) remains: Kafka does not call fsync() on the parent directory when a new log segment is created and flushed to disk.

The problem is that the following sequence of calls does not guarantee durability of the newly created file:

fd = open("log", O_RDWR | O_CREAT); // suppose open() creates "log"
write(fd, buf, len);
fsync(fd);

If the system crashes after fsync() but before the parent directory has been flushed to disk, the log file can disappear.
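
For illustration, here is a minimal, standalone Java sketch of the corrected sequence (not code from this PR; the class name and file layout are made up for the example). On POSIX file systems a FileChannel can be opened on the directory itself and force()d, which is the same mechanism the reworked Utils.flushDir below relies on:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class DurableCreate {
    public static void main(String[] args) throws IOException {
        Path log = Path.of("log");
        // open("log", O_RDWR | O_CREAT) + write + fsync
        try (FileChannel ch = FileChannel.open(log,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("data".getBytes(StandardCharsets.UTF_8)));
            ch.force(true); // flushes the file's data and metadata, but not its directory entry
        }
        // The missing step: fsync the parent directory so the new entry itself survives a crash.
        // Opening a directory for READ works on POSIX file systems, not on Windows.
        Path parent = log.toAbsolutePath().normalize().getParent();
        try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
            dir.force(true);
        }
    }
}

The second force() is the step that the original sequence above is missing.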

This PR flushes the log directory when flush() is called for the first time after a new segment has been created.
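
A rough sketch of that gating, simplified from the Log.scala change below (the helper name and types here are hypothetical, not Kafka API): a segment whose base offset is at or beyond the current recovery point was created since the last flush, so its directory entry may not be durable yet and the directory should be fsync'd once.

import java.util.List;

final class DirFlushGate {
    // True if flushing this range of segments should also fsync the log directory,
    // i.e. the range contains at least one segment created since the last flush.
    static boolean needsDirFlush(List<Long> flushedSegmentBaseOffsets, long recoveryPoint) {
        if (flushedSegmentBaseOffsets.isEmpty())
            return false;
        long lastBaseOffset = flushedSegmentBaseOffsets.get(flushedSegmentBaseOffsets.size() - 1);
        return lastBaseOffset >= recoveryPoint;
    }
}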

Performance testing shows that this PR has minimal performance impact on Kafka clusters.

Reviewers: Jun Rao <junrao@gmail.com>
ccding authored May 14, 2021
1 parent 29c55fd commit db3e5e2
Showing 5 changed files with 25 additions and 42 deletions.
@@ -197,13 +197,6 @@ public void flush() throws IOException {
         channel.force(true);
     }
 
-    /**
-     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
-     */
-    public void flushParentDir() throws IOException {
-        Utils.flushParentDir(file.toPath());
-    }
-
     /**
      * Close this record set
      */
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (23 changes: 11 additions & 12 deletions)
@@ -902,27 +902,26 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             }
         } finally {
             if (needFlushParentDir) {
-                flushParentDir(target);
+                flushDir(target.toAbsolutePath().normalize().getParent());
             }
         }
     }
 
     /**
-     * Flushes the parent directory to guarantee crash consistency.
+     * Flushes dirty directories to guarantee crash consistency.
      *
-     * @throws IOException if flushing the parent directory fails.
+     * @throws IOException if flushing the directory fails.
      */
-    public static void flushParentDir(Path path) throws IOException {
-        FileChannel dir = null;
-        try {
-            Path parent = path.toAbsolutePath().getParent();
-            if (parent != null) {
-                dir = FileChannel.open(parent, StandardOpenOption.READ);
+    public static void flushDir(Path path) throws IOException {
+        if (path != null) {
+            FileChannel dir = null;
+            try {
+                dir = FileChannel.open(path, StandardOpenOption.READ);
                 dir.force(true);
+            } finally {
+                if (dir != null)
+                    dir.close();
             }
-        } finally {
-            if (dir != null)
-                dir.close();
         }
     }
 
core/src/main/scala/kafka/log/Log.scala (10 changes: 7 additions & 3 deletions)
@@ -1676,7 +1676,10 @@ class Log(@volatile private var _dir: File,
       if (offset > this.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        logSegments(this.recoveryPoint, offset).foreach(_.flush())
+        val segments = logSegments(this.recoveryPoint, offset)
+        segments.foreach(_.flush())
+        // if there are any new segments, we need to flush the parent directory for crash consistency
+        segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
 
         lock synchronized {
           checkIfMemoryMappedBufferClosed()
@@ -2312,7 +2315,7 @@ object Log extends Logging {
     // need to do this in two phases to be crash safe AND do the delete asynchronously
     // if we crash in the middle of this we complete the swap in loadSegments()
     if (!isRecoveredSwapFile)
-      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
+      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
     sortedNewSegments.reverse.foreach(existingSegments.add(_))
     val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
 
@@ -2335,6 +2338,7 @@
     }
     // okay we are safe now, remove the swap suffix
     sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+    Utils.flushDir(dir.toPath)
   }
 
   /**
Expand Down Expand Up @@ -2369,7 +2373,7 @@ object Log extends Logging {
scheduler: Scheduler,
logDirFailureChannel: LogDirFailureChannel,
producerStateManager: ProducerStateManager): Unit = {
segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))
segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))

def deleteSegments(): Unit = {
info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
Expand Down
core/src/main/scala/kafka/log/LogManager.scala (5 changes: 3 additions & 2 deletions)
@@ -150,7 +150,7 @@ class LogManager(logDirs: Seq[File],
           val created = dir.mkdirs()
           if (!created)
             throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
-          Utils.flushParentDir(dir.toPath)
+          Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
         }
         if (!dir.isDirectory || !dir.canRead)
           throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
@@ -640,6 +640,8 @@ class LogManager(logDirs: Seq[File],
     try {
       recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
         val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
+        // checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
+        // directory and guarantees crash consistency.
         checkpoint.write(recoveryOffsets)
       }
     } catch {
@@ -867,7 +869,6 @@ class LogManager(logDirs: Seq[File],
       val dir = new File(logDirPath, logDirName)
       try {
         Files.createDirectories(dir.toPath)
-        Utils.flushParentDir(dir.toPath)
         Success(dir)
       } catch {
         case e: IOException =>
core/src/main/scala/kafka/log/LogSegment.scala (22 changes: 4 additions & 18 deletions)
@@ -20,7 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
@@ -51,7 +50,6 @@ import scala.math._
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
- * @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
  */
 @nonthreadsafe
 class LogSegment private[log] (val log: FileRecords,
@@ -61,8 +59,7 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time,
-                               val needsFlushParentDir: Boolean = false) extends Logging {
+                               val time: Time) extends Logging {
 
   def offsetIndex: OffsetIndex = lazyOffsetIndex.get
 
@@ -98,9 +95,6 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
-  /* whether or not we need to flush the parent dir during the next flush */
-  private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)
-
   // The timestamp we used for time based log rolling and for ensuring max compaction delay
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None
@@ -478,9 +472,6 @@ class LogSegment private[log] (val log: FileRecords,
       offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
-      // We only need to flush the parent of the log file because all other files share the same parent
-      if (atomicNeedsFlushParentDir.getAndSet(false))
-        log.flushParentDir()
     }
   }
 
@@ -499,14 +490,11 @@
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
     log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
     lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
     lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
     txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
-    // We only need to flush the parent of the log file because all other files share the same parent
-    if (needsFlushParentDir)
-      log.flushParentDir()
   }
 
   /**
@@ -669,8 +657,7 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
-           needsRecovery: Boolean = false): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
@@ -680,8 +667,7 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time,
-      needsFlushParentDir = needsRecovery || !fileAlreadyExists)
+      time)
   }
 
   def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
