KAFKA-3968: fsync the parent directory of a segment file when the file is created (#10405)

Kafka does not call fsync() on the directory when a new log segment is created and flushed to disk.

The problem is that the following sequence of calls doesn't guarantee file durability:

fd = open("log", O_RDWR | O_CREAT); // suppose open creates "log"
write(fd);
fsync(fd);

If the system crashes after fsync() but before the parent directory has been flushed to disk, the log file can disappear.

This PR flushes the parent directory when flush() is called for the first time.
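
For illustration, here is a minimal, self-contained Java sketch of the pattern the patch adopts (the file and class names are hypothetical, not taken from the patch): after forcing the file's own channel, the parent directory is opened and forced as well, so the new directory entry itself survives a crash.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DurableCreateSketch {
    public static void main(String[] args) throws IOException {
        Path log = Paths.get("log"); // the "log" file from the sequence above
        try (FileChannel ch = FileChannel.open(log,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap("record".getBytes(StandardCharsets.UTF_8)));
            ch.force(true); // durable contents and file metadata, like fsync(fd)
        }
        // The extra step: fsync the parent directory so the entry for "log"
        // itself is durable. Opening a directory channel with READ works on
        // Linux; other platforms may throw IOException.
        Path parent = log.toAbsolutePath().getParent();
        try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
            dir.force(true);
        }
    }
}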

Reviewers: Jun Rao <junrao@gmail.com>
ccding authored Apr 3, 2021
1 parent f863749 commit 66b0c5c
Showing 12 changed files with 80 additions and 19 deletions.
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -197,6 +197,13 @@ public void flush() throws IOException {
channel.force(true);
}

/**
* Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after a crash.
*/
public void flushParentDir() throws IOException {
Utils.flushParentDir(file.toPath());
}

/**
* Close this record set
*/
@@ -245,7 +252,7 @@ public void updateParentDir(File parentDir) {
*/
public void renameTo(File f) throws IOException {
try {
Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false);
} finally {
this.file = f;
}
37 changes: 37 additions & 0 deletions clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -17,6 +17,7 @@
package org.apache.kafka.common.utils;

import java.nio.BufferUnderflowException;
import java.nio.file.StandardOpenOption;
import java.util.AbstractMap;
import java.util.EnumSet;
import java.util.SortedSet;
@@ -893,10 +894,23 @@ public static ClassLoader getContextOrKafkaClassLoader() {

/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
* This function also flushes the parent directory to guarantee crash consistency.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target) throws IOException {
atomicMoveWithFallback(source, target, true);
}

/**
* Attempts to move source to target atomically and falls back to a non-atomic move if it fails.
* This function allows callers to decide whether to flush the parent directory. This is needed
* when a sequence of atomicMoveWithFallback calls is made in the same directory and we don't
* want to flush the same parent directory repeatedly.
*
* @throws IOException if both atomic and non-atomic moves fail
*/
public static void atomicMoveWithFallback(Path source, Path target, boolean needFlushParentDir) throws IOException {
try {
Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException outer) {
@@ -908,6 +922,29 @@ public static void atomicMoveWithFallback(Path source, Path target) throws IOException
inner.addSuppressed(outer);
throw inner;
}
} finally {
if (needFlushParentDir) {
flushParentDir(target);
}
}
}

/**
* Flushes the parent directory to guarantee crash consistency.
*
* @throws IOException if flushing the parent directory fails.
*/
public static void flushParentDir(Path path) throws IOException {
FileChannel dir = null;
try {
Path parent = path.toAbsolutePath().getParent();
if (parent != null) {
dir = FileChannel.open(parent, StandardOpenOption.READ);
dir.force(true);
}
} finally {
if (dir != null)
dir.close();
}
}
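
As a usage sketch of the two methods above (the segment names and directory path are made up, not from this patch), a caller renaming several files that share one parent can suppress the per-move flush and pay for a single directory fsync at the end:

Path dir = Paths.get("/var/kafka-logs/topic-0"); // hypothetical log dir
// Skip the per-move parent flush for each rename in the batch...
Utils.atomicMoveWithFallback(dir.resolve("00000000000000000000.log.cleaned"),
    dir.resolve("00000000000000000000.log.swap"), false);
Utils.atomicMoveWithFallback(dir.resolve("00000000000000000000.index.cleaned"),
    dir.resolve("00000000000000000000.index.swap"), false);
// ...then flush the shared parent directory once.
Utils.flushParentDir(dir.resolve("00000000000000000000.log.swap"));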

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -207,7 +207,7 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset:
* @throws IOException if rename fails
*/
def renameTo(f: File): Unit = {
try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
finally _file = f
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/LazyIndex.scala
@@ -131,7 +131,7 @@ object LazyIndex {
def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)

def renameTo(f: File): Unit = {
try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
catch {
case _: NoSuchFileException if !file.exists => ()
}
7 changes: 4 additions & 3 deletions core/src/main/scala/kafka/log/Log.scala
@@ -708,7 +708,8 @@ class Log(@volatile private var _dir: File,
baseOffset = baseOffset,
config,
time = time,
fileAlreadyExists = true)
fileAlreadyExists = true,
needsRecovery = !hadCleanShutdown)

try segment.sanityCheck(timeIndexFileNewlyCreated)
catch {
@@ -2325,7 +2326,7 @@ class Log(@volatile private var _dir: File,
* @throws IOException if the file can't be renamed and still exists
*/
private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))

def deleteSegments(): Unit = {
info(s"Deleting segment files ${segments.mkString(",")}")
@@ -2388,7 +2389,7 @@ class Log(@volatile private var _dir: File,
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
sortedNewSegments.reverse.foreach(addSegment(_))
val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet

4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/log/LogManager.scala
@@ -28,7 +28,7 @@ import kafka.server.metadata.ConfigRepository
import kafka.server._
import kafka.utils._
import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.utils.{Time, Utils}
import org.apache.kafka.common.errors.{InconsistentTopicIdException, KafkaStorageException, LogDirNotFoundException}

import scala.jdk.CollectionConverters._
@@ -150,6 +150,7 @@ class LogManager(logDirs: Seq[File],
val created = dir.mkdirs()
if (!created)
throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
Utils.flushParentDir(dir.toPath)
}
if (!dir.isDirectory || !dir.canRead)
throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
@@ -866,6 +867,7 @@ class LogManager(logDirs: Seq[File],
val dir = new File(logDirPath, logDirName)
try {
Files.createDirectories(dir.toPath)
Utils.flushParentDir(dir.toPath)
Success(dir)
} catch {
case e: IOException =>
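
The same reasoning covers directory creation: a new directory's entry lives in its parent, so the mkdir is only crash-durable once the parent is flushed. A small sketch with an assumed path:

Path newLogDir = Paths.get("/var/kafka-logs/topic-0"); // hypothetical path
Files.createDirectories(newLogDir);
Utils.flushParentDir(newLogDir); // fsyncs /var/kafka-logs so the new entry persists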
22 changes: 18 additions & 4 deletions core/src/main/scala/kafka/log/LogSegment.scala
@@ -20,6 +20,7 @@ import java.io.{File, IOException}
import java.nio.file.{Files, NoSuchFileException}
import java.nio.file.attribute.FileTime
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.LogSegmentOffsetOverflowException
import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.server.epoch.LeaderEpochFileCache
@@ -50,6 +51,7 @@ import scala.math._
* @param indexIntervalBytes The approximate number of bytes between entries in the index
* @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
* @param time The time instance
* @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
*/
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
@@ -59,7 +61,8 @@ class LogSegment private[log] (val log: FileRecords,
val baseOffset: Long,
val indexIntervalBytes: Int,
val rollJitterMs: Long,
val time: Time) extends Logging {
val time: Time,
val needsFlushParentDir: Boolean = false) extends Logging {

def offsetIndex: OffsetIndex = lazyOffsetIndex.get

@@ -95,6 +98,9 @@ class LogSegment private[log] (val log: FileRecords,
/* the number of bytes since we last added an entry in the offset index */
private var bytesSinceLastIndexEntry = 0

/* whether or not we need to flush the parent dir during the next flush */
private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)

// The timestamp we used for time based log rolling and for ensuring max compaction delay
// volatile for LogCleaner to see the update
@volatile private var rollingBasedTimestamp: Option[Long] = None
@@ -472,6 +478,9 @@ class LogSegment private[log] (val log: FileRecords,
offsetIndex.flush()
timeIndex.flush()
txnIndex.flush()
// We only need to flush the parent of the log file because all other files share the same parent
if (atomicNeedsFlushParentDir.getAndSet(false))
log.flushParentDir()
}
}
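
The getAndSet(false) above is a one-shot flag: it atomically returns the previous value while clearing it, so concurrent callers perform the parent-directory flush at most once per segment. In isolation (an illustrative Java sketch, not code from this patch):

import java.util.concurrent.atomic.AtomicBoolean;

class OneShotFlushSketch {
    private final AtomicBoolean needsFlushParentDir = new AtomicBoolean(true);

    void flush() {
        // ... force the file contents first ...
        if (needsFlushParentDir.getAndSet(false)) {
            // Only the first caller observes true and flushes the parent dir.
        }
    }
}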

@@ -490,11 +499,14 @@
* Change the suffix for the index and log files for this log segment
* IOException from this method should be handled by the caller
*/
def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
// We only need to flush the parent of the log file because all other files share the same parent
if (needsFlushParentDir)
log.flushParentDir()
}

/**
@@ -657,7 +669,8 @@ class LogSegment private[log] (val log: FileRecords,
object LogSegment {

def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
needsRecovery: Boolean = false): LogSegment = {
val maxIndexSize = config.maxIndexSize
new LogSegment(
FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
Expand All @@ -667,7 +680,8 @@ object LogSegment {
baseOffset,
indexIntervalBytes = config.indexInterval,
rollJitterMs = config.randomSegmentJitter,
time)
time,
needsFlushParentDir = needsRecovery || !fileAlreadyExists)
}

def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {
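
Note how LogSegment.open arms the flag: the first flush also fsyncs the parent directory whenever the segment file was just created, or when an unclean shutdown means a previous flush can no longer be trusted. Restated as a sketch using the names from the hunk above:

// - !fileAlreadyExists: the segment file is brand new, so its directory
//   entry has never been flushed.
// - needsRecovery:      unclean shutdown; be conservative and flush again.
boolean needsFlushParentDir = needsRecovery || !fileAlreadyExists;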
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -109,7 +109,7 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File)
def renameTo(f: File): Unit = {
try {
if (file.exists)
Utils.atomicMoveWithFallback(file.toPath, f.toPath)
Utils.atomicMoveWithFallback(file.toPath, f.toPath, false)
} finally _file = f
}

2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -300,7 +300,7 @@ final class KafkaMetadataLog private (
val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
val destination = Snapshots.deleteRename(path, snapshotId)
try {
Utils.atomicMoveWithFallback(path, destination)
Utils.atomicMoveWithFallback(path, destination, false)
} catch {
case e: IOException =>
error(s"Error renaming snapshot file: $path to $destination", e)
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1370,7 +1370,7 @@ class LogCleanerTest {
// On recovery, clean operation is aborted. All messages should be present in the log
log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix)
for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
}
log = recoverAndCheck(config, allKeys)

@@ -1386,7 +1386,7 @@ class LogCleanerTest {
// renamed to .deleted. Clean operation is resumed during recovery.
log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix)
for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)
}
log = recoverAndCheck(config, cleanedKeys)

6 changes: 3 additions & 3 deletions core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -3431,7 +3431,7 @@ class LogTest {
segment.truncateTo(0)
})
for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)

val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog))
@@ -3459,7 +3459,7 @@ class LogTest {
segment.truncateTo(0)
}
for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)

val recoveredLog = recoverAndCheck(logConfig, expectedKeys)
assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog))
@@ -3483,7 +3483,7 @@ class LogTest {
segment.changeFileSuffixes("", Log.SwapFileSuffix)
})
for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false)

// Truncate the old segment
segmentWithOverflow.truncateTo(0)
raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java
@@ -116,7 +116,7 @@ public void testDeleteSnapshot(boolean renameBeforeDeleting) throws IOException

if (renameBeforeDeleting)
// rename snapshot before deleting
Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId));
Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false);

assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId()));
assertFalse(Files.exists(snapshotPath));
