From 459facbd4f7c21941f60a8d8836a74b3e5dacc25 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 25 Mar 2021 16:07:58 -0700 Subject: [PATCH 01/25] [KAFKA-3968] fsync the parent directory of a segment file when the file is created --- .../kafka/common/record/FileRecords.java | 24 ++++++++++++++++--- .../kafka/common/record/FileRecordsTest.java | 10 ++++---- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 21bc8bd9d1710..77cc2f170d037 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; @@ -50,6 +51,7 @@ public class FileRecords extends AbstractRecords implements Closeable { private final AtomicInteger size; private final FileChannel channel; private volatile File file; + private final AtomicBoolean needFlushParentDir; /** * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible. @@ -59,13 +61,15 @@ public class FileRecords extends AbstractRecords implements Closeable { FileChannel channel, int start, int end, - boolean isSlice) throws IOException { + boolean isSlice, + boolean needFlushParentDir) throws IOException { this.file = file; this.channel = channel; this.start = start; this.end = end; this.isSlice = isSlice; this.size = new AtomicInteger(); + this.needFlushParentDir = new AtomicBoolean(needFlushParentDir); if (isSlice) { // don't check the file size if this is just a slice view @@ -136,7 +140,7 @@ public void readInto(ByteBuffer buffer, int position) throws IOException { public FileRecords slice(int position, int size) throws IOException { int availableBytes = availableBytes(position, size); int startPosition = this.start + position; - return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true); + return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true, false); } /** @@ -195,6 +199,19 @@ public int append(MemoryRecords records) throws IOException { */ public void flush() throws IOException { channel.force(true); + if (needFlushParentDir.getAndSet(false)) { + FileChannel dir = null; + try { + dir = FileChannel.open(file.getAbsoluteFile().getParentFile().toPath(), + java.nio.file.StandardOpenOption.READ); + dir.force(true); + } catch (Exception e) { + throw new KafkaException("Attempt to flush the parent directory of " + file + " failed."); + } finally { + if (dir != null) + dir.close(); + } + } } /** @@ -249,6 +266,7 @@ public void renameTo(File f) throws IOException { } finally { this.file = f; } + needFlushParentDir.set(true); } /** @@ -427,7 +445,7 @@ public static FileRecords open(File file, boolean preallocate) throws IOException { FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); int end = (!fileAlreadyExists && preallocate) ? 
0 : Integer.MAX_VALUE; - return new FileRecords(file, channel, 0, end, false); + return new FileRecords(file, channel, 0, end, false, mutable && !fileAlreadyExists); } public static FileRecords open(File file, diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index c32359b4467c7..705d2a1c73d49 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -90,7 +90,7 @@ public void testAppendProtectsFromOverflow() throws Exception { FileChannel fileChannelMock = mock(FileChannel.class); when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE); - FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false); + FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false, false); assertThrows(IllegalArgumentException.class, () -> append(records, values)); } @@ -100,7 +100,7 @@ public void testOpenOversizeFile() throws Exception { FileChannel fileChannelMock = mock(FileChannel.class); when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L); - assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false)); + assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false, false)); } @Test @@ -314,7 +314,7 @@ public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException { when(channelMock.size()).thenReturn(42L); when(channelMock.position(42L)).thenReturn(null); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false); fileRecords.truncateTo(42); verify(channelMock, atLeastOnce()).size(); @@ -331,7 +331,7 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOExcepti when(channelMock.size()).thenReturn(42L); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false); try { fileRecords.truncateTo(43); @@ -353,7 +353,7 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { when(channelMock.size()).thenReturn(42L); when(channelMock.truncate(anyLong())).thenReturn(channelMock); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false); fileRecords.truncateTo(23); verify(channelMock, atLeastOnce()).size(); From b4c8284fdef3ab2fd0ada533fc747ee64a7df792 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 25 Mar 2021 23:32:47 -0700 Subject: [PATCH 02/25] move import --- .../main/java/org/apache/kafka/common/record/FileRecords.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 77cc2f170d037..7c75a4d20cc33 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.record; -import 
java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; @@ -34,6 +33,7 @@ import java.nio.file.StandardOpenOption; import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** From b2601698a5c8610790f94c387c4ea5a85256a619 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Mon, 29 Mar 2021 10:00:13 -0700 Subject: [PATCH 03/25] address comments (except the topic partition one) --- .../kafka/common/record/FileRecords.java | 33 +++++++++++-------- core/src/main/scala/kafka/log/Log.scala | 4 +-- .../src/main/scala/kafka/log/LogSegment.scala | 5 ++- 3 files changed, 26 insertions(+), 16 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 7c75a4d20cc33..50307b08ebb2e 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.common.record; +import java.nio.file.Path; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; @@ -200,17 +201,24 @@ public int append(MemoryRecords records) throws IOException { public void flush() throws IOException { channel.force(true); if (needFlushParentDir.getAndSet(false)) { - FileChannel dir = null; - try { - dir = FileChannel.open(file.getAbsoluteFile().getParentFile().toPath(), - java.nio.file.StandardOpenOption.READ); - dir.force(true); - } catch (Exception e) { - throw new KafkaException("Attempt to flush the parent directory of " + file + " failed."); - } finally { - if (dir != null) - dir.close(); - } + flushParentDir(); + } + } + + /** + * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. + */ + public void flushParentDir() throws IOException { + FileChannel dir = null; + try { + dir = FileChannel.open(file.toPath().getParent(), StandardOpenOption.READ); + dir.force(true); + } catch (Exception e) { + throw new KafkaException("Attempt to flush the parent directory of " + file + " failed."); + } finally { + if (dir != null) + dir.close(); + needFlushParentDir.set(false); } } @@ -266,7 +274,6 @@ public void renameTo(File f) throws IOException { } finally { this.file = f; } - needFlushParentDir.set(true); } /** @@ -445,7 +452,7 @@ public static FileRecords open(File file, boolean preallocate) throws IOException { FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); int end = (!fileAlreadyExists && preallocate) ? 
0 : Integer.MAX_VALUE; - return new FileRecords(file, channel, 0, end, false, mutable && !fileAlreadyExists); + return new FileRecords(file, channel, 0, end, false, mutable); } public static FileRecords open(File file, diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 0ad82f46b9d4b..814a25a4de011 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2342,7 +2342,7 @@ class Log(@volatile private var _dir: File, * @throws IOException if the file can't be renamed and still exists */ private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = { - segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix)) + segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, true)) def deleteSegments(): Unit = { info(s"Deleting segment files ${segments.mkString(",")}") @@ -2418,7 +2418,7 @@ class Log(@volatile private var _dir: File, deleteSegmentFiles(List(seg), asyncDelete = true, deleteProducerStateSnapshots = !newSegmentBaseOffsets.contains(seg.baseOffset)) } // okay we are safe now, remove the swap suffix - sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "")) + sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "", true)) } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 37882ffa52592..381e13bcaa65a 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -490,11 +490,14 @@ class LogSegment private[log] (val log: FileRecords, * Change the suffix for the index and log files for this log segment * IOException from this method should be handled by the caller */ - def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = { + def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = false): Unit = { log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix))) lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix))) txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) + // We only need to flush the parent of the log file because all other files share the same parent + if (needFlushParentDir) + log.flushParentDir() } /** From ba086e91a9400cd830a9af1b18d30c65b0c99f42 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Mon, 29 Mar 2021 14:20:46 -0700 Subject: [PATCH 04/25] remove import --- .../main/java/org/apache/kafka/common/record/FileRecords.java | 1 - 1 file changed, 1 deletion(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 50307b08ebb2e..3de4044e61c53 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.common.record; -import java.nio.file.Path; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.network.TransferableChannel; import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch; From 2a19e0e92ef3465935849ec61a62aba8c6bc06f2 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: 
Tue, 30 Mar 2021 13:53:51 -0700 Subject: [PATCH 05/25] reuse the function in utils.java --- .../kafka/common/record/FileRecords.java | 8 +------- .../org/apache/kafka/common/utils/Utils.java | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 3de4044e61c53..2b69668c52f7b 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -208,15 +208,9 @@ public void flush() throws IOException { * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. */ public void flushParentDir() throws IOException { - FileChannel dir = null; try { - dir = FileChannel.open(file.toPath().getParent(), StandardOpenOption.READ); - dir.force(true); - } catch (Exception e) { - throw new KafkaException("Attempt to flush the parent directory of " + file + " failed."); + Utils.flushParentDir(file.toPath()); } finally { - if (dir != null) - dir.close(); needFlushParentDir.set(false); } } diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index c3c7dc39fff84..da92396eb3485 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.utils; import java.nio.BufferUnderflowException; +import java.nio.file.StandardOpenOption; import java.util.AbstractMap; import java.util.EnumSet; import java.util.SortedSet; @@ -911,6 +912,24 @@ public static void atomicMoveWithFallback(Path source, Path target) throws IOExc } } + /** + * Flushes the parent directory to guarantee crash consistency. + * + * @throws IOException if flushing the parent directory fails. + */ + public static void flushParentDir(Path path) throws IOException { + FileChannel dir = null; + try { + dir = FileChannel.open(path.getParent(), StandardOpenOption.READ); + dir.force(true); + } catch (Exception e) { + throw new KafkaException("Attempt to flush the parent directory of " + path + " failed."); + } finally { + if (dir != null) + dir.close(); + } + } + /** * Closes all the provided closeables. * @throws IOException if any of the close methods throws an IOException. From 40a1abe825a9c1539031ad9ff12b79114103916b Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 13:56:29 -0700 Subject: [PATCH 06/25] simplify logic --- .../java/org/apache/kafka/common/record/FileRecords.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 2b69668c52f7b..fe2d03a9882fc 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -208,11 +208,8 @@ public void flush() throws IOException { * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. 
*/ public void flushParentDir() throws IOException { - try { - Utils.flushParentDir(file.toPath()); - } finally { - needFlushParentDir.set(false); - } + needFlushParentDir.set(false); + Utils.flushParentDir(file.toPath()); } /** From 1ac80b61a7325f7522078d88b8784f6227f673c5 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 14:03:12 -0700 Subject: [PATCH 07/25] default changeFileSuffixes flush to true --- .../java/org/apache/kafka/common/record/FileRecords.java | 2 +- core/src/main/scala/kafka/log/Log.scala | 6 +++--- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index fe2d03a9882fc..c317429ab3498 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -200,7 +200,7 @@ public int append(MemoryRecords records) throws IOException { public void flush() throws IOException { channel.force(true); if (needFlushParentDir.getAndSet(false)) { - flushParentDir(); + Utils.flushParentDir(file.toPath()); } } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 814a25a4de011..8077790a3f13a 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2342,7 +2342,7 @@ class Log(@volatile private var _dir: File, * @throws IOException if the file can't be renamed and still exists */ private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = { - segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, true)) + segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix)) def deleteSegments(): Unit = { info(s"Deleting segment files ${segments.mkString(",")}") @@ -2405,7 +2405,7 @@ class Log(@volatile private var _dir: File, // need to do this in two phases to be crash safe AND do the delete asynchronously // if we crash in the middle of this we complete the swap in loadSegments() if (!isRecoveredSwapFile) - sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix)) + sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, Log.SwapFileSuffix, false)) sortedNewSegments.reverse.foreach(addSegment(_)) val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet @@ -2418,7 +2418,7 @@ class Log(@volatile private var _dir: File, deleteSegmentFiles(List(seg), asyncDelete = true, deleteProducerStateSnapshots = !newSegmentBaseOffsets.contains(seg.baseOffset)) } // okay we are safe now, remove the swap suffix - sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "", true)) + sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, "")) } } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 381e13bcaa65a..298a0abc2487c 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -490,7 +490,7 @@ class LogSegment private[log] (val log: FileRecords, * Change the suffix for the index and log files for this log segment * IOException from this method should be handled by the caller */ - def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = false): Unit = { + def 
changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = { log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix))) lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix))) From 09cac0b5aeb690df657b735777b2cf07c0f77f8e Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 14:20:19 -0700 Subject: [PATCH 08/25] flush when mkdirs --- core/src/main/scala/kafka/log/LogManager.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 432cf4a185b2f..b2979f3edc03b 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -21,14 +21,13 @@ import java.io._ import java.nio.file.Files import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger - import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.metadata.ConfigRepository import kafka.server._ import kafka.utils._ import org.apache.kafka.common.{KafkaException, TopicPartition} -import org.apache.kafka.common.utils.Time +import org.apache.kafka.common.utils.{Time, Utils} import org.apache.kafka.common.errors.{KafkaStorageException, LogDirNotFoundException} import scala.jdk.CollectionConverters._ @@ -150,6 +149,7 @@ class LogManager(logDirs: Seq[File], val created = dir.mkdirs() if (!created) throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}") + Utils.flushParentDir(dir.toPath) } if (!dir.isDirectory || !dir.canRead) throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.") @@ -848,6 +848,7 @@ class LogManager(logDirs: Seq[File], val dir = new File(logDirPath, logDirName) try { Files.createDirectories(dir.toPath) + Utils.flushParentDir(dir.toPath) Success(dir) } catch { case e: IOException => From 5be95aa919627f014ae64e6e4d4dd8bd38bf3a6c Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 14:21:34 -0700 Subject: [PATCH 09/25] revert accidential change --- core/src/main/scala/kafka/log/LogManager.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index b2979f3edc03b..3f0becca6e055 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -21,6 +21,7 @@ import java.io._ import java.nio.file.Files import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger + import kafka.metrics.KafkaMetricsGroup import kafka.server.checkpoints.OffsetCheckpointFile import kafka.server.metadata.ConfigRepository From c9448c8ddbc60a2d93c2b6b4505acfca3ffc1fe6 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 15:43:07 -0700 Subject: [PATCH 10/25] atomicMoveWithFallback --- .../org/apache/kafka/common/utils/Utils.java | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index da92396eb3485..2f3208072a2a9 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -894,10 +894,23 @@ public static 
ClassLoader getContextOrKafkaClassLoader() { /** * Attempts to move source to target atomically and falls back to a non-atomic move if it fails. + * This function also flushes the parent directory to guarantee crash consistency. * * @throws IOException if both atomic and non-atomic moves fail */ public static void atomicMoveWithFallback(Path source, Path target) throws IOException { + atomicMoveWithFallback(source, target, true); + } + + /** + * Attempts to move source to target atomically and falls back to a non-atomic move if it fails. + * This function allows callers to decide whether to flush the parent directory. This is needed + * when a sequence of atomicMoveWithFallback is called for the same directory and we don't want + * to repeatedly flush the same parent directory. + * + * @throws IOException if both atomic and non-atomic moves fail + */ + public static void atomicMoveWithFallback(Path source, Path target, boolean needFlushParentDir) throws IOException { try { Files.move(source, target, StandardCopyOption.ATOMIC_MOVE); } catch (IOException outer) { @@ -909,6 +922,10 @@ public static void atomicMoveWithFallback(Path source, Path target) throws IOExc inner.addSuppressed(outer); throw inner; } + } finally { + if (needFlushParentDir) { + flushParentDir(target); + } } } From daeb698248f424d2dffe5aef0b865def9ba4d2b6 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 15:48:22 -0700 Subject: [PATCH 11/25] no flush parent dir in test --- core/src/test/scala/unit/kafka/log/LogCleanerTest.scala | 4 ++-- core/src/test/scala/unit/kafka/log/LogTest.scala | 6 +++--- .../test/java/org/apache/kafka/snapshot/SnapshotsTest.java | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index dc51ce6d4e506..6b561c620ada0 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -1368,7 +1368,7 @@ class LogCleanerTest { // On recovery, clean operation is aborted. All messages should be present in the log log.logSegments.head.changeFileSuffixes("", Log.CleanedFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false) } log = recoverAndCheck(config, allKeys) @@ -1384,7 +1384,7 @@ class LogCleanerTest { // renamed to .deleted. Clean operation is resumed during recovery. 
log.logSegments.head.changeFileSuffixes("", Log.SwapFileSuffix) for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) { - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false) } log = recoverAndCheck(config, cleanedKeys) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 1e5257df49853..700e87a23b78a 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -3403,7 +3403,7 @@ class LogTest { segment.truncateTo(0) }) for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false) val recoveredLog = recoverAndCheck(logConfig, expectedKeys) assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) @@ -3431,7 +3431,7 @@ class LogTest { segment.truncateTo(0) } for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false) val recoveredLog = recoverAndCheck(logConfig, expectedKeys) assertEquals(expectedKeys, LogTest.keysInLog(recoveredLog)) @@ -3455,7 +3455,7 @@ class LogTest { segment.changeFileSuffixes("", Log.SwapFileSuffix) }) for (file <- logDir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) - Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, ""))) + Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")), false) // Truncate the old segment segmentWithOverflow.truncateTo(0) diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java index 868db6fe7af22..7960a834ea49d 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotsTest.java @@ -116,7 +116,7 @@ public void testDeleteSnapshot(boolean renameBeforeDeleting) throws IOException if (renameBeforeDeleting) // rename snapshot before deleting - Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId)); + Utils.atomicMoveWithFallback(snapshotPath, Snapshots.deleteRename(snapshotPath, snapshotId), false); assertTrue(Snapshots.deleteSnapshotIfExists(logDirPath, snapshot.snapshotId())); assertFalse(Files.exists(snapshotPath)); From 0d4800b43ead8d174ab154010e579814182dfff0 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Tue, 30 Mar 2021 17:05:45 -0700 Subject: [PATCH 12/25] check null pointer --- .../src/main/java/org/apache/kafka/common/utils/Utils.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 2f3208072a2a9..e1df0ae66c233 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -937,8 +937,11 @@ public static void atomicMoveWithFallback(Path source, Path target, boolean need public static void flushParentDir(Path path) throws IOException { FileChannel dir = null; try { - dir = FileChannel.open(path.getParent(), StandardOpenOption.READ); - dir.force(true); + Path parent = path.toAbsolutePath().getParent(); + if (parent != null) { + dir = FileChannel.open(parent, StandardOpenOption.READ); + dir.force(true); + } } catch (Exception e) { throw new KafkaException("Attempt to flush the parent directory of " + path + " failed."); } finally { From 95a6c3fe5aa4be1d366e36ac56b946fd9984022a Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Wed, 31 Mar 2021 01:26:30 -0700 Subject: [PATCH 13/25] fix unit test error --- clients/src/main/java/org/apache/kafka/common/utils/Utils.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index e1df0ae66c233..2a6affe2eb5a4 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -942,8 +942,6 @@ public static void flushParentDir(Path path) throws IOException { dir = FileChannel.open(parent, StandardOpenOption.READ); dir.force(true); } - } catch (Exception e) { - throw new KafkaException("Attempt to flush the parent directory of " + path + " failed."); } finally { if (dir != null) dir.close(); From 8c859f3265b5354a016487c259b811d00ecdb30f Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Wed, 31 Mar 2021 20:55:32 -0700 Subject: [PATCH 14/25] set flag after flush --- .../java/org/apache/kafka/common/record/FileRecords.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index c317429ab3498..2592ca11e7f80 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -200,7 +200,12 @@ public int append(MemoryRecords records) throws IOException { public void flush() throws IOException { channel.force(true); if (needFlushParentDir.getAndSet(false)) { - Utils.flushParentDir(file.toPath()); + try { + Utils.flushParentDir(file.toPath()); + } catch (Exception e) { + needFlushParentDir.set(true); + throw e; + } } } @@ -208,8 +213,8 @@ public void flush() throws IOException { * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. 
*/ public void flushParentDir() throws IOException { - needFlushParentDir.set(false); Utils.flushParentDir(file.toPath()); + needFlushParentDir.set(false); } /** From fdc1faa979b6d801e35d2c3f7c64c261d14c6cfd Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Wed, 31 Mar 2021 21:01:35 -0700 Subject: [PATCH 15/25] disable flushing on renameTo --- .../main/java/org/apache/kafka/common/record/FileRecords.java | 2 +- core/src/main/scala/kafka/log/AbstractIndex.scala | 2 +- core/src/main/scala/kafka/log/LazyIndex.scala | 2 +- core/src/main/scala/kafka/log/TransactionIndex.scala | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 2592ca11e7f80..07918fa1adb17 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -265,7 +265,7 @@ public void updateParentDir(File parentDir) { */ public void renameTo(File f) throws IOException { try { - Utils.atomicMoveWithFallback(file.toPath(), f.toPath()); + Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false); } finally { this.file = f; } diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 5d7de88b96a42..31b9f6d8dd71e 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -207,7 +207,7 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: * @throws IOException if rename fails */ def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) finally _file = f } diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala index a5a7c34a6e5b7..5ef18931a84cf 100644 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ b/core/src/main/scala/kafka/log/LazyIndex.scala @@ -131,7 +131,7 @@ object LazyIndex { def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath) + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) catch { case _: NoSuchFileException if !file.exists => () } diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index 565c4eb574060..ca3d1bb31012d 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -109,7 +109,7 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File) def renameTo(f: File): Unit = { try { if (file.exists) - Utils.atomicMoveWithFallback(file.toPath, f.toPath) + Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) } finally _file = f } From 6795ec9154a55eabbc4a68b0939ed43ea07f4f21 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 11:49:48 -0700 Subject: [PATCH 16/25] address comments based on offline discussion with Jun --- .../org/apache/kafka/common/record/FileRecords.java | 13 ++++--------- core/src/main/scala/kafka/log/AbstractIndex.scala | 4 ++-- core/src/main/scala/kafka/log/LazyIndex.scala | 12 ++++++------ core/src/main/scala/kafka/log/Log.scala | 2 +- core/src/main/scala/kafka/log/LogSegment.scala | 8 ++++---- 
.../src/main/scala/kafka/log/TransactionIndex.scala | 4 ++-- .../main/scala/kafka/raft/KafkaMetadataLog.scala | 2 +- core/src/test/scala/unit/kafka/log/LogTest.scala | 4 ++-- .../scala/unit/kafka/log/TransactionIndexTest.scala | 2 +- 9 files changed, 23 insertions(+), 28 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 07918fa1adb17..225de6ac0116d 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -200,12 +200,7 @@ public int append(MemoryRecords records) throws IOException { public void flush() throws IOException { channel.force(true); if (needFlushParentDir.getAndSet(false)) { - try { - Utils.flushParentDir(file.toPath()); - } catch (Exception e) { - needFlushParentDir.set(true); - throw e; - } + Utils.flushParentDir(file.toPath()); } } @@ -213,8 +208,8 @@ public void flush() throws IOException { * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. */ public void flushParentDir() throws IOException { - Utils.flushParentDir(file.toPath()); needFlushParentDir.set(false); + Utils.flushParentDir(file.toPath()); } /** @@ -263,9 +258,9 @@ public void updateParentDir(File parentDir) { * Rename the file that backs this message set * @throws IOException if rename fails. */ - public void renameTo(File f) throws IOException { + public void renameTo(File f, boolean needFlushParentDir) throws IOException { try { - Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false); + Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), needFlushParentDir); } finally { this.file = f; } diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 31b9f6d8dd71e..8dc4a499cf3a9 100644 --- a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -206,8 +206,8 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: * * @throws IOException if rename fails */ - def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) + def renameTo(f: File, needFlushParentDir: Boolean): Unit = { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir) finally _file = f } diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala index 5ef18931a84cf..da618542533b9 100644 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ b/core/src/main/scala/kafka/log/LazyIndex.scala @@ -74,9 +74,9 @@ class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: } } - def renameTo(f: File): Unit = { + def renameTo(f: File, needFlushParentDir: Boolean): Unit = { inLock(lock) { - indexWrapper.renameTo(f) + indexWrapper.renameTo(f, needFlushParentDir) } } @@ -114,7 +114,7 @@ object LazyIndex { def updateParentDir(f: File): Unit - def renameTo(f: File): Unit + def renameTo(f: File, needFlushParentDir: Boolean): Unit def deleteIfExists(): Boolean @@ -130,8 +130,8 @@ object LazyIndex { def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) - def renameTo(f: File): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) + def renameTo(f: File, needFlushParentDir: Boolean): Unit = { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, 
needFlushParentDir) catch { case _: NoSuchFileException if !file.exists => () } @@ -152,7 +152,7 @@ object LazyIndex { def updateParentDir(parentDir: File): Unit = index.updateParentDir(parentDir) - def renameTo(f: File): Unit = index.renameTo(f) + def renameTo(f: File, needFlushParentDir: Boolean): Unit = index.renameTo(f, needFlushParentDir) def deleteIfExists(): Boolean = index.deleteIfExists() diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 8077790a3f13a..e8fb9ba8d75f1 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -2342,7 +2342,7 @@ class Log(@volatile private var _dir: File, * @throws IOException if the file can't be renamed and still exists */ private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = { - segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix)) + segments.foreach(_.changeFileSuffixes("", Log.DeletedFileSuffix, false)) def deleteSegments(): Unit = { info(s"Deleting segment files ${segments.mkString(",")}") diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 298a0abc2487c..9953fddb99a6e 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -491,10 +491,10 @@ class LogSegment private[log] (val log: FileRecords, * IOException from this method should be handled by the caller */ def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = { - log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) - lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix))) - lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix))) - txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) + log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)), false) + lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)), false) + lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)), false) + txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)), false) // We only need to flush the parent of the log file because all other files share the same parent if (needFlushParentDir) log.flushParentDir() diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index ca3d1bb31012d..7dfafda2005ec 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -106,10 +106,10 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File) maybeChannel = None } - def renameTo(f: File): Unit = { + def renameTo(f: File, needFlushParentDir: Boolean): Unit = { try { if (file.exists) - Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) + Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir) } finally _file = f } diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala index 95de4fe3923b7..279de3c3e93d3 100644 --- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala 
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala @@ -300,7 +300,7 @@ final class KafkaMetadataLog private ( val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId) val destination = Snapshots.deleteRename(path, snapshotId) try { - Utils.atomicMoveWithFallback(path, destination) + Utils.atomicMoveWithFallback(path, destination, false) } catch { case e: IOException => error(s"Error renaming snapshot file: $path to $destination", e) diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 700e87a23b78a..220fb58e74476 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -3022,7 +3022,7 @@ class LogTest { append(10) // Kind of a hack, but renaming the index to a directory ensures that the append // to the index will fail. - log.activeSegment.txnIndex.renameTo(log.dir) + log.activeSegment.txnIndex.renameTo(log.dir, false) assertThrows(classOf[KafkaStorageException], () => appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)) assertThrows(classOf[KafkaStorageException], () => log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)) assertThrows(classOf[KafkaStorageException], () => readLog(log, 0, 4096).records.records.iterator.next().offset) @@ -4587,7 +4587,7 @@ class LogTest { // Kind of a hack, but renaming the index to a directory ensures that the append // to the index will fail. - log.activeSegment.txnIndex.renameTo(log.dir) + log.activeSegment.txnIndex.renameTo(log.dir, false) // The append will be written to the log successfully, but the write to the index will fail assertThrows(classOf[KafkaStorageException], diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala index 790bcd88a9a09..f4eea7eb32e8a 100644 --- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -159,7 +159,7 @@ class TransactionIndexTest { val renamed = TestUtils.tempFile() index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2)) - index.renameTo(renamed) + index.renameTo(renamed, false) index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16)) val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions From e653af4dc06b6c16b1fb4c2ce2aa4f412dcb2f1f Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 12:37:57 -0700 Subject: [PATCH 17/25] check hadCleanShutdown for open FileRecord --- .../org/apache/kafka/common/record/FileRecords.java | 11 ++++++----- .../apache/kafka/common/record/FileRecordsTest.java | 10 +++++----- .../common/record/LazyDownConversionRecordsTest.java | 2 +- core/src/main/scala/kafka/log/Log.scala | 11 ++++++----- core/src/main/scala/kafka/log/LogSegment.scala | 4 ++-- .../apache/kafka/snapshot/FileRawSnapshotReader.java | 3 ++- 6 files changed, 22 insertions(+), 19 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 225de6ac0116d..ea343ac1cc1af 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -439,21 +439,22 @@ public static FileRecords open(File file, boolean mutable, boolean fileAlreadyExists, 
int initFileSize, - boolean preallocate) throws IOException { + boolean preallocate, + boolean hadCleanShutdown) throws IOException { FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE; - return new FileRecords(file, channel, 0, end, false, mutable); + return new FileRecords(file, channel, 0, end, false, mutable && !hadCleanShutdown); } public static FileRecords open(File file, boolean fileAlreadyExists, int initFileSize, - boolean preallocate) throws IOException { - return open(file, true, fileAlreadyExists, initFileSize, preallocate); + boolean preallocate, boolean hadCleanShutdown) throws IOException { + return open(file, true, fileAlreadyExists, initFileSize, preallocate, hadCleanShutdown); } public static FileRecords open(File file, boolean mutable) throws IOException { - return open(file, mutable, false, 0, false); + return open(file, mutable, false, 0, false, true); } public static FileRecords open(File file) throws IOException { diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 705d2a1c73d49..ac2e6218cc53d 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -366,7 +366,7 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { @Test public void testPreallocateTrue() throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, true); long position = fileRecords.channel().position(); int size = fileRecords.sizeInBytes(); assertEquals(0, position); @@ -380,7 +380,7 @@ public void testPreallocateTrue() throws IOException { @Test public void testPreallocateFalse() throws IOException { File temp = tempFile(); - FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false); + FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false, true); long position = set.channel().position(); int size = set.sizeInBytes(); assertEquals(0, position); @@ -394,7 +394,7 @@ public void testPreallocateFalse() throws IOException { @Test public void testPreallocateClearShutdown() throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, true); append(fileRecords, values); int oldPosition = (int) fileRecords.channel().position(); @@ -404,7 +404,7 @@ public void testPreallocateClearShutdown() throws IOException { fileRecords.close(); File tempReopen = new File(temp.getAbsolutePath()); - FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true); + FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true, true); int position = (int) setReopen.channel().position(); int size = setReopen.sizeInBytes(); @@ -439,7 +439,7 @@ public void testSearchForTimestamp() throws IOException { private void testSearchForTimestamp(RecordVersion version) throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, true); appendWithOffsetAndTimestamp(fileRecords, version, 10L, 5, 0); 
appendWithOffsetAndTimestamp(fileRecords, version, 11L, 6, 1); diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java index ebc29829cbadf..a0e1acd2d692c 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java @@ -153,7 +153,7 @@ private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte try (TransferableChannel channel = toTransferableChannel(FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE))) { int written = 0; while (written < bytesToConvert) written += lazySend.writeTo(channel, written, bytesToConvert - written); - try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false)) { + try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false, true)) { convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes()); convertedRecords.readInto(convertedRecordsBuffer, 0); } diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 9083ca70ec484..56b268000644b 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -320,7 +320,7 @@ class Log(@volatile private var _dir: File, initializeLeaderEpochCache() initializePartitionMetadata() - val nextOffset = loadSegments() + val nextOffset = loadSegments(hadCleanShutdown) /* Calculate the offset of the next message */ nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) @@ -669,7 +669,7 @@ class Log(@volatile private var _dir: File, * caller is responsible for closing them appropriately, if needed. * @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset */ - private def loadSegmentFiles(): Unit = { + private def loadSegmentFiles(hadCleanShutdown: Boolean): Unit = { // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { @@ -689,7 +689,8 @@ class Log(@volatile private var _dir: File, baseOffset = baseOffset, config, time = time, - fileAlreadyExists = true) + fileAlreadyExists = true, + hadCleanShutdown = hadCleanShutdown) try segment.sanityCheck(timeIndexFileNewlyCreated) catch { @@ -769,7 +770,7 @@ class Log(@volatile private var _dir: File, * @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that overflow index offset; or when * we find an unexpected number of .log files with overflow */ - private def loadSegments(): Long = { + private def loadSegments(hadCleanShutdown: Boolean): Long = { // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles() @@ -783,7 +784,7 @@ class Log(@volatile private var _dir: File, // call to loadSegmentFiles(). segments.close() segments.clear() - loadSegmentFiles() + loadSegmentFiles(hadCleanShutdown) } // Finally, complete any interrupted swap operations. 
To be crash-safe, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 9953fddb99a6e..c62f595405335 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -660,10 +660,10 @@ class LogSegment private[log] (val log: FileRecords, object LogSegment { def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, - initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = ""): LogSegment = { + initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "", hadCleanShutdown: Boolean = true): LogSegment = { val maxIndexSize = config.maxIndexSize new LogSegment( - FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate), + FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate, hadCleanShutdown), LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)), diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java index d0218c79cc427..329bbab1949c9 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java @@ -73,7 +73,8 @@ public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId) false, // mutable true, // fileAlreadyExists 0, // initFileSize - false // preallocate + false, // preallocate + true // hadCleanShutdown ); return new FileRawSnapshotReader(fileRecords, snapshotId); From 85861ee433ce60c40d8f422c2227e9df35cd07ee Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 15:36:16 -0700 Subject: [PATCH 18/25] address comments --- .../kafka/common/record/FileRecords.java | 23 ++++++++++--------- core/src/main/scala/kafka/log/Log.scala | 10 ++++---- .../src/main/scala/kafka/log/LogSegment.scala | 5 ++-- 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index ea343ac1cc1af..ca6034b07eaa4 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -51,7 +51,7 @@ public class FileRecords extends AbstractRecords implements Closeable { private final AtomicInteger size; private final FileChannel channel; private volatile File file; - private final AtomicBoolean needFlushParentDir; + private final AtomicBoolean needsFlushParentDir; /** * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible. 
@@ -62,14 +62,14 @@ public class FileRecords extends AbstractRecords implements Closeable { int start, int end, boolean isSlice, - boolean needFlushParentDir) throws IOException { + boolean needsFlushParentDir) throws IOException { this.file = file; this.channel = channel; this.start = start; this.end = end; this.isSlice = isSlice; this.size = new AtomicInteger(); - this.needFlushParentDir = new AtomicBoolean(needFlushParentDir); + this.needsFlushParentDir = new AtomicBoolean(needsFlushParentDir); if (isSlice) { // don't check the file size if this is just a slice view @@ -199,7 +199,7 @@ public int append(MemoryRecords records) throws IOException { */ public void flush() throws IOException { channel.force(true); - if (needFlushParentDir.getAndSet(false)) { + if (needsFlushParentDir.getAndSet(false)) { Utils.flushParentDir(file.toPath()); } } @@ -208,7 +208,7 @@ public void flush() throws IOException { * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. */ public void flushParentDir() throws IOException { - needFlushParentDir.set(false); + needsFlushParentDir.set(false); Utils.flushParentDir(file.toPath()); } @@ -258,9 +258,9 @@ public void updateParentDir(File parentDir) { * Rename the file that backs this message set * @throws IOException if rename fails. */ - public void renameTo(File f, boolean needFlushParentDir) throws IOException { + public void renameTo(File f, boolean needsFlushParentDir) throws IOException { try { - Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), needFlushParentDir); + Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), needsFlushParentDir); } finally { this.file = f; } @@ -440,17 +440,18 @@ public static FileRecords open(File file, boolean fileAlreadyExists, int initFileSize, boolean preallocate, - boolean hadCleanShutdown) throws IOException { + boolean needsRecovery) throws IOException { FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); int end = (!fileAlreadyExists && preallocate) ? 0 : Integer.MAX_VALUE; - return new FileRecords(file, channel, 0, end, false, mutable && !hadCleanShutdown); + return new FileRecords(file, channel, 0, end, false, mutable && needsRecovery); } public static FileRecords open(File file, boolean fileAlreadyExists, int initFileSize, - boolean preallocate, boolean hadCleanShutdown) throws IOException { - return open(file, true, fileAlreadyExists, initFileSize, preallocate, hadCleanShutdown); + boolean preallocate, + boolean needsRecovery) throws IOException { + return open(file, true, fileAlreadyExists, initFileSize, preallocate, needsRecovery); } public static FileRecords open(File file, boolean mutable) throws IOException { diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala index 56b268000644b..edd213628dc7d 100644 --- a/core/src/main/scala/kafka/log/Log.scala +++ b/core/src/main/scala/kafka/log/Log.scala @@ -320,7 +320,7 @@ class Log(@volatile private var _dir: File, initializeLeaderEpochCache() initializePartitionMetadata() - val nextOffset = loadSegments(hadCleanShutdown) + val nextOffset = loadSegments() /* Calculate the offset of the next message */ nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size) @@ -669,7 +669,7 @@ class Log(@volatile private var _dir: File, * caller is responsible for closing them appropriately, if needed. 
* @throws LogSegmentOffsetOverflowException if the log directory contains a segment with messages that overflow the index offset */ - private def loadSegmentFiles(hadCleanShutdown: Boolean): Unit = { + private def loadSegmentFiles(): Unit = { // load segments in ascending order because transactional data from one segment may depend on the // segments that come before it for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) { @@ -690,7 +690,7 @@ class Log(@volatile private var _dir: File, config, time = time, fileAlreadyExists = true, - hadCleanShutdown = hadCleanShutdown) + needsRecovery = !hadCleanShutdown) try segment.sanityCheck(timeIndexFileNewlyCreated) catch { @@ -770,7 +770,7 @@ class Log(@volatile private var _dir: File, * @throws LogSegmentOffsetOverflowException if we encounter a .swap file with messages that overflow index offset; or when * we find an unexpected number of .log files with overflow */ - private def loadSegments(hadCleanShutdown: Boolean): Long = { + private def loadSegments(): Long = { // first do a pass through the files in the log directory and remove any temporary files // and find any interrupted swap operations val swapFiles = removeTempFilesAndCollectSwapFiles() @@ -784,7 +784,7 @@ class Log(@volatile private var _dir: File, // call to loadSegmentFiles(). segments.close() segments.clear() - loadSegmentFiles(hadCleanShutdown) + loadSegmentFiles() } // Finally, complete any interrupted swap operations. To be crash-safe, diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index c62f595405335..1872b76a77ba8 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -660,10 +660,11 @@ class LogSegment private[log] (val log: FileRecords, object LogSegment { def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, - initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "", hadCleanShutdown: Boolean = true): LogSegment = { + initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "", + needsRecovery: Boolean = true): LogSegment = { val maxIndexSize = config.maxIndexSize new LogSegment( - FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate, hadCleanShutdown), + FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate, needsRecovery), LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)), From 15786783a84144af08f9e44d021178d015f0a99f Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 15:41:18 -0700 Subject: [PATCH 19/25] fix default values --- .../org/apache/kafka/common/record/FileRecords.java | 2 +- .../apache/kafka/common/record/FileRecordsTest.java | 10 +++++----- .../common/record/LazyDownConversionRecordsTest.java | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index ca6034b07eaa4..5c3ec433382a6 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -455,7 +455,7 @@ public static FileRecords open(File file, } public static FileRecords open(File file, boolean mutable) throws IOException { - return open(file, mutable, false, 0, false, true); + return open(file, mutable, false, 0, false, false); } public static FileRecords open(File file) throws IOException { diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index ac2e6218cc53d..643a86aff9a36 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -366,7 +366,7 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { @Test public void testPreallocateTrue() throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, true); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, false); long position = fileRecords.channel().position(); int size = fileRecords.sizeInBytes(); assertEquals(0, position); @@ -380,7 +380,7 @@ public void testPreallocateTrue() throws IOException { @Test public void testPreallocateFalse() throws IOException { File temp = tempFile(); - FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false, true); + FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false, false); long position = set.channel().position(); int size = set.sizeInBytes(); assertEquals(0, position); @@ -394,7 +394,7 @@ public void testPreallocateFalse() throws IOException { @Test public void testPreallocateClearShutdown() throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, true); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, false); append(fileRecords, values); int oldPosition = (int) fileRecords.channel().position(); @@ -404,7 +404,7 @@ public void testPreallocateClearShutdown() throws IOException { fileRecords.close(); File tempReopen = new File(temp.getAbsolutePath()); - FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true, true); + FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true, false); int position = (int) setReopen.channel().position(); int size = setReopen.sizeInBytes(); @@ -439,7 +439,7 @@ public void testSearchForTimestamp() throws IOException { private void testSearchForTimestamp(RecordVersion version) throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, true); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, false); appendWithOffsetAndTimestamp(fileRecords, version, 10L, 5, 0); appendWithOffsetAndTimestamp(fileRecords, version, 11L, 6, 1); diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java index a0e1acd2d692c..953c676a6193d 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java @@ -153,7 +153,7 @@ private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte try (TransferableChannel channel = 
toTransferableChannel(FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE))) { int written = 0; while (written < bytesToConvert) written += lazySend.writeTo(channel, written, bytesToConvert - written); - try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false, true)) { + try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false, false)) { convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes()); convertedRecords.readInto(convertedRecordsBuffer, 0); } From fffc3535370d5902c14baac123fdb6749b801e13 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 15:49:26 -0700 Subject: [PATCH 20/25] more default value --- .../java/org/apache/kafka/snapshot/FileRawSnapshotReader.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java index 329bbab1949c9..a45af5ebfc844 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java @@ -74,7 +74,7 @@ public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId) true, // fileAlreadyExists 0, // initFileSize false, // preallocate - true // hadCleanShutdown + false // needsRecovery ); return new FileRawSnapshotReader(fileRecords, snapshotId); From f66c545aa3d12ca06176bdcc3deff3b2f08b600b Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 17:50:01 -0700 Subject: [PATCH 21/25] do flush in the LogSegment class --- .../kafka/common/record/FileRecords.java | 25 ++++++------------- .../kafka/common/record/FileRecordsTest.java | 10 ++++---- .../main/scala/kafka/log/AbstractIndex.scala | 4 +-- core/src/main/scala/kafka/log/LazyIndex.scala | 12 ++++----- .../src/main/scala/kafka/log/LogSegment.scala | 25 +++++++++++++------ .../scala/kafka/log/TransactionIndex.scala | 4 +-- .../test/scala/unit/kafka/log/LogTest.scala | 4 +-- .../unit/kafka/log/TransactionIndexTest.scala | 2 +- .../kafka/snapshot/FileRawSnapshotReader.java | 3 +-- 9 files changed, 44 insertions(+), 45 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 5c3ec433382a6..462804955a522 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -33,7 +33,6 @@ import java.nio.file.StandardOpenOption; import java.util.Objects; import java.util.Optional; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** @@ -51,7 +50,6 @@ public class FileRecords extends AbstractRecords implements Closeable { private final AtomicInteger size; private final FileChannel channel; private volatile File file; - private final AtomicBoolean needsFlushParentDir; /** * The {@code FileRecords.open} methods should be used instead of this constructor whenever possible. 
@@ -61,15 +59,13 @@ public class FileRecords extends AbstractRecords implements Closeable { FileChannel channel, int start, int end, - boolean isSlice, - boolean needsFlushParentDir) throws IOException { + boolean isSlice) throws IOException { this.file = file; this.channel = channel; this.start = start; this.end = end; this.isSlice = isSlice; this.size = new AtomicInteger(); - this.needsFlushParentDir = new AtomicBoolean(needsFlushParentDir); if (isSlice) { // don't check the file size if this is just a slice view @@ -140,7 +136,7 @@ public void readInto(ByteBuffer buffer, int position) throws IOException { public FileRecords slice(int position, int size) throws IOException { int availableBytes = availableBytes(position, size); int startPosition = this.start + position; - return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true, false); + return new FileRecords(file, channel, startPosition, startPosition + availableBytes, true); } /** @@ -199,16 +195,12 @@ public int append(MemoryRecords records) throws IOException { */ public void flush() throws IOException { channel.force(true); - if (needsFlushParentDir.getAndSet(false)) { - Utils.flushParentDir(file.toPath()); - } } /** * Flush the parent directory of a file to the physical disk, which makes sure the file is accessible after crashing. */ public void flushParentDir() throws IOException { - needsFlushParentDir.set(false); Utils.flushParentDir(file.toPath()); } @@ -258,9 +250,9 @@ public void updateParentDir(File parentDir) { * Rename the file that backs this message set * @throws IOException if rename fails. */ - public void renameTo(File f, boolean needsFlushParentDir) throws IOException { + public void renameTo(File f) throws IOException { try { - Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), needsFlushParentDir); + Utils.atomicMoveWithFallback(file.toPath(), f.toPath(), false); } finally { this.file = f; } @@ -439,11 +431,10 @@ public static FileRecords open(File file, boolean mutable, boolean fileAlreadyExists, int initFileSize, - boolean preallocate, - boolean needsRecovery) throws IOException { + boolean preallocate) throws IOException { FileChannel channel = openChannel(file, mutable, fileAlreadyExists, initFileSize, preallocate); int end = (!fileAlreadyExists && preallocate) ? 
0 : Integer.MAX_VALUE; - return new FileRecords(file, channel, 0, end, false, mutable && needsRecovery); + return new FileRecords(file, channel, 0, end, false); } public static FileRecords open(File file, @@ -451,11 +442,11 @@ public static FileRecords open(File file, int initFileSize, boolean preallocate, boolean needsRecovery) throws IOException { - return open(file, true, fileAlreadyExists, initFileSize, preallocate, needsRecovery); + return open(file, true, fileAlreadyExists, initFileSize, preallocate); } public static FileRecords open(File file, boolean mutable) throws IOException { - return open(file, mutable, false, 0, false, false); + return open(file, mutable, false, 0, false); } public static FileRecords open(File file) throws IOException { diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 643a86aff9a36..0629fa762e990 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -90,7 +90,7 @@ public void testAppendProtectsFromOverflow() throws Exception { FileChannel fileChannelMock = mock(FileChannel.class); when(fileChannelMock.size()).thenReturn((long) Integer.MAX_VALUE); - FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false, false); + FileRecords records = new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false); assertThrows(IllegalArgumentException.class, () -> append(records, values)); } @@ -100,7 +100,7 @@ public void testOpenOversizeFile() throws Exception { FileChannel fileChannelMock = mock(FileChannel.class); when(fileChannelMock.size()).thenReturn(Integer.MAX_VALUE + 5L); - assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false, false)); + assertThrows(KafkaException.class, () -> new FileRecords(fileMock, fileChannelMock, 0, Integer.MAX_VALUE, false)); } @Test @@ -314,7 +314,7 @@ public void testTruncateNotCalledIfSizeIsSameAsTargetSize() throws IOException { when(channelMock.size()).thenReturn(42L); when(channelMock.position(42L)).thenReturn(null); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); fileRecords.truncateTo(42); verify(channelMock, atLeastOnce()).size(); @@ -331,7 +331,7 @@ public void testTruncateNotCalledIfSizeIsBiggerThanTargetSize() throws IOExcepti when(channelMock.size()).thenReturn(42L); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); try { fileRecords.truncateTo(43); @@ -353,7 +353,7 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { when(channelMock.size()).thenReturn(42L); when(channelMock.truncate(anyLong())).thenReturn(channelMock); - FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false, false); + FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false); fileRecords.truncateTo(23); verify(channelMock, atLeastOnce()).size(); diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala index 8dc4a499cf3a9..31b9f6d8dd71e 100644 --- 
a/core/src/main/scala/kafka/log/AbstractIndex.scala +++ b/core/src/main/scala/kafka/log/AbstractIndex.scala @@ -206,8 +206,8 @@ abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: * * @throws IOException if rename fails */ - def renameTo(f: File, needFlushParentDir: Boolean): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir) + def renameTo(f: File): Unit = { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) finally _file = f } diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala index da618542533b9..5ef18931a84cf 100644 --- a/core/src/main/scala/kafka/log/LazyIndex.scala +++ b/core/src/main/scala/kafka/log/LazyIndex.scala @@ -74,9 +74,9 @@ class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: } } - def renameTo(f: File, needFlushParentDir: Boolean): Unit = { + def renameTo(f: File): Unit = { inLock(lock) { - indexWrapper.renameTo(f, needFlushParentDir) + indexWrapper.renameTo(f) } } @@ -114,7 +114,7 @@ object LazyIndex { def updateParentDir(f: File): Unit - def renameTo(f: File, needFlushParentDir: Boolean): Unit + def renameTo(f: File): Unit def deleteIfExists(): Boolean @@ -130,8 +130,8 @@ object LazyIndex { def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName) - def renameTo(f: File, needFlushParentDir: Boolean): Unit = { - try Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir) + def renameTo(f: File): Unit = { + try Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) catch { case _: NoSuchFileException if !file.exists => () } @@ -152,7 +152,7 @@ object LazyIndex { def updateParentDir(parentDir: File): Unit = index.updateParentDir(parentDir) - def renameTo(f: File, needFlushParentDir: Boolean): Unit = index.renameTo(f, needFlushParentDir) + def renameTo(f: File): Unit = index.renameTo(f) def deleteIfExists(): Boolean = index.deleteIfExists() diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 1872b76a77ba8..05ba4aa39c09d 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -20,6 +20,7 @@ import java.io.{File, IOException} import java.nio.file.{Files, NoSuchFileException} import java.nio.file.attribute.FileTime import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean import kafka.common.LogSegmentOffsetOverflowException import kafka.metrics.{KafkaMetricsGroup, KafkaTimer} import kafka.server.epoch.LeaderEpochFileCache @@ -59,7 +60,8 @@ class LogSegment private[log] (val log: FileRecords, val baseOffset: Long, val indexIntervalBytes: Int, val rollJitterMs: Long, - val time: Time) extends Logging { + val time: Time, + val needsFlushParentDir: Boolean = false) extends Logging { def offsetIndex: OffsetIndex = lazyOffsetIndex.get @@ -95,6 +97,9 @@ class LogSegment private[log] (val log: FileRecords, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 + /* whether or not we need to flush the parent dir during flush */ + private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir) + // The timestamp we used for time based log rolling and for ensuring max compaction delay // volatile for LogCleaner to see the update @volatile private var rollingBasedTimestamp: Option[Long] = None @@ -472,6 +477,9 @@ class LogSegment private[log] (val log: 
FileRecords, offsetIndex.flush() timeIndex.flush() txnIndex.flush() + // We only need to flush the parent of the log file because all other files share the same parent + if (atomicNeedsFlushParentDir.getAndSet(false)) + log.flushParentDir() } } @@ -490,13 +498,13 @@ class LogSegment private[log] (val log: FileRecords, * Change the suffix for the index and log files for this log segment * IOException from this method should be handled by the caller */ - def changeFileSuffixes(oldSuffix: String, newSuffix: String, needFlushParentDir: Boolean = true): Unit = { - log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)), false) - lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)), false) - lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)), false) - txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)), false) + def changeFileSuffixes(oldSuffix: String, newSuffix: String, needsFlushParentDir: Boolean = true): Unit = { + log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix))) + lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix))) + lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix))) + txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix))) // We only need to flush the parent of the log file because all other files share the same parent - if (needFlushParentDir) + if (needsFlushParentDir) log.flushParentDir() } @@ -671,7 +679,8 @@ object LogSegment { baseOffset, indexIntervalBytes = config.indexInterval, rollJitterMs = config.randomSegmentJitter, - time) + time, + needsRecovery || !fileAlreadyExists) } def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = { diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala index 7dfafda2005ec..ca3d1bb31012d 100644 --- a/core/src/main/scala/kafka/log/TransactionIndex.scala +++ b/core/src/main/scala/kafka/log/TransactionIndex.scala @@ -106,10 +106,10 @@ class TransactionIndex(val startOffset: Long, @volatile private var _file: File) maybeChannel = None } - def renameTo(f: File, needFlushParentDir: Boolean): Unit = { + def renameTo(f: File): Unit = { try { if (file.exists) - Utils.atomicMoveWithFallback(file.toPath, f.toPath, needFlushParentDir) + Utils.atomicMoveWithFallback(file.toPath, f.toPath, false) } finally _file = f } diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala index 220fb58e74476..700e87a23b78a 100755 --- a/core/src/test/scala/unit/kafka/log/LogTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogTest.scala @@ -3022,7 +3022,7 @@ class LogTest { append(10) // Kind of a hack, but renaming the index to a directory ensures that the append // to the index will fail. 
- log.activeSegment.txnIndex.renameTo(log.dir, false) + log.activeSegment.txnIndex.renameTo(log.dir) assertThrows(classOf[KafkaStorageException], () => appendEndTxnMarkerAsLeader(log, pid, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)) assertThrows(classOf[KafkaStorageException], () => log.appendAsLeader(TestUtils.singletonRecords(value = null), leaderEpoch = 0)) assertThrows(classOf[KafkaStorageException], () => readLog(log, 0, 4096).records.records.iterator.next().offset) @@ -4587,7 +4587,7 @@ class LogTest { // Kind of a hack, but renaming the index to a directory ensures that the append // to the index will fail. - log.activeSegment.txnIndex.renameTo(log.dir, false) + log.activeSegment.txnIndex.renameTo(log.dir) // The append will be written to the log successfully, but the write to the index will fail assertThrows(classOf[KafkaStorageException], diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala index f4eea7eb32e8a..790bcd88a9a09 100644 --- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala +++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala @@ -159,7 +159,7 @@ class TransactionIndexTest { val renamed = TestUtils.tempFile() index.append(new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2)) - index.renameTo(renamed, false) + index.renameTo(renamed) index.append(new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16)) val abortedTxns = index.collectAbortedTxns(0L, 100L).abortedTransactions diff --git a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java index a45af5ebfc844..d0218c79cc427 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/FileRawSnapshotReader.java @@ -73,8 +73,7 @@ public static FileRawSnapshotReader open(Path logDir, OffsetAndEpoch snapshotId) false, // mutable true, // fileAlreadyExists 0, // initFileSize - false, // preallocate - false // needsRecovery + false // preallocate ); return new FileRawSnapshotReader(fileRecords, snapshotId); From 1ecf94bfd8b9efba422520cd127965f1bac787f1 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 18:18:07 -0700 Subject: [PATCH 22/25] remove parameter from FileRecord.open --- .../org/apache/kafka/common/record/FileRecords.java | 3 +-- .../apache/kafka/common/record/FileRecordsTest.java | 10 +++++----- .../common/record/LazyDownConversionRecordsTest.java | 2 +- core/src/main/scala/kafka/log/LogSegment.scala | 4 ++-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java index 462804955a522..72d2d6183ffd3 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java @@ -440,8 +440,7 @@ public static FileRecords open(File file, public static FileRecords open(File file, boolean fileAlreadyExists, int initFileSize, - boolean preallocate, - boolean needsRecovery) throws IOException { + boolean preallocate) throws IOException { return open(file, true, fileAlreadyExists, initFileSize, preallocate); } diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index 0629fa762e990..c32359b4467c7 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -366,7 +366,7 @@ public void testTruncateIfSizeIsDifferentToTargetSize() throws IOException { @Test public void testPreallocateTrue() throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, false); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true); long position = fileRecords.channel().position(); int size = fileRecords.sizeInBytes(); assertEquals(0, position); @@ -380,7 +380,7 @@ public void testPreallocateTrue() throws IOException { @Test public void testPreallocateFalse() throws IOException { File temp = tempFile(); - FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false, false); + FileRecords set = FileRecords.open(temp, false, 1024 * 1024, false); long position = set.channel().position(); int size = set.sizeInBytes(); assertEquals(0, position); @@ -394,7 +394,7 @@ public void testPreallocateFalse() throws IOException { @Test public void testPreallocateClearShutdown() throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, false); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true); append(fileRecords, values); int oldPosition = (int) fileRecords.channel().position(); @@ -404,7 +404,7 @@ public void testPreallocateClearShutdown() throws IOException { fileRecords.close(); File tempReopen = new File(temp.getAbsolutePath()); - FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true, false); + FileRecords setReopen = FileRecords.open(tempReopen, true, 1024 * 1024, true); int position = (int) setReopen.channel().position(); int size = setReopen.sizeInBytes(); @@ -439,7 +439,7 @@ public void testSearchForTimestamp() throws IOException { private void testSearchForTimestamp(RecordVersion version) throws IOException { File temp = tempFile(); - FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true, false); + FileRecords fileRecords = FileRecords.open(temp, false, 1024 * 1024, true); appendWithOffsetAndTimestamp(fileRecords, version, 10L, 5, 0); appendWithOffsetAndTimestamp(fileRecords, version, 11L, 6, 1); diff --git a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java index 953c676a6193d..ebc29829cbadf 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/LazyDownConversionRecordsTest.java @@ -153,7 +153,7 @@ private static MemoryRecords convertRecords(MemoryRecords recordsToConvert, byte try (TransferableChannel channel = toTransferableChannel(FileChannel.open(outputFile.toPath(), StandardOpenOption.READ, StandardOpenOption.WRITE))) { int written = 0; while (written < bytesToConvert) written += lazySend.writeTo(channel, written, bytesToConvert - written); - try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false, false)) { + try (FileRecords convertedRecords = FileRecords.open(outputFile, true, written, false)) { convertedRecordsBuffer = ByteBuffer.allocate(convertedRecords.sizeInBytes()); 
convertedRecords.readInto(convertedRecordsBuffer, 0); } diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 05ba4aa39c09d..6010c3a705b3c 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -672,7 +672,7 @@ object LogSegment { needsRecovery: Boolean = true): LogSegment = { val maxIndexSize = config.maxIndexSize new LogSegment( - FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate, needsRecovery), + FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate), LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize), new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)), @@ -680,7 +680,7 @@ object LogSegment { indexIntervalBytes = config.indexInterval, rollJitterMs = config.randomSegmentJitter, time, - needsRecovery || !fileAlreadyExists) + needsFlushParentDir = needsRecovery || !fileAlreadyExists) } def deleteIfExists(dir: File, baseOffset: Long, fileSuffix: String = ""): Unit = { From 080a79a3f9ad60e63de7d9e653c953c70051a1e4 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 19:21:17 -0700 Subject: [PATCH 23/25] default to false --- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 6010c3a705b3c..49168818fcf26 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -669,7 +669,7 @@ object LogSegment { def open(dir: File, baseOffset: Long, config: LogConfig, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false, fileSuffix: String = "", - needsRecovery: Boolean = true): LogSegment = { + needsRecovery: Boolean = false): LogSegment = { val maxIndexSize = config.maxIndexSize new LogSegment( FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate), From 61eee4a2f46e27813d2f6b71d97fd11f1fc9515c Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 19:25:43 -0700 Subject: [PATCH 24/25] add param to javadoc --- core/src/main/scala/kafka/log/LogSegment.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index 49168818fcf26..e06ef5cbaac09 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -51,6 +51,7 @@ import scala.math._ * @param indexIntervalBytes The approximate number of bytes between entries in the index * @param rollJitterMs The maximum random jitter subtracted from the scheduled segment roll time * @param time The time instance + * @param needsFlushParentDir Whether or not we need to flush the parent directory during the first flush */ @nonthreadsafe class LogSegment private[log] (val log: FileRecords, From 75439382c50b845f4bfca6563bbc4e9500fd35d4 Mon Sep 17 00:00:00 2001 From: Cong Ding Date: Thu, 1 Apr 2021 19:27:11 -0700 Subject: [PATCH 25/25] during flush -> during the next flush --- core/src/main/scala/kafka/log/LogSegment.scala | 2 +- 1 file changed, 1 
insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala index e06ef5cbaac09..e3b09d4b90f8b 100755 --- a/core/src/main/scala/kafka/log/LogSegment.scala +++ b/core/src/main/scala/kafka/log/LogSegment.scala @@ -98,7 +98,7 @@ class LogSegment private[log] (val log: FileRecords, /* the number of bytes since we last added an entry in the offset index */ private var bytesSinceLastIndexEntry = 0 - /* whether or not we need to flush the parent dir during flush */ + /* whether or not we need to flush the parent dir during the next flush */ private val atomicNeedsFlushParentDir = new AtomicBoolean(needsFlushParentDir) // The timestamp we used for time based log rolling and for ensuring max compaction delay
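
The core technique these patches converge on — opening the segment file's parent directory and forcing it to disk so that a newly created or renamed file survives a crash — can be summarized outside the patch series. Below is a minimal standalone Java sketch of that idea; it assumes only java.nio and is not the actual org.apache.kafka.common.utils.Utils.flushParentDir implementation, whose exact signature and error handling live in the Kafka source. The file path used is hypothetical.

    import java.io.IOException;
    import java.nio.channels.FileChannel;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;

    public final class ParentDirFlushSketch {

        // Flush the directory that contains `file` so the directory entry itself is durable.
        // Opening a directory with READ and calling force(true) works on Linux and macOS;
        // on platforms where a directory cannot be opened this way the open call throws,
        // and a production implementation would need to tolerate that.
        public static void flushParentDir(Path file) throws IOException {
            Path parent = file.toAbsolutePath().getParent();
            if (parent == null)
                return;
            try (FileChannel dir = FileChannel.open(parent, StandardOpenOption.READ)) {
                dir.force(true);
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical segment path, for illustration only.
            flushParentDir(Paths.get("/tmp/kafka-logs/topic-0/00000000000000000000.log"));
        }
    }

Only the directory metadata needs to reach disk here, so the flush is needed once per newly created or renamed segment rather than on every data flush, which is what the needsFlushParentDir flag introduced by the patches tracks.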
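
The other recurring idea is that this parent-directory flush should happen at most once per segment, on the first flush after the segment file is created or recovered. In the final shape of the series that is the atomicNeedsFlushParentDir flag consulted in LogSegment.flush(); the sketch below shows the same at-most-once idiom using AtomicBoolean.getAndSet and reuses the flushParentDir helper sketched above. It is an illustration of the pattern, not the actual LogSegment code.

    import java.io.IOException;
    import java.nio.file.Path;
    import java.util.concurrent.atomic.AtomicBoolean;

    public final class FlushOnceSketch {

        private final Path segmentFile;
        // True while the directory entry for this segment may not be durable yet.
        private final AtomicBoolean needsFlushParentDir;

        public FlushOnceSketch(Path segmentFile, boolean newlyCreated) {
            this.segmentFile = segmentFile;
            this.needsFlushParentDir = new AtomicBoolean(newlyCreated);
        }

        public void flush() throws IOException {
            // ... force the segment file and index files here ...
            // getAndSet(false) guarantees the parent directory is fsync'ed by exactly one
            // caller, even if flush() is invoked concurrently or repeatedly.
            if (needsFlushParentDir.getAndSet(false))
                ParentDirFlushSketch.flushParentDir(segmentFile);
        }
    }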