From db3e5e2c0de367ffcfe4078359d6d208ba722581 Mon Sep 17 00:00:00 2001
From: Cong Ding <cong@ccding.com>
Date: Fri, 14 May 2021 11:25:24 -0500
Subject: [PATCH] Rework on KAFKA-3968: fsync the parent directory of a segment
 file when the file is created (#10680)

This reverts and reworks #10405, which has several issues, for example:

- It fails to create a topic with 9000 partitions.
- It flushes in several unnecessary places.
- If multiple segments of the same partition are flushed at roughly the same time, we may end up doing multiple unnecessary flushes; the flush-handling logic in LogSegment.scala is convoluted.

The underlying issue remains: Kafka does not call fsync() on the directory when a new log segment is created and flushed to disk.

The problem is that the following sequence of calls does not guarantee file durability:

fd = open("log", O_RDWR | O_CREATE); // suppose open creates "log"
write(fd);
fsync(fd);

If the system crashes after fsync() but before the parent directory has been flushed to disk, the log file can disappear, because the directory entry that points to it was never made durable.
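
To make the new file durable, its parent directory has to be fsync'd as well. A minimal Java sketch of that technique, mirroring the reworked Utils.flushDir in the diff below (the class name DirFsyncSketch and the "log" path are illustrative, not part of the patch):

import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DirFsyncSketch {
    // Open the directory itself for reading and force its metadata to disk.
    // On Linux this persists the directory entry of a newly created file;
    // opening a directory this way is not supported on every platform.
    static void flushDir(Path dir) throws IOException {
        if (dir != null) {
            try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
                channel.force(true);
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path logFile = Paths.get("log"); // the file created in the sequence above
        flushDir(logFile.toAbsolutePath().normalize().getParent());
    }
}

One directory fsync covers every file created in that directory, which is why a single flush of the log directory is enough for a segment's log and index files (they share the same parent).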

This PR flushes the directory when flush() is called for the first time, i.e. on the first flush after a new segment file has been created.
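
A rough Java sketch of the caller-side check in Log.scala's flush path (Segment, flushUpTo and LogFlushSketch are illustrative names; the real code is the Scala change in the diff below):

import java.io.IOException;
import java.nio.file.Path;
import java.util.List;

class LogFlushSketch {
    // Hypothetical stand-in for a log segment: it can flush its own files
    // and report the base offset it starts at.
    interface Segment {
        long baseOffset();
        void flush() throws IOException;
    }

    // Flush every segment in the requested range; if any of them starts at or
    // beyond the last recovery point (i.e. it is a newly created file), flush
    // the log directory once so the new directory entries survive a crash.
    static void flushUpTo(List<Segment> segments, long recoveryPoint, Path logDir) throws IOException {
        boolean hasNewSegment = false;
        for (Segment segment : segments) {
            segment.flush();
            if (segment.baseOffset() >= recoveryPoint)
                hasNewSegment = true;
        }
        if (hasNewSegment)
            DirFsyncSketch.flushDir(logDir); // the directory fsync sketched above
    }
}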

A performance test shows that this PR has minimal performance impact on Kafka clusters.

Reviewers: Jun Rao <junrao@gmail.com>
---
 .../kafka/common/record/FileRecords.java      |  7 ------
 .../org/apache/kafka/common/utils/Utils.java  | 23 +++++++++----------
 core/src/main/scala/kafka/log/Log.scala       | 10 +++++---
 .../src/main/scala/kafka/log/LogManager.scala |  5 ++--
 .../src/main/scala/kafka/log/LogSegment.scala | 22 ++++--------------
 5 files changed, 25 insertions(+), 42 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 72d2d6183ffd3..17a41e2a74479 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -197,13 +197,6 @@ public void flush() throws IOException {
         channel.force(true);
     }
 
-    /**
-     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
-     */
-    public void flushParentDir() throws IOException {
-        Utils.flushParentDir(file.toPath());
-    }
-
     /**
      * Close this record set
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index b56ad155b1f21..09dcae4b1848b 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -902,27 +902,26 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             }
         } finally {
             if (needFlushParentDir) {
-                flushParentDir(target);
+                flushDir(target.toAbsolutePath().normalize().getParent());
             }
         }
     }
 
     /**
-     * Flushes the parent directory to guarantee crash consistency.
+     * Flushes dirty directories to guarantee crash consistency.
      *
-     * @throws IOException if flushing the parent directory fails.
+     * @throws IOException if flushing the directory fails.
      */
-    public static void flushParentDir(Path path) throws IOException {
-        FileChannel dir = null;
-        try {
-            Path parent = path.toAbsolutePath().getParent();
-            if (parent != null) {
-                dir = FileChannel.open(parent, StandardOpenOption.READ);
+    public static void flushDir(Path path) throws IOException {
+        if (path != null) {
+            FileChannel dir = null;
+            try {
+                dir = FileChannel.open(path, StandardOpenOption.READ);
                 dir.force(true);
+            } finally {
+                if (dir != null)
+                    dir.close();
             }
-        } finally {
-            if (dir != null)
-                dir.close();
         }
     }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index edbabb88feef6..6c306101b1474 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1676,7 +1676,10 @@ class Log(@volatile private var _dir: File,
       if (offset > this.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime,  current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        logSegments(this.recoveryPoint, offset).foreach(_.flush())
+        val segments = logSegments(this.recoveryPoint, offset)
+        segments.foreach(_.flush())
+        // if there are any new segments, we need to flush the parent directory for crash consistency
+        segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
 
         lock synchronized {
           checkIfMemoryMappedBufferClosed()
@@ -2312,7 +2315,7 @@ object Log extends Logging {
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
       if (!isRecoveredSwapFile)
-        sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
+        sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
       sortedNewSegments.reverse.foreach(existingSegments.add(_))
       val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
 
@@ -2335,6 +2338,7 @@ object Log extends Logging {
       }
       // okay we are safe now, remove the swap suffix
       sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+      Utils.flushDir(dir.toPath)
   }
 
   /**
@@ -2369,7 +2373,7 @@ object Log extends Logging {
                                       scheduler: Scheduler,
                                       logDirFailureChannel: LogDirFailureChannel,
                                       producerStateManager: ProducerStateManager): Unit = {
-    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))
+    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
       info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a13a7d484e95f..2a4d9539babfc 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -150,7 +150,7 @@ class LogManager(logDirs: Seq[File],
           val created = dir.mkdirs()
           if (!created)
             throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
-          Utils.flushParentDir(dir.toPath)
+          Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
         }
         if (!dir.isDirectory || !dir.canRead)
           throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
@@ -640,6 +640,8 @@ class LogManager(logDirs: Seq[File],
     try {
       recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
         val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
+        // checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
+        // directory and guarantees crash consistency.
         checkpoint.write(recoveryOffsets)
       }
     } catch {
@@ -867,7 +869,6 @@ class LogManager(logDirs: Seq[File],
       val dir = new File(logDirPath, logDirName)
       try {
         Files.createDirectories(dir.toPath)
-        Utils.flushParentDir(dir.toPath)
         Success(dir)
       } catch {
         case e: IOException =>
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index e3b09d4b90f8b..37882ffa52592 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,7 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
@@ -51,7 +50,6 @@ import scala.math._
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
- * @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
  */
 @nonthreadsafe
 class LogSegment private[log] (val log: FileRecords,
@@ -61,8 +59,7 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time,
-                               val needsFlushParentDir: Boolean = false) extends Logging {
+                               val time: Time) extends Logging {
 
   def offsetIndex: OffsetIndex = lazyOffsetIndex.get
 
@@ -98,9 +95,6 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
-  /* whether or not we need to flush the parent dir during the next flush */
-  private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)
-
   // The timestamp we used for time based log rolling and for ensuring max compaction delay
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None
@@ -478,9 +472,6 @@ class LogSegment private[log] (val log: FileRecords,
       offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
-      // We only need to flush the parent of the log file because all other files share the same parent
-      if (atomicNeedsFlushParentDir.getAndSet(false))
-        log.flushParentDir()
     }
   }
 
@@ -499,14 +490,11 @@ class LogSegment private[log] (val log: FileRecords,
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
     log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
     lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
     lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
     txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
-    // We only need to flush the parent of the log file because all other files share the same parent
-    if (needsFlushParentDir)
-      log.flushParentDir()
   }
 
   /**
@@ -669,8 +657,7 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
-           needsRecovery: Boolean = false): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
@@ -680,8 +667,7 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time,
-      needsFlushParentDir = needsRecovery || !fileAlreadyExists)
+      time)
   }
 
   def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {