Skip to content

Commit

Permalink
HDDS-11220. Initialize block length using the chunk list from DataNod…
Browse files Browse the repository at this point in the history
…e before seek (apache#7221)
  • Loading branch information
jojochuang authored Oct 28, 2024
1 parent 91d41a0 commit faf133d
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ public synchronized void initialize() throws IOException {
if (blockInfo != null && blockInfo.isUnderConstruction()) {
// use the block length from DN if block is under construction.
length = blockData.getSize();
LOG.debug("Updated block length to {} for block {}", length, blockID);
}
break;
// If we get a StorageContainerException or an IOException due to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
public class MultipartInputStream extends ExtendedInputStream {

private final String key;
private final long length;
private long length;

// List of PartInputStream, one for each part of the key
private final List<? extends PartInputStream> partStreams;
Expand All @@ -56,6 +56,8 @@ public class MultipartInputStream extends ExtendedInputStream {
// can be reset if a new position is seeked.
private int prevPartIndex;

private boolean initialized = false;

public MultipartInputStream(String keyName,
List<? extends PartInputStream> inputStreams) {

Expand Down Expand Up @@ -130,6 +132,9 @@ protected void checkPartBytesRead(int numBytesToRead, int numBytesRead,
@Override
public synchronized void seek(long pos) throws IOException {
checkOpen();
if (!initialized) {
initialize();
}
if (pos == 0 && length == 0) {
// It is possible for length and pos to be zero in which case
// seek should return instead of throwing exception
Expand Down Expand Up @@ -173,6 +178,26 @@ public synchronized void seek(long pos) throws IOException {
prevPartIndex = partIndex;
}

public synchronized void initialize() throws IOException {
// Pre-check that the stream has not been intialized already
if (initialized) {
return;
}

for (PartInputStream partInputStream : partStreams) {
if (partInputStream instanceof BlockInputStream) {
((BlockInputStream) partInputStream).initialize();
}
}

long streamLength = 0L;
for (PartInputStream partInputStream : partStreams) {
streamLength += partInputStream.getLength();
}
this.length = streamLength;
initialized = true;
}

@Override
public synchronized long getPos() throws IOException {
return length == 0 ? 0 :
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ public class TestHSync {
private static final int BLOCK_SIZE = 2 * MAX_FLUSH_SIZE;
private static final int SERVICE_INTERVAL = 100;
private static final int EXPIRE_THRESHOLD_MS = 140;
private static final int WAL_HEADER_LEN = 83;

private static OpenKeyCleanupService openKeyCleanupService;

Expand Down Expand Up @@ -417,6 +418,45 @@ private static String getChunkPathOnDataNode(FSDataOutputStream outputStream)
return chunkPath;
}

@Test
public void testHSyncSeek() throws Exception {
// Set the fs.defaultFS
final String rootPath = String.format("%s://%s.%s/",
OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
CONF.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);

final String dir = OZONE_ROOT + bucket.getVolumeName()
+ OZONE_URI_DELIMITER + bucket.getName();
final Path key1 = new Path(dir, "key-hsync-seek");

final byte[] data = new byte[1024];
final byte[] buffer = new byte[1024];
ThreadLocalRandom.current().nextBytes(data);

try (FileSystem fs = FileSystem.get(CONF)) {
// Create key1
try (FSDataOutputStream os = fs.create(key1, true)) {
os.write(data, 0, WAL_HEADER_LEN);
// the first hsync will update the correct length in the key info at OM
os.hsync();
os.write(data, 0, data.length);
os.hsync(); // the second hsync will not update the length at OM
try (FSDataInputStream in = fs.open(key1)) {
// the actual key length is WAL_HEADER_LEN + 1024, but the length in OM is WAL_HEADER_LEN (83)
in.seek(WAL_HEADER_LEN + 1);
final int n = in.read(buffer, 1, buffer.length - 1);
// expect to read 1023 bytes
assertEquals(buffer.length - 1, n);
for (int i = 1; i < buffer.length; i++) {
assertEquals(data[i], buffer[i], "expected at i=" + i);
}
}
} finally {
fs.delete(key1, false);
}
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testO3fsHSync(boolean incrementalChunkList) throws Exception {
Expand Down

0 comments on commit faf133d

Please sign in to comment.