Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created #10680

Merged (1 commit) on May 14, 2021
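The whole change rests on one durability detail: fsync-ing a newly created segment file makes its contents durable, but the directory entry that names the file lives in the parent directory's metadata, so the parent directory must be fsync-ed too before the file is guaranteed to exist after a crash. A minimal, Kafka-independent sketch of that pattern (assuming a POSIX filesystem; opening a directory for read, as the reworked Utils.flushDir below also does, may not be supported on Windows):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

final class DurableCreate {
    // Create a file, flush its contents, then flush the parent directory so the
    // directory entry pointing at the new file is durable as well.
    static void createDurably(Path file, byte[] data) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            ch.write(ByteBuffer.wrap(data));
            ch.force(true); // fsync the file itself
        }
        Path parent = file.toAbsolutePath().normalize().getParent();
        try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
            dir.force(true); // fsync the directory entry
        }
    }
}
```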
clients/src/main/java/org/apache/kafka/common/record/FileRecords.java

@@ -197,13 +197,6 @@ public void flush() throws IOException {
         channel.force(true);
     }
 
-    /**
-     * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing.
-     */
-    public void flushParentDir() throws IOException {
-        Utils.flushParentDir(file.toPath());
-    }
-
     /**
      * Close this record set
      */
clients/src/main/java/org/apache/kafka/common/utils/Utils.java (11 additions, 12 deletions)

@@ -902,27 +902,26 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need
             }
         } finally {
             if (needFlushParentDir) {
-                flushParentDir(target);
+                flushDir(target.toAbsolutePath().normalize().getParent());
             }
         }
     }
 
     /**
-     * Flushes the parent directory to guarantee crash consistency.
+     * Flushes dirty directories to guarantee crash consistency.
      *
-     * @throws IOException if flushing the parent directory fails.
+     * @throws IOException if flushing the directory fails.
      */
-    public static void flushParentDir(Path path) throws IOException {
-        FileChannel dir = null;
-        try {
-            Path parent = path.toAbsolutePath().getParent();
-            if (parent != null) {
-                dir = FileChannel.open(parent, StandardOpenOption.READ);
+    public static void flushDir(Path path) throws IOException {
+        if (path != null) {
+            FileChannel dir = null;
+            try {
+                dir = FileChannel.open(path, StandardOpenOption.READ);
                 dir.force(true);
+            } finally {
+                if (dir != null)
+                    dir.close();
             }
-        } finally {
-            if (dir != null)
-                dir.close();
         }
     }
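For illustration only (this helper is not part of the PR), a caller that creates a file and wants the creation itself to be crash-safe would pair the create with the new Utils.flushDir on the normalized parent path, mirroring the call site in atomicMoveWithFallback above:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import org.apache.kafka.common.utils.Utils;

final class FlushDirExample {
    // Hypothetical helper: create a file and make the new directory entry durable.
    static Path createFileDurably(Path dir, String name) throws IOException {
        Path file = Files.createFile(dir.resolve(name));
        Utils.flushDir(file.toAbsolutePath().normalize().getParent());
        return file;
    }
}
```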

core/src/main/scala/kafka/log/Log.scala (7 additions, 3 deletions)

@@ -1676,7 +1676,10 @@ class Log(@volatile private var _dir: File,
     if (offset > this.recoveryPoint) {
       debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
         s"unflushed: $unflushedMessages")
-      logSegments(this.recoveryPoint, offset).foreach(_.flush())
+      val segments = logSegments(this.recoveryPoint, offset)
+      segments.foreach(_.flush())
+      // if there are any new segments, we need to flush the parent directory for crash consistency
+      segments.lastOption.filter(_.baseOffset >= this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
 
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
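The filter above flushes the log directory only when the flushed range contains a newly rolled segment: a segment whose base offset is at or above the previous recovery point was created after the last flush, so its directory entry may not be durable yet. A small sketch of that decision with illustrative names (not Kafka's API):

```java
import java.util.List;

final class FlushDecision {
    // Illustrative only: the last flushed segment has the highest base offset; it was
    // created after the previous flush iff its base offset >= the old recovery point.
    static boolean directoryFlushNeeded(List<Long> flushedSegmentBaseOffsets, long previousRecoveryPoint) {
        if (flushedSegmentBaseOffsets.isEmpty())
            return false;
        long lastBaseOffset = flushedSegmentBaseOffsets.get(flushedSegmentBaseOffsets.size() - 1);
        return lastBaseOffset >= previousRecoveryPoint;
    }
}
```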
@@ -2312,7 +2315,7 @@ object Log extends Logging {
     // need to do this in two phases to be crash safe AND do the delete asynchronously
     // if we crash in the middle of this we complete the swap in loadSegments()
     if (!isRecoveredSwapFile)
-      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false))
+      sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix))
     sortedNewSegments.reverse.foreach(existingSegments.add(_))
     val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet

@@ -2335,6 +2338,7 @@
    }
    // okay we are safe now, remove the swap suffix
    sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+    Utils.flushDir(dir.toPath)
  }
 
  /**
@@ -2369,7 +2373,7 @@
                                 scheduler: Scheduler,
                                 logDirFailureChannel: LogDirFailureChannel,
                                 producerStateManager: ProducerStateManager): Unit = {
-    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false))
+    segmentsToDelete.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix))
 
     def deleteSegments(): Unit = {
       info(s"Deleting segment files ${segmentsToDelete.mkString(",")}")
core/src/main/scala/kafka/log/LogManager.scala (3 additions, 2 deletions)

@@ -150,7 +150,7 @@ class LogManager(logDirs: Seq[File],
         val created = dir.mkdirs()
         if (!created)
           throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
-        Utils.flushParentDir(dir.toPath)
+        Utils.flushDir(dir.toPath.toAbsolutePath.normalize.getParent)
       }
       if (!dir.isDirectory || !dir.canRead)
         throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
@@ -640,6 +640,8 @@ class LogManager(logDirs: Seq[File],
    try {
      recoveryPointCheckpoints.get(logDir).foreach { checkpoint =>
        val recoveryOffsets = logsToCheckpoint.map { case (tp, log) => tp -> log.recoveryPoint }
+        // checkpoint.write calls Utils.atomicMoveWithFallback, which flushes the parent
+        // directory and guarantees crash consistency.
        checkpoint.write(recoveryOffsets)
      }
    } catch {
@@ -867,7 +869,6 @@ class LogManager(logDirs: Seq[File],
    val dir = new File(logDirPath, logDirName)
    try {
      Files.createDirectories(dir.toPath)
-      Utils.flushParentDir(dir.toPath)
      Success(dir)
    } catch {
      case e: IOException =>
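The new comment above records why no extra flush is needed for checkpoints: checkpoint.write goes through Utils.atomicMoveWithFallback, which renames the temporary checkpoint file into place and then flushes the parent directory. A hedged sketch of that write-temp-then-atomic-move pattern (writeCheckpointDurably and its file layout are illustrative, not Kafka code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

import org.apache.kafka.common.utils.Utils;

final class CheckpointWrite {
    static void writeCheckpointDurably(Path target, byte[] contents) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)) {
            ch.write(ByteBuffer.wrap(contents));
            ch.force(true); // make the temp file's contents durable before publishing it
        }
        // atomic rename onto the live file, then flush the parent directory
        // (assumes needFlushParentDir = true does both, as the comment above describes)
        Utils.atomicMoveWithFallback(tmp, target, true);
    }
}
```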
core/src/main/scala/kafka/log/LogSegment.scala (4 additions, 18 deletions)

@@ -20,7 +20,6 @@ import java.io.{File, IOException}
 import java.nio.file.{Files, NoSuchFileException}
 import java.nio.file.attribute.FileTime
 import java.util.concurrent.TimeUnit
-import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.LogSegmentOffsetOverflowException
 import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.server.epoch.LeaderEpochFileCache
@@ -51,7 +50,6 @@ import scala.math._
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time
  * @param time The time instance
- * @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush
  */
 @nonthreadsafe
 class LogSegment private[log] (val log: FileRecords,
@@ -61,8 +59,7 @@ class LogSegment private[log] (val log: FileRecords,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
                                val rollJitterMs: Long,
-                               val time: Time,
-                               val needsFlushParentDir: Boolean = false) extends Logging {
+                               val time: Time) extends Logging {
 
   def offsetIndex: OffsetIndex = lazyOffsetIndex.get
 
@@ -98,9 +95,6 @@ class LogSegment private[log] (val log: FileRecords,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
-  /* whether or not we need to flush the parent dir during the next flush */
-  private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir)
-
   // The timestamp we used for time based log rolling and for ensuring max compaction delay
   // volatile for LogCleaner to see the update
   @volatile private var rollingBasedTimestamp: Option[Long] = None
@@ -478,9 +472,6 @@ class LogSegment private[log] (val log: FileRecords,
       offsetIndex.flush()
       timeIndex.flush()
       txnIndex.flush()
-      // We only need to flush the parent of the log file because all other files share the same parent
-      if (atomicNeedsFlushParentDir.getAndSet(false))
-        log.flushParentDir()
     }
   }
 
@@ -499,14 +490,11 @@ class LogSegment private[log] (val log: FileRecords,
   * Change the suffix for the index and log files for this log segment
   * IOException from this method should be handled by the caller
   */
-  def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = {
+  def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
    log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
    lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
    lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
    txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
-    // We only need to flush the parent of the log file because all other files share the same parent
-    if (needsFlushParentDir)
-      log.flushParentDir()
  }
 
  /**
@@ -669,8 +657,7 @@ class LogSegment private[log] (val log: FileRecords,
 object LogSegment {
 
   def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false,
-           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "",
-           needsRecovery: Boolean = false): LogSegment = {
+           initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
@@ -680,8 +667,7 @@ object LogSegment {
       baseOffset,
       indexIntervalBytes = config.indexInterval,
       rollJitterMs = config.randomSegmentJitter,
-      time,
-      needsFlushParentDir = needsRecovery || !fileAlreadyExists)
+      time)
   }
 
   def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = {