Skip to content

Commit

Permalink
Improve the local cache fallback behavior for corrupted pages
Browse files Browse the repository at this point in the history
### What changes are proposed in this pull request?

Throw a PageCorruptedException when the length of a page is inconsistent with the metadata.
Make a best-effort attempt to delete the corrupted page file.
Reset the offset of the buffer when we find that the data has been corrupted, to avoid an ArrayIndexOutOfBoundsException.

### Why are the changes needed?
We found that the cache makes Presto keep failing when some of the page files get corrupted.

### Does this PR introduce any user facing changes?

No

			pr-link: #18498
			change-id: cid-9012c5432e2b979f8242f3b247deed94b501d194
  • Loading branch information
beinan authored Feb 23, 2024
1 parent c34921d commit a3ecbc7
Show file tree
Hide file tree
Showing 9 changed files with 150 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import alluxio.collections.ConcurrentHashSet;
import alluxio.collections.Pair;
import alluxio.exception.FileDoesNotExistException;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ByteArrayTargetBuffer;
Expand Down Expand Up @@ -919,6 +920,7 @@ private boolean deletePage(PageInfo pageInfo, boolean isTemporary) {

private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
ReadTargetBuffer target, CacheContext cacheContext) {
int originOffset = target.offset();
try {
int ret = pageInfo.getLocalCacheDir().getPageStore()
.get(pageInfo.getPageId(), pageOffset, bytesToRead, target,
Expand All @@ -927,10 +929,20 @@ private int getPage(PageInfo pageInfo, int pageOffset, int bytesToRead,
// data read from page store is inconsistent from the metastore
LOG.error("Failed to read page {}: supposed to read {} bytes, {} bytes actually read",
pageInfo.getPageId(), bytesToRead, ret);
target.offset(originOffset); //reset the offset
//best efforts to delete the corrupted file without acquire the write lock
deletePage(pageInfo, false);
return -1;
}
} catch (PageCorruptedException e) {
LOG.error("Data corrupted page {} from pageStore", pageInfo.getPageId(), e);
target.offset(originOffset); //reset the offset
//best efforts to delete the corrupted file without acquire the write lock
deletePage(pageInfo, false);
return -1;
} catch (IOException | PageNotFoundException e) {
LOG.debug("Failed to get existing page {} from pageStore", pageInfo.getPageId(), e);
target.offset(originOffset); //reset the offset
return -1;
}
return bytesToRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ReadTargetBuffer;
Expand Down Expand Up @@ -100,11 +101,15 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
}
Path pagePath = getPagePath(pageId, isTemporary);
try (RandomAccessFile localFile = new RandomAccessFile(pagePath.toString(), "r")) {
long pageLength = localFile.length();
if (pageOffset + bytesToRead > pageLength) {
throw new PageCorruptedException(String.format(
"The page %s (%s) probably has been corrupted, "
+ "page-offset %s, bytes to read %s, page file length %s",
pageId, pagePath, pageOffset, bytesToRead, pageLength));
}
int bytesSkipped = localFile.skipBytes(pageOffset);
if (pageOffset != bytesSkipped) {
long pageLength = pagePath.toFile().length();
Preconditions.checkArgument(pageOffset <= pageLength,
"page offset %s exceeded page size %s", pageOffset, pageLength);
throw new IOException(
String.format("Failed to read page %s (%s) from offset %s: %s bytes skipped",
pageId, pagePath, pageOffset, bytesSkipped));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ReadTargetBuffer;

Expand Down Expand Up @@ -68,8 +69,12 @@ public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer
throw new PageNotFoundException(pageId.getFileId() + "_" + pageId.getPageIndex());
}
MemPage page = mPageStoreMap.get(pageKey);
Preconditions.checkArgument(pageOffset <= page.getPageLength(),
"page offset %s exceeded page size %s", pageOffset, page.getPageLength());
if (pageOffset + bytesToRead > page.getPageLength()) {
throw new PageCorruptedException(String.format(
"The page %s probably has been corrupted, "
+ "page-offset %s, bytes to read %s, page file length %s",
pageId, pageOffset, bytesToRead, page.getPageLength()));
}
int bytesLeft = (int) Math.min(page.getPageLength() - pageOffset, target.remaining());
bytesLeft = Math.min(bytesLeft, bytesToRead);
target.writeBytes(page.getPage(), pageOffset, bytesLeft);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -583,6 +583,23 @@ public void testUnbuffer() throws Exception {
Assert.assertEquals(1, manager.mPagesServed);
}

/**
 * Reads through a stream created after the client cache fallback has been enabled and
 * checks both the number of bytes returned and the position-read fallback counter.
 */
@Test
public void testPageDataFileCorrupted() throws Exception {
  final int pageCount = 10;
  final int totalBytes = mPageSize * pageCount;
  byte[] content = BufferUtils.getIncreasingByteArray(totalBytes);
  ByteArrayCacheManager cacheManager = new ByteArrayCacheManager();
  // By default local cache fallback is not enabled, so the read should fail for any error.
  // NOTE(review): this stream is created but never read from in this test.
  LocalCacheFileInStream noFallbackStream = setupWithSingleFile(content, cacheManager);

  // Turn the fallback on before creating the second stream.
  sConf.set(PropertyKey.USER_CLIENT_CACHE_FALLBACK_ENABLED, true);
  LocalCacheFileInStream fallbackStream = setupWithSingleFile(content, cacheManager);

  byte[] readBuffer = new byte[10];
  Assert.assertEquals(100, fallbackStream.positionedRead(0, readBuffer, 100, 100));

  long fallbackCount =
      MetricsSystem.counter(MetricKey.CLIENT_CACHE_POSITION_READ_FALLBACK.getName()).getCount();
  Assert.assertEquals(1, fallbackCount);
}

@Test
public void testPositionReadFallBack() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import alluxio.conf.Configuration;
import alluxio.conf.InstancedConfiguration;
import alluxio.conf.PropertyKey;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.exception.status.ResourceExhaustedException;
import alluxio.file.ByteArrayTargetBuffer;
Expand Down Expand Up @@ -981,7 +982,7 @@ public void getTimeout() throws Exception {
}

@Test
public void getFaultyRead() throws Exception {
public void getFaultyReadWithNoExceptionManager() throws Exception {
PageStoreOptions pageStoreOptions = PageStoreOptions.create(mConf).get(0);
FaultyPageStore pageStore = new FaultyPageStore();
PageStoreDir dir =
Expand All @@ -998,6 +999,40 @@ public void getFaultyRead() throws Exception {
assertEquals(0, targetBuffer.offset());
}

/**
 * Puts a page through a LocalCacheManager backed by a FaultyPageStore, switches the store
 * into its faulty-get mode, and expects the read to return -1 with the target buffer offset
 * back at 0.
 */
@Test
public void getFaultyReadWithLocalCacheManager() throws Exception {
  PageStoreOptions storeOptions = PageStoreOptions.create(mConf).get(0);
  FaultyPageStore faultyStore = new FaultyPageStore();
  PageStoreDir storeDir = new LocalPageStoreDir(storeOptions, faultyStore, mEvictor);
  mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(storeDir));

  LocalCacheManager manager = createLocalCacheManager(mConf, mPageMetaStore);
  manager.put(PAGE_ID1, PAGE1);

  ByteArrayTargetBuffer readTarget = new ByteArrayTargetBuffer(mBuf, 0);
  // From here on the store's get throws an IOException after moving the target offset.
  faultyStore.setGetFaulty(true);

  int bytesRead = manager.get(PAGE_ID1, PAGE1.length, readTarget, CacheContext.defaults());
  assertEquals(-1, bytesRead);
  assertEquals(0, readTarget.offset());
}

/**
 * Puts a page through a LocalCacheManager backed by a FaultyPageStore, switches the store
 * into its corrupted-get mode, and expects the read to return -1 with the target buffer
 * offset back at 0.
 */
@Test
public void getCorruptedReadWithLocalCacheManager() throws Exception {
  PageStoreOptions storeOptions = PageStoreOptions.create(mConf).get(0);
  FaultyPageStore corruptingStore = new FaultyPageStore();
  PageStoreDir storeDir = new LocalPageStoreDir(storeOptions, corruptingStore, mEvictor);
  mPageMetaStore = new DefaultPageMetaStore(ImmutableList.of(storeDir));

  LocalCacheManager manager = createLocalCacheManager(mConf, mPageMetaStore);
  manager.put(PAGE_ID1, PAGE1);

  ByteArrayTargetBuffer readTarget = new ByteArrayTargetBuffer(mBuf, 0);
  // From here on the store's get throws a PageCorruptedException after moving the offset.
  corruptingStore.setGetCorrupted(true);

  int bytesRead = manager.get(PAGE_ID1, PAGE1.length, readTarget, CacheContext.defaults());
  assertEquals(-1, bytesRead);
  assertEquals(0, readTarget.offset());
}

@Test
public void deleteTimeout() throws Exception {
mConf.set(PropertyKey.USER_CLIENT_CACHE_TIMEOUT_DURATION, "2s");
Expand Down Expand Up @@ -1157,13 +1192,19 @@ public FaultyPageStore() {
private AtomicBoolean mDeleteFaulty = new AtomicBoolean(false);
private AtomicBoolean mGetFaulty = new AtomicBoolean(false);

private AtomicBoolean mGetCorrupted = new AtomicBoolean(false);

/**
 * Reads a page, optionally simulating a failure first.
 *
 * When the faulty flag is set, the target offset is advanced by 100 and an
 * {@link IOException} is thrown. Otherwise, when the corrupted flag is set, the target
 * offset is advanced by 100 and a {@link PageCorruptedException} is thrown. With neither
 * flag set the call is delegated to the parent page store.
 */
@Override
public int get(PageId pageId, int pageOffset, int bytesToRead, ReadTargetBuffer target,
    boolean isTemporary) throws IOException, PageNotFoundException {
  if (mGetFaulty.get()) {
    // Leave the buffer offset moved before failing.
    int movedOffset = target.offset() + 100;
    target.offset(movedOffset);
    throw new IOException("Page read fault");
  } else if (mGetCorrupted.get()) {
    // Same offset disturbance, but reported as a corrupted page.
    int movedOffset = target.offset() + 100;
    target.offset(movedOffset);
    throw new PageCorruptedException("page corrupted");
  }
  return super.get(pageId, pageOffset, bytesToRead, target, isTemporary);
}

Expand Down Expand Up @@ -1194,6 +1235,10 @@ void setDeleteFaulty(boolean faulty) {
/**
 * Sets the flag stored in {@code mGetFaulty}.
 *
 * @param faulty the new value of the faulty-get flag
 */
void setGetFaulty(boolean faulty) {
mGetFaulty.set(faulty);
}

/**
 * Sets the flag stored in {@code mGetCorrupted}.
 *
 * @param faulty the new value of the corrupted-get flag
 */
void setGetCorrupted(boolean faulty) {
mGetCorrupted.set(faulty);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.file.ByteArrayTargetBuffer;

import org.junit.Before;
Expand Down Expand Up @@ -171,12 +173,26 @@ public void cleanFileAndDirectory() throws Exception {
assertFalse(Files.exists(p.getParent()));
}

/**
 * Stores a 9-byte page and then asks for 100 bytes from it, expecting the local page store
 * to report the mismatch as a {@link PageCorruptedException}.
 */
@Test
public void testCorruptedPages() throws Exception {
  mOptions.setFileBuckets(1);
  LocalPageStore store = new LocalPageStore(mOptions);
  PageId pageId = new PageId("1", 0);
  store.put(pageId, "corrupted".getBytes());

  byte[] readBuffer = new byte[1000];
  // The caller asks for more bytes than the page file holds, which most likely means the
  // page is corrupted.
  assertThrows(PageCorruptedException.class,
      () -> store.get(pageId, 0, 100, new ByteArrayTargetBuffer(readBuffer, 0)));
}

private void helloWorldTest(PageStore store) throws Exception {
String msg = "Hello, World!";
PageId id = new PageId("0", 0);
store.put(id, msg.getBytes());
byte[] buf = new byte[1024];
assertEquals(msg.getBytes().length, store.get(id, new ByteArrayTargetBuffer(buf, 0)));
assertEquals(msg.getBytes().length, store.get(id, 0, msg.length(),
new ByteArrayTargetBuffer(buf, 0)));
assertArrayEquals(msg.getBytes(), Arrays.copyOfRange(buf, 0, msg.getBytes().length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ private void helloWorldTest(PageStore store) throws Exception {
PageId id = new PageId("0", 0);
store.put(id, msg.getBytes());
byte[] buf = new byte[PAGE_SIZE];
assertEquals(msg.getBytes().length, store.get(id, new ByteArrayTargetBuffer(buf, 0)));
assertEquals(msg.getBytes().length,
store.get(id, 0, msg.length(), new ByteArrayTargetBuffer(buf, 0)));
assertArrayEquals(msg.getBytes(), Arrays.copyOfRange(buf, 0, msg.getBytes().length));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import alluxio.ProjectConstants;
import alluxio.client.file.cache.PageId;
import alluxio.client.file.cache.PageStore;
import alluxio.exception.PageCorruptedException;
import alluxio.exception.PageNotFoundException;
import alluxio.file.ByteArrayTargetBuffer;
import alluxio.util.io.BufferUtils;
Expand Down Expand Up @@ -79,7 +80,8 @@ public void helloWorldTest() throws Exception {
PageId id = new PageId("0", 0);
mPageStore.put(id, msgBytes);
byte[] buf = new byte[1024];
assertEquals(msgBytes.length, mPageStore.get(id, new ByteArrayTargetBuffer(buf, 0)));
assertEquals(msgBytes.length,
mPageStore.get(id, 0, msgBytes.length, new ByteArrayTargetBuffer(buf, 0)));
assertArrayEquals(msgBytes, Arrays.copyOfRange(buf, 0, msgBytes.length));
mPageStore.delete(id);
try {
Expand All @@ -97,7 +99,8 @@ public void getOffset() throws Exception {
mPageStore.put(id, BufferUtils.getIncreasingByteArray(len));
byte[] buf = new byte[len];
for (int offset = 1; offset < len; offset++) {
int bytesRead = mPageStore.get(id, offset, len, new ByteArrayTargetBuffer(buf, 0), false);
int bytesRead = mPageStore.get(id, offset, len - offset,
new ByteArrayTargetBuffer(buf, 0), false);
assertEquals(len - offset, bytesRead);
assertArrayEquals(BufferUtils.getIncreasingByteArray(offset, len - offset),
Arrays.copyOfRange(buf, 0, bytesRead));
Expand All @@ -111,7 +114,7 @@ public void getOffsetOverflow() throws Exception {
PageId id = new PageId("0", 0);
mPageStore.put(id, BufferUtils.getIncreasingByteArray(len));
byte[] buf = new byte[1024];
assertThrows(IllegalArgumentException.class, () ->
assertThrows(PageCorruptedException.class, () ->
mPageStore.get(id, offset, len, new ByteArrayTargetBuffer(buf, 0)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* The Alluxio Open Foundation licenses this work under the Apache License, version 2.0
* (the "License"). You may not use this work except in compliance with the License, which is
* available at www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" basis, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied, as more fully set forth in the License.
*
* See the NOTICE file distributed with this work for information regarding copyright ownership.
*/

package alluxio.exception;

/**
 * An exception that should be thrown when the data of a page has been corrupted in store.
 *
 * This is an unchecked exception, so callers are not forced by the compiler to handle it.
 */
public class PageCorruptedException extends RuntimeException {
  // RuntimeException is Serializable; pin the serialized form explicitly.
  private static final long serialVersionUID = 1L;

  /**
   * Constructs a PageCorruptedException with the specified message.
   *
   * @param message the detail message describing the corruption
   */
  public PageCorruptedException(String message) {
    super(message);
  }

  /**
   * Constructs a PageCorruptedException with the specified message and cause.
   *
   * @param message the detail message describing the corruption
   * @param cause the underlying throwable that led to this exception
   */
  public PageCorruptedException(String message, Throwable cause) {
    super(message, cause);
  }
}

0 comments on commit a3ecbc7

Please sign in to comment.