Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix FileInStreamIntegrationTest #18178

Merged
merged 11 commits into from
Oct 10, 2023
Merged
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,10 @@
import alluxio.AlluxioTestDirectory;
import alluxio.AlluxioURI;
import alluxio.Constants;
import alluxio.annotation.dora.DoraTestTodoItem;
import alluxio.client.ReadType;
import alluxio.client.file.FileInStream;
import alluxio.client.file.FileOutStream;
import alluxio.client.file.FileSystem;
import alluxio.client.file.FileSystemTestUtils;
import alluxio.client.file.URIStatus;
import alluxio.conf.PropertyKey;
import alluxio.grpc.CreateFilePOptions;
import alluxio.grpc.OpenFilePOptions;
Expand All @@ -29,16 +26,13 @@
import alluxio.security.authorization.Mode;
import alluxio.testutils.BaseIntegrationTest;
import alluxio.testutils.LocalAlluxioClusterResource;
import alluxio.util.CommonUtils;
import alluxio.util.io.BufferUtils;
import alluxio.util.io.PathUtils;
import alluxio.wire.FileBlockInfo;
import alluxio.worker.block.BlockStoreType;

import com.google.common.collect.ImmutableList;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
Expand All @@ -59,9 +53,6 @@
* Integration tests for {@link alluxio.client.file.FileInStream}.
*/
@RunWith(Parameterized.class)
@Ignore
@DoraTestTodoItem(action = DoraTestTodoItem.Action.FIX, owner = "jiaming",
comment = "fix the tests")
public final class FileInStreamIntegrationTest extends BaseIntegrationTest {
// The block size needs to be sufficiently large based on TCP send/receive buffers, set to 1MB.
private static final int BLOCK_SIZE = Constants.MB;
Expand Down Expand Up @@ -137,49 +128,6 @@ private List<CreateFilePOptions> getOptionSet() {
return ret;
}

/**
* Tests {@link FileInStream#read()} across block boundary.
*/
@Test
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.USER_STREAMING_READER_CHUNK_SIZE_BYTES, "64KB"})
public void readTest1() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is this test irrelevant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for (int k = MIN_LEN; k <= MAX_LEN; k += DELTA) {
for (CreateFilePOptions op : getOptionSet()) {
String filename = mTestPath + "/file_" + k + "_" + op.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op));
byte[] ret = new byte[k];
int value = is.read();
int cnt = 0;
while (value != -1) {
Assert.assertTrue(value >= 0);
Assert.assertTrue(value < 256);
ret[cnt++] = (byte) value;
value = is.read();
}
Assert.assertEquals(cnt, k);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(k, ret));
is.close();

is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op));
ret = new byte[k];
value = is.read();
cnt = 0;
while (value != -1) {
Assert.assertTrue(value >= 0);
Assert.assertTrue(value < 256);
ret[cnt++] = (byte) value;
value = is.read();
}
Assert.assertEquals(cnt, k);
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(k, ret));
is.close();
}
}
}

/**
* Tests {@link FileInStream#read(byte[])}.
*/
Expand Down Expand Up @@ -433,7 +381,7 @@ public void run() {
PropertyKey.Name.USER_STREAMING_READER_CHUNK_SIZE_BYTES, "64KB",
PropertyKey.Name.WORKER_RAMDISK_SIZE, "1GB"})
public void remoteReadLargeFile() throws Exception {
// write a file outside of Alluxio
// write a file outside Alluxio
AlluxioURI filePath = new AlluxioURI(mTestPath + "/test");
try (FileOutStream os = mFileSystem.createFile(filePath, CreateFilePOptions.newBuilder()
.setBlockSizeBytes(16 * Constants.MB).setWriteType(WritePType.THROUGH).build())) {
Expand Down Expand Up @@ -463,220 +411,9 @@ public void positionedReadWithoutCaching() throws Exception {

FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op));
byte[] ret = new byte[DELTA - 1];
Assert.assertEquals(DELTA - 1, is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA));
Assert.assertEquals(DELTA - 1, is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA - 1));
Assert.assertTrue(BufferUtils.equalIncreasingByteArray(MIN_LEN - DELTA + 1, DELTA - 1, ret));
is.close();
}
}

