diff --git a/ambry-api/src/main/java/com.github.ambry/store/Store.java b/ambry-api/src/main/java/com.github.ambry/store/Store.java index 91916e405e..336b153ad3 100644 --- a/ambry-api/src/main/java/com.github.ambry/store/Store.java +++ b/ambry-api/src/main/java/com.github.ambry/store/Store.java @@ -80,7 +80,6 @@ public interface Store { /** * Get the corresponding {@link StoreStats} instance for this store. * @return a {@link StoreStats} instance which can be used to fetch store related stats - * @throws StoreException */ StoreStats getStoreStats(); @@ -105,7 +104,7 @@ public interface Store { boolean isEmpty(); /** - * Shutsdown the store + * Shuts down the store */ void shutdown() throws StoreException; } diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java index 2ee5e677b5..e4bba63f98 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStore.java @@ -684,6 +684,15 @@ CompactionDetails getCompactionDetails(CompactionPolicy compactionPolicy) throws @Override public void shutdown() throws StoreException { + shutdown(false); + } + + /** + * Shuts down the store. + * @param skipDiskFlush {@code true} should skip any disk flush operations during shutdown. {@code false} otherwise. + * @throws StoreException + */ + private void shutdown(boolean skipDiskFlush) throws StoreException { long startTimeInMs = time.milliseconds(); synchronized (storeWriteLock) { checkStarted(); @@ -691,8 +700,8 @@ public void shutdown() throws StoreException { logger.info("Store : " + dataDir + " shutting down"); blobStoreStats.close(); compactor.close(30); - index.close(); - log.close(); + index.close(skipDiskFlush); + log.close(skipDiskFlush); metrics.deregisterMetrics(storeId); started = false; } catch (Exception e) { @@ -715,7 +724,7 @@ private void onError() throws StoreException { int count = errorCount.incrementAndGet(); if (count == config.storeIoErrorCountToTriggerShutdown) { logger.error("Shutting down BlobStore {} because IO error count exceeds threshold", storeId); - shutdown(); + shutdown(true); metrics.storeIoErrorTriggeredShutdownCount.inc(); } } diff --git a/ambry-store/src/main/java/com.github.ambry.store/BlobStoreCompactor.java b/ambry-store/src/main/java/com.github.ambry.store/BlobStoreCompactor.java index 3b2f42c4b1..a11b064540 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/BlobStoreCompactor.java +++ b/ambry-store/src/main/java/com.github.ambry.store/BlobStoreCompactor.java @@ -386,8 +386,8 @@ private void copy() throws InterruptedException, IOException, StoreException { long savedBytes = srcLog.getSegmentCapacity() * segmentCountDiff; srcMetrics.compactionBytesReclaimedCount.inc(savedBytes); } - tgtIndex.close(); - tgtLog.close(); + tgtIndex.close(false); + tgtLog.close(false); // persist the bloom of the "latest" index segment if it exists if (numSwapsUsed > 0) { tgtIndex.getIndexSegments().lastEntry().getValue().seal(); diff --git a/ambry-store/src/main/java/com.github.ambry.store/Log.java b/ambry-store/src/main/java/com.github.ambry.store/Log.java index ab61122829..f79eec4191 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/Log.java +++ b/ambry-store/src/main/java/com.github.ambry.store/Log.java @@ -284,11 +284,12 @@ void flush() throws IOException { /** * Closes the Log and all its segments. + * @param skipDiskFlush whether to skip any disk flush operations. * @throws IOException if the flush encountered an I/O error. */ - void close() throws IOException { + void close(boolean skipDiskFlush) throws IOException { for (LogSegment segment : segmentsByName.values()) { - segment.close(); + segment.close(skipDiskFlush); } } @@ -406,7 +407,7 @@ File allocate(String filename, long size) throws StoreException { private void free(LogSegment logSegment) throws StoreException { File segmentFile = logSegment.getView().getFirst(); try { - logSegment.close(); + logSegment.close(false); diskSpaceAllocator.free(segmentFile, logSegment.getCapacityInBytes()); } catch (IOException e) { StoreErrorCodes errorCode = StoreException.resolveErrorCode(e); diff --git a/ambry-store/src/main/java/com.github.ambry.store/LogSegment.java b/ambry-store/src/main/java/com.github.ambry.store/LogSegment.java index cbe4b41f8b..09d7dd3a57 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/LogSegment.java +++ b/ambry-store/src/main/java/com.github.ambry.store/LogSegment.java @@ -30,6 +30,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import net.smacke.jaydio.DirectRandomAccessFile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -55,9 +57,11 @@ class LogSegment implements Read, Write { private final AtomicLong endOffset; private final AtomicLong refCount = new AtomicLong(0); private final AtomicBoolean open = new AtomicBoolean(true); + static final int BYTE_BUFFER_SIZE_FOR_APPEND = 1024 * 1024; private ByteBuffer byteBufferForAppend = null; static final AtomicInteger byteBufferForAppendTotalCount = new AtomicInteger(0); + private static final Logger logger = LoggerFactory.getLogger(LogSegment.class); /** * Creates a LogSegment abstraction with the given capacity. @@ -99,7 +103,7 @@ class LogSegment implements Read, Write { * @param name the desired name of the segment. The name signifies the handle/ID of the LogSegment and may be * different from the filename of the {@code file}. * @param file the backing {@link File} for this segment. - * @param metrics he {@link StoreMetrics} instance to use. + * @param metrics the {@link StoreMetrics} instance to use. * @throws StoreException */ LogSegment(String name, File file, StoreMetrics metrics) throws StoreException { @@ -142,6 +146,28 @@ class LogSegment implements Read, Write { } } + /** + * Creates a LogSegment abstraction with the given file and given file channel (for testing purpose currently) + * @param file the backing {@link File} for this segment. + * @param capacityInBytes the intended capacity of the segment + * @param metrics the {@link StoreMetrics} instance to use. + * @param fileChannel the {@link FileChannel} associated with this segment. + * @throws StoreException + */ + LogSegment(File file, long capacityInBytes, StoreMetrics metrics, FileChannel fileChannel) throws StoreException { + this.file = file; + this.name = file.getName(); + this.capacityInBytes = capacityInBytes; + this.metrics = metrics; + this.fileChannel = fileChannel; + segmentView = new Pair<>(file, fileChannel); + // externals will set the correct value of end offset. + endOffset = new AtomicLong(0); + // update end offset + writeHeader(capacityInBytes); + startOffset = endOffset.get(); + } + /** * {@inheritDoc} *

@@ -463,11 +489,24 @@ void flush() throws IOException { /** * Closes this log segment + * @param skipDiskFlush whether to skip any disk flush operations. + * @throws IOException if there is an I/O error while closing the log segment. */ - void close() throws IOException { + void close(boolean skipDiskFlush) throws IOException { if (open.compareAndSet(true, false)) { - flush(); - fileChannel.close(); + if (!skipDiskFlush) { + flush(); + } + try { + // attempt to close file descriptors even when there is a disk I/O error. + fileChannel.close(); + } catch (IOException e) { + if (!skipDiskFlush) { + throw e; + } + logger.warn( + "I/O exception occurred when closing file channel in log segment. Skipping it to complete store shutdown process"); + } dropBufferForAppend(); } } diff --git a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java index cd4e02a122..6e39df5d24 100644 --- a/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java +++ b/ambry-store/src/main/java/com.github.ambry.store/PersistentIndex.java @@ -1330,27 +1330,30 @@ private void filterDeleteEntries(List messageEntries) { /** * Closes the index + * @param skipDiskFlush whether to skip any disk flush operations. * @throws StoreException */ - void close() throws StoreException { + void close(boolean skipDiskFlush) throws StoreException { long startTimeInMs = time.milliseconds(); try { if (persistorTask != null) { persistorTask.cancel(false); } - persistor.write(); - if (hardDeleter != null) { + if (!skipDiskFlush) { + persistor.write(); + if (hardDeleter != null) { + try { + hardDeleter.shutdown(); + } catch (Exception e) { + logger.error("Index : " + dataDir + " error while persisting cleanup token ", e); + } + } try { - hardDeleter.shutdown(); - } catch (Exception e) { - logger.error("Index : " + dataDir + " error while persisting cleanup token ", e); + cleanShutdownFile.createNewFile(); + } catch (IOException e) { + logger.error("Index : " + dataDir + " error while creating clean shutdown file ", e); } } - try { - cleanShutdownFile.createNewFile(); - } catch (IOException e) { - logger.error("Index : " + dataDir + " error while creating clean shutdown file ", e); - } } finally { metrics.indexShutdownTimeInMs.update(time.milliseconds() - startTimeInMs); } diff --git a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java index 4a6a9971df..4d00a57714 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/BlobStoreTest.java @@ -942,6 +942,8 @@ public void isKeyDeletedTest() throws StoreException { @Test public void shutdownTest() throws StoreException { store.shutdown(); + File cleanShutDownFile = new File(tempDir, PersistentIndex.CLEAN_SHUTDOWN_FILENAME); + assertTrue("Clean shutdown file should exist.", cleanShutDownFile.exists()); // no operations should be possible if store is not up or has been shutdown verifyOperationFailuresOnInactiveStore(store); store = createBlobStore(getMockReplicaId(tempDirStr)); @@ -1027,6 +1029,10 @@ public void storeIoErrorCountTest() throws StoreException, IOException { // verify error count would be reset after successful Put operation testStore1.put(validWriteSet1); assertEquals("Error count should be reset", 0, testStore1.getErrorCount().get()); + // trigger a normal shutdown to persist data (otherwise following delete/ttl update operation will encounter ID_Not_Found error) + testStore1.shutdown(); + // restart for subsequent tests + testStore1.start(); // verify consecutive two failed Puts would make store shutdown (storeIoErrorCountToTriggerShutdown = 2) for (int i = 0; i < 2; ++i) { try { @@ -1178,7 +1184,9 @@ public void storeIoErrorCountTest() throws StoreException, IOException { // general /** - * Verify store method can capture store exception and correctly handle it. + * Verify store method can capture store exception and correctly handle it. The method also verifies that if exception + * is really caused by disk I/O error, store shutdown process would skip any disk flush operation and no clean shutdown + * file should exist in directory. * @param methodCaller the method caller to invoke store methods to trigger store exception * @throws StoreException */ @@ -1187,8 +1195,14 @@ private void catchStoreExceptionAndVerifyErrorCode(StoreMethodCaller methodCalle MockBlobStore mockBlobStore = new MockBlobStore(getMockReplicaId(tempDirStr), new StoreConfig(new VerifiableProperties(properties)), mock(ReplicaStatusDelegate.class), new StoreMetrics(new MetricRegistry())); + // First, verify that a normal shutdown will create a clean shutdown file in the store directory. + mockBlobStore.start(); + mockBlobStore.shutdown(); + File shutdownFile = new File(tempDir, PersistentIndex.CLEAN_SHUTDOWN_FILENAME); + assertTrue("Clean shutdown file should exist", shutdownFile.exists()); + mockBlobStore.start(); - // Verify that store won't be shut down if Unknown_Error occurred. + // Second, verify that store won't be shut down if Unknown_Error occurred. StoreException storeExceptionInIndex = new StoreException("Mock Unknown error", StoreErrorCodes.Unknown_Error); mockBlobStore.setPersistentIndex(storeExceptionInIndex); try { @@ -1197,18 +1211,22 @@ private void catchStoreExceptionAndVerifyErrorCode(StoreMethodCaller methodCalle } catch (StoreException e) { assertEquals("Mismatch in StoreErrorCode", StoreErrorCodes.Unknown_Error, e.getErrorCode()); } + assertTrue("Store should not be shut down", mockBlobStore.isStarted()); assertEquals("Mismatch in store io error count", 0, mockBlobStore.getErrorCount().get()); - // Verify that store will be shut down if IOError occurred + + // Third, verify that store will be shut down if IOError occurred (disk I/O error) storeExceptionInIndex = new StoreException("Mock disk I/O error", StoreErrorCodes.IOError); mockBlobStore.setPersistentIndex(storeExceptionInIndex); try { methodCaller.invoke(mockBlobStore); - //mockBlobStore.findMissingKeys(idsToProvide); fail("should fail"); } catch (StoreException e) { assertEquals("Mismatch in StoreErrorCode", StoreErrorCodes.IOError, e.getErrorCode()); } assertFalse("Store should be shutdown after error count exceeded threshold", mockBlobStore.isStarted()); + + // In the end, verify that store shutdown would skip any disk flush operation if it is triggered by a real disk I/O error. + assertFalse("When encountering disk I/O error, clean shutdown file shouldn't exist", shutdownFile.exists()); } /** diff --git a/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java b/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java index 9bfee3297d..b9da4dba34 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java +++ b/ambry-store/src/test/java/com.github.ambry.store/CuratedLogIndexState.java @@ -196,8 +196,8 @@ class CuratedLogIndexState { */ void destroy() throws IOException, StoreException { shutDownExecutorService(scheduler, 30, TimeUnit.SECONDS); - index.close(); - log.close(); + index.close(false); + log.close(false); assertTrue(tempDir + " could not be cleaned", StoreTestUtils.cleanDirectory(tempDir, false)); } @@ -842,7 +842,7 @@ void initIndex(ScheduledExecutorService newScheduler) throws StoreException { */ void reloadIndex(boolean closeBeforeReload, boolean deleteCleanShutdownFile) throws StoreException { if (closeBeforeReload) { - index.close(); + index.close(false); if (deleteCleanShutdownFile) { assertTrue("The clean shutdown file could not be deleted", new File(tempDir, PersistentIndex.CLEAN_SHUTDOWN_FILENAME).delete()); @@ -860,8 +860,8 @@ void reloadIndex(boolean closeBeforeReload, boolean deleteCleanShutdownFile) thr */ void reloadLog(boolean initIndex) throws IOException, StoreException { long segmentCapacity = log.getSegmentCapacity(); - index.close(); - log.close(); + index.close(false); + log.close(false); log = new Log(tempDirStr, LOG_CAPACITY, segmentCapacity, StoreTestUtils.DEFAULT_DISK_SPACE_ALLOCATOR, metrics); index = null; if (initIndex) { @@ -874,7 +874,7 @@ void reloadLog(boolean initIndex) throws IOException, StoreException { * @throws StoreException */ void closeAndClearIndex() throws StoreException { - index.close(); + index.close(false); // delete all index files File[] indexSegmentFiles = tempDir.listFiles(new FilenameFilter() { @Override diff --git a/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java b/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java index 082af76ba4..76efdbdcd6 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/HardDeleterTest.java @@ -186,8 +186,8 @@ public void setup() throws Exception { @After public void cleanup() throws StoreException, IOException { scheduler.shutdown(); - index.close(); - log.close(); + index.close(false); + log.close(false); } /** @@ -357,7 +357,7 @@ public void testHardDelete() { // reset the internal tokens index.resetHardDeleterTokens(); } - index.close(); + index.close(false); } catch (Exception e) { e.printStackTrace(); assertEquals(false, true); diff --git a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java index 62b3aff1f0..7e1f8f5cab 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/IndexTest.java @@ -1169,7 +1169,7 @@ public void indexPersistorTest() throws IOException, StoreException { state.reloadIndex(true, false); try { doThrow(new IOException(StoreException.IO_ERROR_STR)).when(mockLog).flush(); - state.index.close(); + state.index.close(false); fail("Should have thrown exception due to I/O error"); } catch (StoreException e) { assertEquals("StoreException error code mismatch ", StoreErrorCodes.IOError, e.getErrorCode()); @@ -1177,7 +1177,7 @@ public void indexPersistorTest() throws IOException, StoreException { // test that when IOException's error message is null, the error code should be Unknown_Error try { doThrow(new IOException()).when(mockLog).flush(); - state.index.close(); + state.index.close(false); fail("Should have thrown exception due to I/O error"); } catch (StoreException e) { assertEquals("StoreException error code mismatch ", StoreErrorCodes.Unknown_Error, e.getErrorCode()); @@ -1273,7 +1273,7 @@ public void getIndexSegmentFilesForLogSegmentTest() { } /** - * Tests {@link PersistentIndex#close()} can correctly cancel the scheduled persistor task and makes sure no persistor + * Tests {@link PersistentIndex#close(boolean)} can correctly cancel the scheduled persistor task and makes sure no persistor * is running background after index closed. * @throws StoreException * @throws InterruptedException @@ -1281,7 +1281,7 @@ public void getIndexSegmentFilesForLogSegmentTest() { @Test public void closeIndexToCancelPersistorTest() throws StoreException, InterruptedException { long SCHEDULER_PERIOD_MS = 10; - state.index.close(); + state.index.close(false); // re-initialize index by using mock scheduler (the intention is to speed up testing by using shorter period) ScheduledThreadPoolExecutor scheduler = (ScheduledThreadPoolExecutor) Utils.newScheduler(1, false); ScheduledThreadPoolExecutor mockScheduler = Mockito.spy(scheduler); @@ -1307,7 +1307,7 @@ public void run() { // verify that the persistor task is successfully scheduled assertTrue("The persistor task wasn't invoked within the expected time", mockPersistor.invokeCountDown.await(2 * SCHEDULER_PERIOD_MS, TimeUnit.MILLISECONDS)); - state.index.close(); + state.index.close(false); mockPersistor.invokeCountDown = new CountDownLatch(1); // verify that the persisitor task is canceled after index closed and is never invoked again. assertTrue("The persistor task should be canceled after index closed", persistorTask.get().isCancelled()); @@ -1322,7 +1322,7 @@ public void run() { */ @Test public void cleanupIndexSegmentFilesForLogSegmentTest() throws StoreException { - state.index.close(); + state.index.close(false); LogSegment logSegment = state.log.getFirstSegment(); while (logSegment != null) { LogSegment nextSegment = state.log.getNextSegment(logSegment); diff --git a/ambry-store/src/test/java/com.github.ambry.store/LogSegmentTest.java b/ambry-store/src/test/java/com.github.ambry.store/LogSegmentTest.java index 8211aa06ac..4ed19c187a 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/LogSegmentTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/LogSegmentTest.java @@ -32,8 +32,10 @@ import java.util.Random; import org.junit.After; import org.junit.Test; +import org.mockito.Mockito; import static org.junit.Assert.*; +import static org.mockito.Mockito.*; /** @@ -69,6 +71,7 @@ public void cleanup() { } } assertTrue("The directory [" + tempDir.getAbsolutePath() + "] could not be deleted", tempDir.delete()); + validateMockitoUsage(); } /** @@ -125,7 +128,7 @@ public void basicWriteAndReadTest() throws IOException, StoreException { // ensure flush doesn't throw any errors. segment.flush(); // close and reopen segment and ensure persistence. - segment.close(); + segment.close(false); segment = new LogSegment(segmentName, new File(tempDir, segmentName), metrics); segment.setEndOffset(writeStartOffset + buf.length); readAndEnsureMatch(segment, writeStartOffset, buf); @@ -319,7 +322,7 @@ public void readTest() throws IOException, StoreException { } } - segment.close(); + segment.close(false); // read after close buffer = ByteBuffer.allocate(1); try { @@ -409,7 +412,7 @@ public void writeFromTest() throws IOException, StoreException { } } - segment.close(); + segment.close(false); // ensure that writeFrom fails. try { segment.writeFrom(Channels.newChannel(new ByteBufferInputStream(buffer)), writeStartOffset, buffer.remaining()); @@ -422,6 +425,72 @@ public void writeFromTest() throws IOException, StoreException { } } + /** + * Test a normal shutdown of log segment. Verify that disk flush operation is invoked. + * @throws Exception + */ + @Test + public void closeLogSegmentTest() throws Exception { + Pair segmentAndFileChannel = getSegmentAndFileChannel("log_current1"); + LogSegment segment = segmentAndFileChannel.getFirst(); + FileChannel mockFileChannel = segmentAndFileChannel.getSecond(); + + // test that log segment is closed successfully, ensure that the flush method is invoked + segment.close(false); + verify(mockFileChannel).force(true); + assertFalse("File channel is not closed", segment.getView().getSecond().isOpen()); + assertTrue("File couldn't be deleted.", (new File(tempDir, segment.getName()).delete())); + } + + /** + * Test that closing log segment and skip any disk flush operation. Verify that flush method is never invoked. + * @throws Exception + */ + @Test + public void closeLogSegmentAndSkipFlushTest() throws Exception { + Pair segmentAndFileChannel = getSegmentAndFileChannel("log_current2"); + LogSegment segment = segmentAndFileChannel.getFirst(); + FileChannel mockFileChannel = segmentAndFileChannel.getSecond(); + + // test that log segment is being closed due to disk I/O error and flush operation should be skipped + segment.close(true); + verify(mockFileChannel, times(0)).force(true); + assertFalse("File channel is not closed", segment.getView().getSecond().isOpen()); + assertTrue("File couldn't be deleted.", (new File(tempDir, segment.getName()).delete())); + } + + /** + * Test that closing log segment (skip disk flush) and exception occurs. The shutdown process should continue. + */ + @Test + public void closeLogSegmentWithExceptionTest() throws Exception { + Pair segmentAndFileChannel = getSegmentAndFileChannel("log_current3"); + LogSegment segment = segmentAndFileChannel.getFirst(); + FileChannel mockFileChannel = segmentAndFileChannel.getSecond(); + + // test that log segment is being closed due to disk I/O error (shouldSkipDiskFlush = true) and exception occurs + // when closing file channel + doThrow(new IOException("close channel failure")).when(mockFileChannel).close(); + segment.close(true); + verify(mockFileChannel, times(0)).force(true); + assertTrue("File couldn't be deleted.", (new File(tempDir, segment.getName()).delete())); + + segmentAndFileChannel = getSegmentAndFileChannel("log_current4"); + segment = segmentAndFileChannel.getFirst(); + mockFileChannel = segmentAndFileChannel.getSecond(); + + // test that log segment is being closed during normal shutdown (shouldSkipDiskFlush = false) and exception occurs + doThrow(new IOException("close channel failure")).when(mockFileChannel).close(); + try { + segment.close(false); + fail("should fail because IOException occurred"); + } catch (IOException e) { + //expected + } + verify(mockFileChannel).force(true); + assertTrue("File couldn't be deleted.", (new File(tempDir, segment.getName()).delete())); + } + /** * Tests for special constructor cases. * @throws IOException @@ -543,6 +612,25 @@ private LogSegment getSegment(String segmentName, long capacityInBytes, boolean } } + /** + * Create a {@link LogSegment} and a mock file channel associated with it for testing. + * @param name the name of log segment + * @return a pair that contains log segment and mock file channel + * @throws Exception + */ + private Pair getSegmentAndFileChannel(String name) throws Exception { + File file = new File(tempDir, name); + if (file.exists()) { + assertTrue(file.getAbsolutePath() + " already exists and could not be deleted", file.delete()); + } + assertTrue("Segment file could not be created at path " + file.getAbsolutePath(), file.createNewFile()); + file.deleteOnExit(); + FileChannel fileChannel = Utils.openChannel(file, true); + FileChannel mockFileChannel = Mockito.spy(fileChannel); + LogSegment segment = new LogSegment(file, STANDARD_SEGMENT_SIZE, metrics, mockFileChannel); + return new Pair<>(segment, mockFileChannel); + } + /** * Appends random data of size {@code size} to given {@code segment}. * @param segment the {@link LogSegment} to append data to. @@ -591,7 +679,7 @@ private void readDirectlyAndEnsureMatch(LogSegment segment, long offsetToStartRe * @throws IOException */ private void closeSegmentAndDeleteFile(LogSegment segment) throws IOException { - segment.close(); + segment.close(false); assertFalse("File channel is not closed", segment.getView().getSecond().isOpen()); File segmentFile = new File(tempDir, segment.getName()); assertTrue("The segment file [" + segmentFile.getAbsolutePath() + "] could not be deleted", segmentFile.delete()); @@ -673,7 +761,7 @@ private void doAppendTest(Appender appender) throws IOException, StoreException readDirectlyAndEnsureMatch(segment, writeStartOffset + bufOne.length, bufTwo); readDirectlyAndEnsureMatch(segment, writeStartOffset + bufOne.length + bufTwo.length, bufThree); - segment.close(); + segment.close(false); // ensure that append fails. ByteBuffer buffer = ByteBuffer.wrap(TestUtils.getRandomBytes(1)); try { diff --git a/ambry-store/src/test/java/com.github.ambry.store/LogTest.java b/ambry-store/src/test/java/com.github.ambry.store/LogTest.java index bdb06e802b..43eebfd540 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/LogTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/LogTest.java @@ -185,7 +185,7 @@ public void appendErrorCasesTest() throws IOException, StoreException { assertEquals("Position of buffer has changed", 0, buffer.position()); } } finally { - log.close(); + log.close(false); cleanDirectory(tempDir); } } @@ -206,7 +206,7 @@ public void setActiveSegmentBadArgsTest() throws IOException, StoreException { } catch (IllegalArgumentException e) { // expected. Nothing to do. } finally { - log.close(); + log.close(false); cleanDirectory(tempDir); } } @@ -227,7 +227,7 @@ public void getNextSegmentBadArgsTest() throws IOException, StoreException { } catch (IllegalArgumentException e) { // expected. Nothing to do. } finally { - log.close(); + log.close(false); cleanDirectory(tempDir); } } @@ -248,7 +248,7 @@ public void getPrevSegmentBadArgsTest() throws IOException, StoreException { } catch (IllegalArgumentException e) { // expected. Nothing to do. } finally { - log.close(); + log.close(false); cleanDirectory(tempDir); } } @@ -283,7 +283,7 @@ public void getFileSpanForMessageBadArgsTest() throws IOException, StoreExceptio // expected. Nothing to do. } } finally { - log.close(); + log.close(false); cleanDirectory(tempDir); } } @@ -579,7 +579,7 @@ private List createSegmentFiles(int numToCreate, long numFinalSegments, if (numFinalSegments == 1) { String name = LogSegmentNameHelper.generateFirstSegmentName(false); File file = create(LogSegmentNameHelper.nameToFilename(name)); - new LogSegment(name, file, segmentCapacity, metrics, false).close(); + new LogSegment(name, file, segmentCapacity, metrics, false).close(false); segmentNames.add(name); } else { for (int i = 0; i < numToCreate; i++) { @@ -591,7 +591,7 @@ private List createSegmentFiles(int numToCreate, long numFinalSegments, long gen = Utils.getRandomLong(TestUtils.RANDOM, 1000); String name = LogSegmentNameHelper.getName(pos, gen); File file = create(LogSegmentNameHelper.nameToFilename(name)); - new LogSegment(name, file, segmentCapacity, metrics, true).close(); + new LogSegment(name, file, segmentCapacity, metrics, true).close(false); segmentNames.add(name); } } @@ -657,7 +657,7 @@ private void doComprehensiveTest(long logCapacity, long segmentCapacity, long wr flushCloseAndValidate(log); checkLogReload(logCapacity, Math.min(logCapacity, segmentCapacity), allSegmentNames); } finally { - log.close(); + log.close(false); cleanDirectory(tempDir); } } @@ -807,7 +807,7 @@ private void checkLogReload(long originalLogCapacity, long originalSegmentCapaci // the new config should be ignored. checkLog(log, originalSegmentCapacity, allSegmentNames); } finally { - log.close(); + log.close(false); } } } @@ -821,7 +821,7 @@ private void flushCloseAndValidate(Log log) throws IOException { // flush should not throw any exceptions log.flush(); // close log and ensure segments are closed - log.close(); + log.close(false); LogSegment segment = log.getFirstSegment(); while (segment != null) { assertFalse("LogSegment has not been closed", segment.getView().getSecond().isOpen()); diff --git a/ambry-store/src/test/java/com.github.ambry.store/StoreMessageReadSetTest.java b/ambry-store/src/test/java/com.github.ambry.store/StoreMessageReadSetTest.java index 886878ef4d..0c7ae5c563 100644 --- a/ambry-store/src/test/java/com.github.ambry.store/StoreMessageReadSetTest.java +++ b/ambry-store/src/test/java/com.github.ambry.store/StoreMessageReadSetTest.java @@ -232,7 +232,7 @@ public void storeMessageReadSetTest() throws IOException, StoreException { // expected. Nothing to do. } } finally { - log.close(); + log.close(false); } } @@ -297,7 +297,7 @@ public void blobReadOptionsTest() throws IOException, StoreException { // expected. Nothing to do. } } finally { - log.close(); + log.close(false); assertTrue(tempDir + " could not be cleaned", StoreTestUtils.cleanDirectory(tempDir, false)); } } diff --git a/ambry-tools/src/main/java/com.github.ambry/store/CompactionVerifier.java b/ambry-tools/src/main/java/com.github.ambry/store/CompactionVerifier.java index 0fd07cbce9..565a7509b7 100644 --- a/ambry-tools/src/main/java/com.github.ambry/store/CompactionVerifier.java +++ b/ambry-tools/src/main/java/com.github.ambry/store/CompactionVerifier.java @@ -269,13 +269,13 @@ private CompactionVerifier(CompactionVerifierConfig verifierConfig, StoreConfig @Override public void close() throws IOException { try { - srcIndex.close(); - tgtIndex.close(); + srcIndex.close(false); + tgtIndex.close(false); } catch (StoreException e) { throw new IOException(e); } - srcLog.close(); - tgtLog.close(); + srcLog.close(false); + tgtLog.close(false); cLog.close(); }