diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 72d2d6183ffd..17a41e2a7447 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -197,13 +197,6 @@ public void flush() throws IOException {
         channel.force(true);
     }
 
-    /**
-     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
-     */
-    public void flushParentDir() throws IOException {
-        Utils.flushParentDir(file.toPath());
-    }
-
     /**
      * Close this record set
      */
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index b56ad155b1f2..09dcae4b1848 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -902,27 +902,26 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             }
         } finally {
             if (needFlushParentDir) {
-                flushParentDir(target);
+                flushDir(target.toAbsolutePath().normalize().getParent());
             }
         }
     }
 
     /**
-     * Flushes the parent directory to guarantee crash consistency.
+     * Flushes dirty directories to guarantee crash consistency.
      *
-     * @throws IOException if flushing the parent directory fails.
+     * @throws IOException if flushing the directory fails.
      */
-    public static void flushParentDir(Path path) throws IOException {
-        FileChannel dir = null;
-        try {
-            Path parent = path.toAbsolutePath().getParent();
-            if (parent != null) {
-                dir = FileChannel.open(parent, StandardOpenOption.READ);
+    public static void flushDir(Path path) throws IOException {
+        if (path != null) {
+            FileChannel dir = null;
+            try {
+                dir = FileChannel.open(path, StandardOpenOption.READ);
                 dir.force(true);
+            } finally {
+                if (dir != null)
+                    dir.close();
             }
-        } finally {
-            if (dir != null)
-                dir.close();
         }
     }
 
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index edbabb88feef..6c306101b147 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1676,7 +1676,10 @@ class Log(@volatile private var _dir: File,
       if (offset > this.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        logSegments(this.recoveryPoint, offset).foreach(_.flush())
+        val segments = logSegments(this.recoveryPoint, offset)
+        segments.foreach(_.flush())
+        // if there are any new segments, we need to flush the parent directory for crash consistency
+        segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
 
         lock synchronized {
           checkIfMemoryMappedBufferClosed()
@@ -2312,7 +2315,7 @@ object Log extends Logging {
     // need to do this in two phases to be crash safe AND do the delete asynchronously
     // if we crash in the middle of this we complete the swap in loadSegments()
     if (!isRecoveredSwapFile)
-      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
+      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
     sortedNewSegments.reverse.foreach(existingSegments.add(_))
     val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
 
@@ -2335,6 +2338,7 @@ object Log extends Logging {
     }
     // okay we are safe now, remove the swap suffix
     sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+    Utils.flushDir(dir.toPath)
   }
 
   /**
@@ -2369,7 +2373,7 @@ object Log extends Logging {
                                       scheduler: Scheduler,
                                       logDirFailureChannel: LogDirFailureChannel,
                                       producerStateManager: ProducerStateManager): Unit = {
-    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))
+    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
       info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index a13a7d484e95..2a4d9539babf 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -150,7 +150,7 @@ class LogManager(logDirs: Seq[File],
           val created = dir.mkdirs()
           if (!created)
             throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
-          Utils.flushParentDir(dir.toPath)
+          Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
         }
         if (!dir.isDirectory || !dir.canRead)
           throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
@@ -640,6 +640,8 @@ class LogManager(logDirs: Seq[File],
     try {
       recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
         val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
+        // checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
+        // directory and guarantees crash consistency.
         checkpoint.write(recoveryOffsets)
       }
     } catch {
@@ -867,7 +869,6 @@ class LogManager(logDirs: Seq[File],
       val dir = new File(logDirPath, logDirName)
      try {
        Files.createDirectories(dir.toPath)
-        Utils.flushParentDir(dir.toPath)
        Success(dir)
      } catch {
        case e: IOException =>
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index e3b09d4b90f8..37882ffa5259 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,7 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
@@ -51,7 +50,6 @@ import scala.math._
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
- * @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
  */
 @nonthreadsafe
 class LogSegment private[log] (val log: FileRecords,
@@ -61,8 +59,7 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time,
-                               val needsFlushParentDir: Boolean = false) extends Logging {
+                               val time: Time) extends Logging {
 
   def offsetIndex: OffsetIndex = lazyOffsetIndex.get
 
@@ -98,9 +95,6 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
-  /* whether or not we need to flush the parent dir during the next flush */
-  private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)
-
   // The timestamp we used for time based log rolling and for ensuring max compaction delay
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None
@@ -478,9 +472,6 @@ class LogSegment private[log] (val log: FileRecords,
       offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
-      // We only need to flush the parent of the log file because all other files share the same parent
-      if (atomicNeedsFlushParentDir.getAndSet(false))
-        log.flushParentDir()
     }
   }
 
@@ -499,14 +490,11 @@ class LogSegment private[log] (val log: FileRecords,
    * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
     log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
     lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
     lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
     txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
-    // We only need to flush the parent of the log file because all other files share the same parent
-    if (needsFlushParentDir)
-      log.flushParentDir()
   }
 
   /**
@@ -669,8 +657,7 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
-           needsRecovery: Boolean = false): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
@@ -680,8 +667,7 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time,
-      needsFlushParentDir = needsRecovery || !fileAlreadyExists)
+      time)
   }
 
   def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
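
For readers skimming the patch, the following is a minimal, hypothetical Java sketch (not part of the patch) of the durability pattern the reworked Utils.flushDir supports: fsync a newly written file, then fsync its parent directory so the directory entry itself survives a crash. The FlushDirExample class and the createAndFlush helper are illustrative names only; inside Kafka the real entry point is the Utils.flushDir shown in the diff, and opening a directory as a FileChannel is assumed to behave as it does on Linux.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class FlushDirExample {

    // Same idiom as the patched Utils.flushDir: open the directory read-only and
    // force its metadata to disk; a null path is treated as a no-op.
    static void flushDir(Path dir) throws IOException {
        if (dir != null) {
            try (FileChannel channel = FileChannel.open(dir, StandardOpenOption.READ)) {
                channel.force(true);
            }
        }
    }

    // Hypothetical helper: write a file durably, then flush the parent directory so
    // the new directory entry is also durable, mirroring how the patched call sites
    // resolve the parent themselves via toAbsolutePath().normalize().getParent().
    static void createAndFlush(Path file, byte[] payload) throws IOException {
        try (FileChannel channel = FileChannel.open(file, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            channel.write(ByteBuffer.wrap(payload));
            channel.force(true);
        }
        flushDir(file.toAbsolutePath().normalize().getParent());
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("flush-dir-example");
        createAndFlush(dir.resolve("segment.log"), "hello".getBytes());
        System.out.println("flushed " + dir.resolve("segment.log"));
    }
}

The sketch also illustrates the design choice visible in the diff: instead of a flushParentDir(file) that always resolves the parent internally, the primitive takes the directory itself, so a caller can flush either a directory it just created (LogManager) or the parent of a file it just renamed (atomicMoveWithFallback) with the same method.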