@Test
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD, "700KB"})
jiacheliu3 marked this conversation as resolved.
Show resolved Hide resolved
public void positionedReadWithLargeThreshold() throws Exception {
  // Exercises FileInStream#positionedRead on the trailing (DELTA - 1) bytes of the
  // MIN_LEN-sized test files, once for each of the two write options below.
  // NOTE(review): the files are presumably created during test setup, which is not
  // visible here -- confirm before relying on it.
  final int readOffset = MIN_LEN - DELTA + 1;
  // The destination buffer holds exactly DELTA - 1 bytes, so that is the most that can
  // be requested at buffer offset 0. The previous code asked for DELTA bytes, one more
  // than the buffer can hold.
  final int readLength = DELTA - 1;

  List<CreateFilePOptions> optionSet = new ArrayList<>(2);
  optionSet.add(mWriteBoth);
  optionSet.add(mWriteUnderStore);

  for (CreateFilePOptions op : optionSet) {
    String filename = mTestPath + "/file_" + MIN_LEN + "_" + op.hashCode();
    AlluxioURI uri = new AlluxioURI(filename);

    // try-with-resources closes the stream even when an assertion fails.
    try (FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op))) {
      byte[] ret = new byte[readLength];
      Assert.assertEquals(readLength, is.positionedRead(readOffset, ret, 0, readLength));
      // Verify the bytes read against the increasing-byte pattern starting at readOffset.
      Assert.assertTrue(BufferUtils.equalIncreasingByteArray(readOffset, readLength, ret));
    }
  }
}

@Test
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.USER_FILE_SEQUENTIAL_PREAD_THRESHOLD, "200KB"})
public void positionedReadWithSmallThreshold() throws Exception {
jiacheliu3 marked this conversation as resolved.
Show resolved Hide resolved
List<CreateFilePOptions> optionSet = new ArrayList<>(2);
optionSet.add(mWriteBoth);
optionSet.add(mWriteUnderStore);
for (CreateFilePOptions op : optionSet) {
String filename = mTestPath + "/file_" + MIN_LEN + "_" + op.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

try (FileInStream is = mFileSystem.openFile(uri, FileSystemTestUtils.toOpenFileOptions(op))) {
byte[] ret = new byte[DELTA - 1];
Assert.assertEquals(DELTA - 1,
is.positionedRead(MIN_LEN - DELTA + 1, ret, 0, DELTA));
Assert.assertTrue(
BufferUtils.equalIncreasingByteArray(MIN_LEN - DELTA + 1, DELTA - 1, ret));
}
}
}

@Test(timeout = 10000)
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS, "2000"})
public void asyncCacheFirstBlock() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async cache feature has been deprecated and removed

String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

for (ReadType readType : ReadType.values()) {
mFileSystem.free(uri);
CommonUtils.waitFor("No in-Alluxio data left from previous iteration.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
return st.getInAlluxioPercentage() == 0;
} catch (Exception e) {
return false;
}
});
FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(readType.toProto()).build());
is.read();
URIStatus status = mFileSystem.getStatus(uri);
// if the test is running extremely slow, this check can happen after the worker reports
// the newly cached blocks to master, and thus failing the assertion
Assert.assertEquals(0, status.getInAlluxioPercentage());
is.close();
if (readType.isCache()) {
CommonUtils.waitFor("First block to be cached.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
boolean achieved = true;
// Expect only first block to be cached, other blocks should be empty in Alluxio
for (int i = 0; i < st.getFileBlockInfos().size(); i++) {
FileBlockInfo info = st.getFileBlockInfos().get(i);
if (i == 0) {
achieved = achieved && !info.getBlockInfo().getLocations().isEmpty();
} else {
achieved = achieved && info.getBlockInfo().getLocations().isEmpty();
}
}
return achieved;
} catch (Exception e) {
return false;
}
});
} else {
Thread.sleep(1000);
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
}
}
}

@Test(timeout = 10000)
@LocalAlluxioClusterResource.Config(
confParams = {PropertyKey.Name.WORKER_BLOCK_HEARTBEAT_INTERVAL_MS, "2000"})
public void asyncCacheAfterSeek() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async cache feature has been deprecated and removed

