Allow store to skip disk flush during shutdown if disk I/O error occurs #1212

Merged 2 commits on Jul 11, 2019
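For orientation, below is a minimal sketch of the control flow this change introduces: repeated disk I/O errors trigger a store shutdown that skips any further disk flush but still attempts to release file descriptors. The class and helper names (StoreShutdownSketch, flushToDisk, closeFileDescriptors) are illustrative placeholders, not actual Ambry types; the real logic lives in the BlobStore, PersistentIndex, Log, and LogSegment changes shown in the diff below.

import java.util.concurrent.atomic.AtomicInteger;

// Illustrative sketch only; not the actual Ambry implementation.
class StoreShutdownSketch {
  private final AtomicInteger errorCount = new AtomicInteger(0);
  private final int ioErrorCountToTriggerShutdown = 2; // stands in for config.storeIoErrorCountToTriggerShutdown

  // Called whenever a store operation hits an I/O error.
  void onError() {
    if (errorCount.incrementAndGet() == ioErrorCountToTriggerShutdown) {
      // Skip disk flushes: they would hit the same bad disk and fail again.
      shutdown(true);
    }
  }

  // The normal shutdown path still flushes; the error-triggered shutdown skips the flush.
  void shutdown(boolean skipDiskFlush) {
    if (!skipDiskFlush) {
      flushToDisk(); // persist index entries, write the clean shutdown file, etc.
    }
    closeFileDescriptors(); // attempted in both cases so the segment files get released
  }

  private void flushToDisk() { /* placeholder: persist index state and the clean shutdown file */ }

  private void closeFileDescriptors() { /* placeholder: close segment file channels even on a bad disk */ }
}

When the flush is skipped, the clean shutdown file is never created, which the updated tests below assert.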
3 changes: 1 addition & 2 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
@@ -80,7 +80,6 @@ public interface Store {
/**
* Get the corresponding {@link StoreStats} instance for this store.
* @return a {@link StoreStats} instance which can be used to fetch store related stats
* @throws StoreException
*/
StoreStats getStoreStats();

@@ -105,7 +104,7 @@ public interface Store {
boolean isEmpty();

/**
* Shutsdown the store
* Shuts down the store
*/
void shutdown() throws StoreException;
}
15 changes: 12 additions & 3 deletions ambry-store/src/main/java/com.github.ambry.store/BlobStore.java
@@ -684,15 +684,24 @@ CompactionDetails getCompactionDetails(CompactionPolicy compactionPolicy) throws

@Override
public void shutdown() throws StoreException {
shutdown(false);
}

/**
* Shuts down the store.
* @param skipDiskFlush {@code true} to skip any disk flush operations during shutdown, {@code false} otherwise.
* @throws StoreException
*/
private void shutdown(boolean skipDiskFlush) throws StoreException {
long startTimeInMs = time.milliseconds();
synchronized (storeWriteLock) {
checkStarted();
try {
logger.info("Store : " + dataDir + " shutting down");
blobStoreStats.close();
compactor.close(30);
index.close();
log.close();
index.close(skipDiskFlush);
log.close(skipDiskFlush);
metrics.deregisterMetrics(storeId);
started = false;
} catch (Exception e) {
@@ -715,7 +724,7 @@ private void onError() throws StoreException {
int count = errorCount.incrementAndGet();
if (count == config.storeIoErrorCountToTriggerShutdown) {
logger.error("Shutting down BlobStore {} because IO error count exceeds threshold", storeId);
shutdown();
shutdown(true);
metrics.storeIoErrorTriggeredShutdownCount.inc();
}
}
@@ -386,8 +386,8 @@ private void copy() throws InterruptedException, IOException, StoreException {
long savedBytes = srcLog.getSegmentCapacity() * segmentCountDiff;
srcMetrics.compactionBytesReclaimedCount.inc(savedBytes);
}
tgtIndex.close();
tgtLog.close();
tgtIndex.close(false);
tgtLog.close(false);
// persist the bloom of the "latest" index segment if it exists
if (numSwapsUsed > 0) {
tgtIndex.getIndexSegments().lastEntry().getValue().seal();
7 changes: 4 additions & 3 deletions ambry-store/src/main/java/com.github.ambry.store/Log.java
@@ -284,11 +284,12 @@ void flush() throws IOException {

/**
* Closes the Log and all its segments.
* @param skipDiskFlush whether to skip any disk flush operations.
* @throws IOException if the flush encountered an I/O error.
*/
void close() throws IOException {
void close(boolean skipDiskFlush) throws IOException {
for (LogSegment segment : segmentsByName.values()) {
segment.close();
segment.close(skipDiskFlush);
}
}

@@ -406,7 +407,7 @@ File allocate(String filename, long size) throws StoreException {
private void free(LogSegment logSegment) throws StoreException {
File segmentFile = logSegment.getView().getFirst();
try {
logSegment.close();
logSegment.close(false);
diskSpaceAllocator.free(segmentFile, logSegment.getCapacityInBytes());
} catch (IOException e) {
StoreErrorCodes errorCode = StoreException.resolveErrorCode(e);
47 changes: 43 additions & 4 deletions ambry-store/src/main/java/com.github.ambry.store/LogSegment.java
@@ -30,6 +30,8 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import net.smacke.jaydio.DirectRandomAccessFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
@@ -55,9 +57,11 @@ class LogSegment implements Read, Write {
private final AtomicLong endOffset;
private final AtomicLong refCount = new AtomicLong(0);
private final AtomicBoolean open = new AtomicBoolean(true);

static final int BYTE_BUFFER_SIZE_FOR_APPEND = 1024 * 1024;
private ByteBuffer byteBufferForAppend = null;
static final AtomicInteger byteBufferForAppendTotalCount = new AtomicInteger(0);
private static final Logger logger = LoggerFactory.getLogger(LogSegment.class);

/**
* Creates a LogSegment abstraction with the given capacity.
@@ -99,7 +103,7 @@ class LogSegment implements Read, Write {
* @param name the desired name of the segment. The name signifies the handle/ID of the LogSegment and may be
* different from the filename of the {@code file}.
* @param file the backing {@link File} for this segment.
* @param metrics he {@link StoreMetrics} instance to use.
* @param metrics the {@link StoreMetrics} instance to use.
* @throws StoreException
*/
LogSegment(String name, File file, StoreMetrics metrics) throws StoreException {
@@ -142,6 +146,28 @@ class LogSegment implements Read, Write {
}
}

/**
* Creates a LogSegment abstraction with the given file and file channel (currently for testing purposes only).
* @param file the backing {@link File} for this segment.
* @param capacityInBytes the intended capacity of the segment
* @param metrics the {@link StoreMetrics} instance to use.
* @param fileChannel the {@link FileChannel} associated with this segment.
* @throws StoreException
*/
LogSegment(File file, long capacityInBytes, StoreMetrics metrics, FileChannel fileChannel) throws StoreException {
this.file = file;
this.name = file.getName();
this.capacityInBytes = capacityInBytes;
this.metrics = metrics;
this.fileChannel = fileChannel;
segmentView = new Pair<>(file, fileChannel);
// externals will set the correct value of end offset.
endOffset = new AtomicLong(0);
// update end offset
writeHeader(capacityInBytes);
startOffset = endOffset.get();
}

/**
* {@inheritDoc}
* <p/>
@@ -463,11 +489,24 @@ void flush() throws IOException {

/**
* Closes this log segment
* @param skipDiskFlush whether to skip any disk flush operations.
* @throws IOException if there is an I/O error while closing the log segment.
*/
void close() throws IOException {
void close(boolean skipDiskFlush) throws IOException {
if (open.compareAndSet(true, false)) {
flush();
fileChannel.close();
if (!skipDiskFlush) {
flush();
Contributor:
Why skip this completely, as opposed to catching and ignoring the exception like you do when closing the channel?

Contributor Author (@jsjtzyy, Jul 11, 2019):
flush() always attempts to write to disk, which will definitely throw an IOException if the disk is bad. The code skips it to avoid unnecessary disk flush operations (note that this method is in LogSegment; each store may attempt to write to disk multiple times because there are multiple log segments within a store). However, I am not sure whether fileChannel.close() will throw an IOException if there is a disk I/O error. My intention is to try to close the file descriptor of this segment anyway.

}
try {
// attempt to close file descriptors even when there is a disk I/O error.
fileChannel.close();
} catch (IOException e) {
if (!skipDiskFlush) {
throw e;
}
logger.warn(
"I/O exception occurred when closing file channel in log segment. Skipping it to complete store shutdown process");
}
dropBufferForAppend();
}
}
@@ -1330,27 +1330,30 @@ private void filterDeleteEntries(List<MessageInfo> messageEntries) {

/**
* Closes the index
* @param skipDiskFlush whether to skip any disk flush operations.
* @throws StoreException
*/
void close() throws StoreException {
void close(boolean skipDiskFlush) throws StoreException {
long startTimeInMs = time.milliseconds();
try {
if (persistorTask != null) {
persistorTask.cancel(false);
}
persistor.write();
if (hardDeleter != null) {
if (!skipDiskFlush) {
persistor.write();
if (hardDeleter != null) {
try {
hardDeleter.shutdown();
} catch (Exception e) {
logger.error("Index : " + dataDir + " error while persisting cleanup token ", e);
}
}
try {
hardDeleter.shutdown();
} catch (Exception e) {
logger.error("Index : " + dataDir + " error while persisting cleanup token ", e);
cleanShutdownFile.createNewFile();
} catch (IOException e) {
logger.error("Index : " + dataDir + " error while creating clean shutdown file ", e);
}
}
try {
cleanShutdownFile.createNewFile();
} catch (IOException e) {
logger.error("Index : " + dataDir + " error while creating clean shutdown file ", e);
}
} finally {
metrics.indexShutdownTimeInMs.update(time.milliseconds() - startTimeInMs);
}
@@ -942,6 +942,8 @@ public void isKeyDeletedTest() throws StoreException {
@Test
public void shutdownTest() throws StoreException {
store.shutdown();
File cleanShutDownFile = new File(tempDir, PersistentIndex.CLEAN_SHUTDOWN_FILENAME);
assertTrue("Clean shutdown file should exist.", cleanShutDownFile.exists());
// no operations should be possible if store is not up or has been shutdown
verifyOperationFailuresOnInactiveStore(store);
store = createBlobStore(getMockReplicaId(tempDirStr));
@@ -1027,6 +1029,10 @@ public void storeIoErrorCountTest() throws StoreException, IOException {
// verify error count would be reset after successful Put operation
testStore1.put(validWriteSet1);
assertEquals("Error count should be reset", 0, testStore1.getErrorCount().get());
// trigger a normal shutdown to persist data (otherwise the following delete/ttl update operations will encounter ID_Not_Found errors)
testStore1.shutdown();
// restart for subsequent tests
testStore1.start();
// verify consecutive two failed Puts would make store shutdown (storeIoErrorCountToTriggerShutdown = 2)
for (int i = 0; i < 2; ++i) {
try {
@@ -1178,7 +1184,9 @@ public void storeIoErrorCountTest() throws StoreException, IOException {
// general

/**
* Verify store method can capture store exception and correctly handle it.
* Verify store method can capture store exception and correctly handle it. The method also verifies that, if the exception
* is really caused by a disk I/O error, the store shutdown process skips any disk flush operations and no clean shutdown
* file exists in the directory.
* @param methodCaller the method caller to invoke store methods to trigger store exception
* @throws StoreException
*/
@@ -1187,8 +1195,14 @@ private void catchStoreExceptionAndVerifyErrorCode(StoreMethodCaller methodCalle
MockBlobStore mockBlobStore =
new MockBlobStore(getMockReplicaId(tempDirStr), new StoreConfig(new VerifiableProperties(properties)),
mock(ReplicaStatusDelegate.class), new StoreMetrics(new MetricRegistry()));
// First, verify that a normal shutdown will create a clean shutdown file in the store directory.
mockBlobStore.start();
mockBlobStore.shutdown();
File shutdownFile = new File(tempDir, PersistentIndex.CLEAN_SHUTDOWN_FILENAME);
assertTrue("Clean shutdown file should exist", shutdownFile.exists());

mockBlobStore.start();
// Verify that store won't be shut down if Unknown_Error occurred.
// Second, verify that store won't be shut down if Unknown_Error occurred.
StoreException storeExceptionInIndex = new StoreException("Mock Unknown error", StoreErrorCodes.Unknown_Error);
mockBlobStore.setPersistentIndex(storeExceptionInIndex);
try {
@@ -1197,18 +1211,22 @@ private void catchStoreExceptionAndVerifyErrorCode(StoreMethodCalle
} catch (StoreException e) {
assertEquals("Mismatch in StoreErrorCode", StoreErrorCodes.Unknown_Error, e.getErrorCode());
}
assertTrue("Store should not be shut down", mockBlobStore.isStarted());
assertEquals("Mismatch in store io error count", 0, mockBlobStore.getErrorCount().get());
// Verify that store will be shut down if IOError occurred

// Third, verify that store will be shut down if IOError occurred (disk I/O error)
storeExceptionInIndex = new StoreException("Mock disk I/O error", StoreErrorCodes.IOError);
mockBlobStore.setPersistentIndex(storeExceptionInIndex);
try {
methodCaller.invoke(mockBlobStore);
//mockBlobStore.findMissingKeys(idsToProvide);
fail("should fail");
} catch (StoreException e) {
assertEquals("Mismatch in StoreErrorCode", StoreErrorCodes.IOError, e.getErrorCode());
}
assertFalse("Store should be shutdown after error count exceeded threshold", mockBlobStore.isStarted());

// In the end, verify that store shutdown would skip any disk flush operation if it is triggered by a real disk I/O error.
assertFalse("When encountering disk I/O error, clean shutdown file shouldn't exist", shutdownFile.exists());
}

/**
@@ -196,8 +196,8 @@ class CuratedLogIndexState {
*/
void destroy() throws IOException, StoreException {
shutDownExecutorService(scheduler, 30, TimeUnit.SECONDS);
index.close();
log.close();
index.close(false);
log.close(false);
assertTrue(tempDir + " could not be cleaned", StoreTestUtils.cleanDirectory(tempDir, false));
}

@@ -842,7 +842,7 @@ void initIndex(ScheduledExecutorService newScheduler) throws StoreException {
*/
void reloadIndex(boolean closeBeforeReload, boolean deleteCleanShutdownFile) throws StoreException {
if (closeBeforeReload) {
index.close();
index.close(false);
if (deleteCleanShutdownFile) {
assertTrue("The clean shutdown file could not be deleted",
new File(tempDir, PersistentIndex.CLEAN_SHUTDOWN_FILENAME).delete());
@@ -860,8 +860,8 @@ void reloadIndex(boolean closeBeforeReload, boolean deleteCleanShutdownFile) thr
*/
void reloadLog(boolean initIndex) throws IOException, StoreException {
long segmentCapacity = log.getSegmentCapacity();
index.close();
log.close();
index.close(false);
log.close(false);
log = new Log(tempDirStr, LOG_CAPACITY, segmentCapacity, StoreTestUtils.DEFAULT_DISK_SPACE_ALLOCATOR, metrics);
index = null;
if (initIndex) {
@@ -874,7 +874,7 @@ void reloadLog(boolean initIndex) throws IOException, StoreException {
* @throws StoreException
*/
void closeAndClearIndex() throws StoreException {
index.close();
index.close(false);
// delete all index files
File[] indexSegmentFiles = tempDir.listFiles(new FilenameFilter() {
@Override
@@ -186,8 +186,8 @@ public void setup() throws Exception {
@After
public void cleanup() throws StoreException, IOException {
scheduler.shutdown();
index.close();
log.close();
index.close(false);
log.close(false);
}

/**
@@ -357,7 +357,7 @@ public void testHardDelete() {
// reset the internal tokens
index.resetHardDeleterTokens();
}
index.close();
index.close(false);
} catch (Exception e) {
e.printStackTrace();
assertEquals(false, true);