Skip to content

Commit

Permalink
[Searchable Snapshot] Fix bug of Searchable Snapshot Dependency on re…
Browse files Browse the repository at this point in the history
…pository chunk_size (#12277)

* implement logic of fetching blocks from multiple chunks of snapshot file.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor and address comments.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* apply spotless check

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Address comments of using a different data structure to fetch blob parts.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* remove unnecessary code.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Refactor outputstream usage.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* refactor blobpart logic into a separate method and add unit tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add new unit tests.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 committed Feb 27, 2024
1 parent 7921175 commit 3125b94
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,10 @@ protected Settings.Builder randomRepositorySettings() {
return settings;
}

private Settings.Builder chunkedRepositorySettings() {
private Settings.Builder chunkedRepositorySettings(long chunkSize) {
final Settings.Builder settings = Settings.builder();
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
settings.put("chunk_size", 2 << 23, ByteSizeUnit.BYTES);
settings.put("chunk_size", chunkSize, ByteSizeUnit.BYTES);
return settings;
}

Expand Down Expand Up @@ -194,18 +194,44 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
}

/**
* Tests a chunked repository scenario for searchable snapshots by creating an index,
* Tests the default chunked repository scenario (chunk size 2 << 23 bytes, i.e. 16 MiB) for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithChunks() throws Exception {
public void testCreateSearchableSnapshotWithDefaultChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

Settings.Builder repositorySettings = chunkedRepositorySettings();
Settings.Builder repositorySettings = chunkedRepositorySettings(2 << 23);

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
createRepositoryWithSettings(repositorySettings, repoName);
takeSnapshot(client, snapshotName, repoName, indexName);

deleteIndicesAndEnsureGreen(client, indexName);
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
assertRemoteSnapshotIndexSettings(client, restoredIndexName);

assertDocCount(restoredIndexName, 1000L);
}

/**
* Tests a chunked repository scenario with a small 1000-byte chunk size for searchable snapshots by creating an index,
* taking a snapshot, restoring it as a searchable snapshot index.
*/
public void testCreateSearchableSnapshotWithSmallChunks() throws Exception {
final int numReplicasIndex = randomIntBetween(1, 4);
final String indexName = "test-idx";
final String restoredIndexName = indexName + "-copy";
final String repoName = "test-repo";
final String snapshotName = "test-snap";
final Client client = client();

Settings.Builder repositorySettings = chunkedRepositorySettings(1000);

internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.opensearch.index.store.remote.utils.TransferManager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
Expand Down Expand Up @@ -136,25 +138,45 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
final long blockStart = getBlockStart(blockId);
final long blockEnd = blockStart + getActualBlockSize(blockId);

// If the snapshot file is chunked, we must account for this by
// choosing the appropriate file part and updating the position
// accordingly.
final int part = (int) (blockStart / partSize);
final long partStart = part * partSize;

final long position = blockStart - partStart;
final long length = blockEnd - blockStart;

// A block may span multiple chunks of a file, so each chunk/blob part
// must be fetched separately to assemble the entire block.
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
.position(position)
.length(length)
.blobName(fileInfo.partName(part))
.blobParts(getBlobParts(blockStart, blockEnd))
.directory(directory)
.fileName(blockFileName)
.build();
return transferManager.fetchBlob(blobFetchRequest);
}

/**
 * Splits the byte range of a block into the blob parts (chunks) of the snapshot file that cover it.
 * <p>
 * The range {@code [blockStart, blockEnd)} is walked from its first byte: the first part index is
 * {@code blockStart / partSize}, and each subsequent part is the next index. For every part touched,
 * one {@link BlobFetchRequest.BlobPart} is emitted carrying the part's blob name (from
 * {@code fileInfo.partName}), the offset inside that part, and the number of bytes to read from it.
 *
 * @param blockStart offset of the first byte of the block within the whole file
 * @param blockEnd   offset one past the last byte of the block within the whole file
 * @return the blob parts, in file order, that together make up the block; empty when
 *         {@code blockEnd <= blockStart}
 */
protected List<BlobFetchRequest.BlobPart> getBlobParts(long blockStart, long blockEnd) {
    // When the snapshot file is chunked, the block's first byte lives in this part.
    int chunkIndex = (int) (blockStart / partSize);
    long cursor = blockStart;

    final List<BlobFetchRequest.BlobPart> parts = new ArrayList<>();
    for (long remaining = blockEnd - blockStart; remaining > 0; remaining = blockEnd - cursor) {
        // Offset of the cursor relative to the beginning of the current part.
        final long offsetInChunk = cursor % partSize;
        // Read up to the end of the block, but never past the end of the current part.
        final long endInChunk = Math.min(offsetInChunk + remaining, partSize);
        final long bytesFromChunk = endInChunk - offsetInChunk;

        parts.add(new BlobFetchRequest.BlobPart(fileInfo.partName(chunkIndex), offsetInChunk, bytesFromChunk));

        // Whatever is left of the block continues at the start of the next part.
        chunkIndex++;
        cursor += bytesFromChunk;
    }
    return parts;
}

@Override
public OnDemandBlockSnapshotIndexInput clone() {
OnDemandBlockSnapshotIndexInput clone = buildSlice("clone", 0L, this.length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.store.FSDirectory;

import java.nio.file.Path;
import java.util.List;

/**
* The specification to fetch specific block from blob store
Expand All @@ -20,37 +21,22 @@
*/
public class BlobFetchRequest {

private final long position;

private final long length;

private final String blobName;

private final Path filePath;

private final Directory directory;

private final String fileName;

private final List<BlobPart> blobParts;

private final long blobLength;

private BlobFetchRequest(Builder builder) {
this.position = builder.position;
this.length = builder.length;
this.blobName = builder.blobName;
this.fileName = builder.fileName;
this.filePath = builder.directory.getDirectory().resolve(fileName);
this.directory = builder.directory;
}

public long getPosition() {
return position;
}

public long getLength() {
return length;
}

public String getBlobName() {
return blobName;
this.blobParts = builder.blobParts;
this.blobLength = builder.blobParts.stream().mapToLong(o -> o.getLength()).sum();
}

public Path getFilePath() {
Expand All @@ -65,19 +51,23 @@ public String getFileName() {
return fileName;
}

public List<BlobPart> blobParts() {
return blobParts;
}

public long getBlobLength() {
return blobLength;
}

public static Builder builder() {
return new Builder();
}

@Override
public String toString() {
return "BlobFetchRequest{"
+ "position="
+ position
+ ", length="
+ length
+ ", blobName='"
+ blobName
+ "blobParts="
+ blobParts
+ '\''
+ ", filePath="
+ filePath
Expand All @@ -90,35 +80,45 @@ public String toString() {
}

/**
* Builder for BlobFetchRequest
* BlobPart represents a single chunk of a file
*/
public static final class Builder {
public static class BlobPart {
private String blobName;
private long position;
private long length;
private String blobName;
private FSDirectory directory;
private String fileName;

private Builder() {}

public Builder position(long position) {
this.position = position;
return this;
}

public Builder length(long length) {
public BlobPart(String blobName, long position, long length) {
this.blobName = blobName;
if (length <= 0) {
throw new IllegalArgumentException("Length for blob fetch request needs to be non-negative");
throw new IllegalArgumentException("Length for blob part fetch request needs to be non-negative");
}
this.length = length;
return this;
this.position = position;
}

public Builder blobName(String blobName) {
this.blobName = blobName;
return this;
public String getBlobName() {
return blobName;
}

public long getPosition() {
return position;
}

public long getLength() {
return length;
}
}

/**
* Builder for BlobFetchRequest
*/
public static final class Builder {
private List<BlobPart> blobParts;
private FSDirectory directory;
private String fileName;

private Builder() {}

public Builder directory(FSDirectory directory) {
this.directory = directory;
return this;
Expand All @@ -129,6 +129,11 @@ public Builder fileName(String fileName) {
return this;
}

public Builder blobParts(List<BlobPart> blobParts) {
this.blobParts = blobParts;
return this;
}

public BlobFetchRequest build() {
return new BlobFetchRequest(this);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa
}

/**
* Given a blobFetchRequest, return it's corresponding IndexInput.
* Given a blobFetchRequest, return its corresponding IndexInput.
* @param blobFetchRequest to fetch
* @return future of IndexInput augmented with internal caching maintenance tasks
*/
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {

final Path key = blobFetchRequest.getFilePath();

final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
Expand Down Expand Up @@ -85,15 +86,20 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo
try {
if (Files.exists(request.getFilePath()) == false) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
request.getBlobName(),
request.getPosition(),
request.getLength()
);
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
}
}
}
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
Expand Down Expand Up @@ -153,7 +159,7 @@ public IndexInput getIndexInput() throws IOException {

@Override
public long length() {
return request.getLength();
return request.getBlobLength();
}

@Override
Expand Down
Loading

0 comments on commit 3125b94

Please sign in to comment.