String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

for (ReadType readType : ReadType.values()) {
mFileSystem.free(uri);
CommonUtils.waitFor("No in-Alluxio data left from previous iteration.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
return st.getInAlluxioPercentage() == 0;
} catch (Exception e) {
return false;
}
});
FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(readType.toProto()).build());
URIStatus status = mFileSystem.getStatus(uri);
is.seek(status.getBlockSizeBytes() + 1);
is.read();
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
is.close();
if (readType.isCache()) {
CommonUtils.waitFor("Second block to be cached.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
boolean achieved = true;
// Expect only second block to be cached, other blocks should be empty in Alluxio
for (int i = 0; i < st.getFileBlockInfos().size(); i++) {
FileBlockInfo info = st.getFileBlockInfos().get(i);
if (i == 1) {
achieved = achieved && !info.getBlockInfo().getLocations().isEmpty();
} else {
achieved = achieved && info.getBlockInfo().getLocations().isEmpty();
}
}
return achieved;
} catch (Exception e) {
return false;
}
});
} else {
Thread.sleep(1000);
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
}
}
}

@Test(timeout = 10000)
public void asyncCacheFirstBlockPRead() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

async cache feature has been deprecated and removed

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's leave this comment open here so if anyone traces the code to this PR they will know what's going on

String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

for (ReadType readType : ReadType.values()) {
mFileSystem.free(uri);
CommonUtils.waitFor("No in-Alluxio data left from previous iteration.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
return st.getInAlluxioPercentage() == 0;
} catch (Exception e) {
return false;
}
});
FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(readType.toProto()).build());
// Positioned reads trigger async caching after reading and do not need to wait for a close
// or a block boundary to be crossed.
URIStatus status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
is.positionedRead(BLOCK_SIZE / 2, new byte[1], 0, 1);
if (readType.isCache()) {
CommonUtils.waitFor("First block to be cached.", () -> {
try {
URIStatus st = mFileSystem.getStatus(uri);
boolean achieved = true;
// Expect only first block to be cached, other blocks should be empty in Alluxio
for (int i = 0; i < st.getFileBlockInfos().size(); i++) {
FileBlockInfo info = st.getFileBlockInfos().get(i);
if (i == 0) {
achieved = achieved && !info.getBlockInfo().getLocations().isEmpty();
} else {
achieved = achieved && info.getBlockInfo().getLocations().isEmpty();
}
}
return achieved;
} catch (Exception e) {
return false;
}
});
} else {
Thread.sleep(1000);
status = mFileSystem.getStatus(uri);
Assert.assertEquals(0, status.getInAlluxioPercentage());
}
is.close();
}
}

@Test
public void syncCacheFirstBlock() throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dbw9580 do you know if this test is still relevant? I figure the problem is that getFileBlockInfos() is somehow a deprecated API? But should we still fix this one?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It basically requires that the first portion of the file be cached when the length of the read is larger than the size of a block. The requirement still applies to the page store, but it may not be able to report block-level information. I suggest using the cache usage API on the worker's cache manager directly to verify the size of the file being cached, something like:

CacheManager cm = worker.getCacheManager(); // <- this method may not exist yet
long usedBytes = cm.getUsage()
    .partitionedBy(file(uri))
    .map(CacheUsage::used)
    .orElse(0);
assertEquals(data.length, usedBytes)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove this test for now as it's still using Block API. We can add a new one.

String filename = mTestPath + "/file_" + MAX_LEN + "_" + mWriteUnderStore.hashCode();
AlluxioURI uri = new AlluxioURI(filename);

FileInStream is = mFileSystem.openFile(uri,
OpenFilePOptions.newBuilder().setReadType(ReadPType.CACHE).build());
URIStatus status = mFileSystem.getStatus(uri);
byte[] data = new byte[(int) status.getBlockSizeBytes() + 1];
is.read(data);
status = mFileSystem.getStatus(uri);
Assert.assertFalse(status.getFileBlockInfos().get(0).getBlockInfo().getLocations().isEmpty());
is.close();
}
}
Loading