Skip to content

Commit

Permalink
Use READ IOContext for all non Segment* files when copying node-node
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Sep 12, 2024
1 parent 1db75a0 commit 9df169a
Showing 1 changed file with 25 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.util.ArrayUtil;
Expand Down Expand Up @@ -104,13 +105,19 @@ public MultiChunkTransfer<StoreFileMetadata, FileChunk> createTransfer(
protected void onNewResource(StoreFileMetadata md) throws IOException {
offset = 0;
IOUtils.close(currentInput, () -> currentInput = null);
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE);
currentInput = new InputStreamIndexInput(indexInput, md.length()) {
@Override
public void close() throws IOException {
IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop
}
};
// Open all files other than Segments* using IOContext.READ.
// With Lucene9_12 a READONCE context will confine the underlying IndexInput (MemorySegmentIndexInput) to a single thread.
// Segments* files require IOContext.READONCE
// https://github.com/apache/lucene/blob/b2d3a2b37e00f19a74949097736be8fd64745f61/lucene/test-framework/src/java/org/apache/lucene/tests/store/MockDirectoryWrapper.java#L817
if (md.name().startsWith(IndexFileNames.SEGMENTS) == false) {
final IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READ);
currentInput = new InputStreamIndexInput(indexInput, md.length()) {
@Override
public void close() throws IOException {
IOUtils.close(indexInput, super::close); // InputStreamIndexInput's close is a noop
}
};
}
}

private byte[] acquireBuffer() {
Expand All @@ -126,7 +133,7 @@ protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
assert Transports.assertNotTransportThread("read file chunk");
cancellableThreads.checkForCancel();
final byte[] buffer = acquireBuffer();
final int bytesRead = currentInput.read(buffer);
final int bytesRead = readBytes(md, buffer);
if (bytesRead == -1) {
throw new CorruptIndexException("file truncated; length=" + md.length() + " offset=" + offset, md.name());
}
Expand All @@ -142,6 +149,16 @@ protected FileChunk nextChunkRequest(StoreFileMetadata md) throws IOException {
return chunk;
}

private int readBytes(StoreFileMetadata md, byte[] buffer) throws IOException {
// if we don't have a currentInput by now open once to create the chunk.
if (currentInput == null) {
try (IndexInput indexInput = store.directory().openInput(md.name(), IOContext.READONCE)) {
return new InputStreamIndexInput(indexInput, md.length()).read(buffer);
}
}
return currentInput.read(buffer);
}

@Override
protected void executeChunkRequest(FileChunk request, ActionListener<Void> listener1) {
cancellableThreads.checkForCancel();
Expand Down

0 comments on commit 9df169a

Please sign in to comment.