[Remote Store] Add RemoteSegmentStoreDirectory to interact with remote segment store #4020

Merged
RemoteDirectory.java
@@ -33,7 +33,7 @@
*
* @opensearch.internal
*/
- public final class RemoteDirectory extends Directory {
+ public class RemoteDirectory extends Directory {

private final BlobContainer blobContainer;

@@ -50,6 +50,16 @@ public String[] listAll() throws IOException {
return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new);
}

/**
* Returns names of files with given prefix in this directory.
* @param filenamePrefix The prefix to match against file names in the directory
* @return A list of the matching filenames in the directory
* @throws IOException if there were any failures in reading from the blob container
*/
public Collection<String> listFilesByPrefix(String filenamePrefix) throws IOException {
return blobContainer.listBlobsByPrefix(filenamePrefix).keySet();
}

/**
* Removes an existing file in the directory.
*
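For context, a minimal, hypothetical usage sketch of the new listFilesByPrefix method (not part of the diff); it assumes a remoteDirectory instance already backed by a BlobContainer:

// Hypothetical usage; remoteDirectory is assumed to be an existing RemoteDirectory.
Collection<String> uploads = remoteDirectory.listFilesByPrefix("_0.cfe");
for (String file : uploads) {
    System.out.println(file); // e.g. _0.cfe__gX7bNIIBrs0AUNsR2yEG
}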
RemoteSegmentStoreDirectory.java (new file)
@@ -0,0 +1,372 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.UUIDs;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* A RemoteDirectory extension for the remote segment store. We need to make sure we don't overwrite a segment file once it is uploaded.
* In order to prevent a segment overwrite, which can occur when two primary nodes exist for the same shard at the same time,
* a unique suffix is added to the uploaded segment file. This class keeps track of the filenames of segments stored
* in the remote segment store versus their filenames in the local filesystem, and provides a consistent Directory interface so that
* callers can access segment files in the same way as with {@code FSDirectory}. Apart from storing actual segment files,
* the remote segment store also keeps track of refresh checkpoints as metadata, in a separate path handled by
* another instance of {@code RemoteDirectory}.
* @opensearch.internal
*/
public final class RemoteSegmentStoreDirectory extends FilterDirectory {
Member: Given there are 2 directories where the calls can be delegated, what value does FilterDirectory provide us here? Could we have extended Directory directly?

Member Author: Extending Directory would require most of the abstract methods to be re-implemented, and we would mostly be delegating them to the data directory.

If you check the RemoteDirectory class, most of the methods throw an unsupported-operation exception. Once we have implementations for those methods, and we think some pre-processing is required before delegating, we would need to implement them in this class as well.

Member: Quoting from the FilterDirectory javadoc: "if you plan to write your own Directory implementation, you should consider extending directly Directory or BaseDirectory rather than try to reuse functionality of existing Directorys by extending this class."

Based on this, we should extend FilterDirectory only if we plan to use the Directory implementation. But in this case, we're not really using the Directory implementation. Again, is there any benefit we're getting here by extending FilterDirectory?

Member Author: By extending FilterDirectory, we don't have to implement all the methods that are present in Directory. We delegate most of the methods directly to remoteDataDirectory. The methods that need pre/post-processing are implemented in this class. The first line of the FilterDirectory javadoc says: "Directory implementation that delegates calls to another directory." This is exactly what we want.

/**
* Each segment file is uploaded with a unique suffix.
* For example, _0.cfe in the local filesystem will be uploaded to the remote segment store as _0.cfe__gX7bNIIBrs0AUNsR2yEG
*/
public static final String SEGMENT_NAME_UUID_SEPARATOR = "__";
Member: Can the segment name or UUID also contain __? Just wondering if we can run into corner cases where the segment name translation can break.

Member Author: The UUID should not contain any special character other than -. As per the Lucene file format explained here: https://lucene.apache.org/core/3_0_3/fileformats.html#File%20Naming, __ should not appear in a segment name. But I agree with the concern; I'm not sure which delimiter would be guaranteed never to appear in a segment name.

Member: Got it. Should be fine then. The first occurrence is always guaranteed to be the separator.
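To make the thread above concrete, a minimal sketch (values made up) of how a remote filename is built and then parsed back on the first "__":

// Made-up values: build the remote name, then recover the local name by taking
// everything before the first "__". Lucene segment names never contain "__",
// so the first occurrence is always the separator.
String local = "_0.cfe";
String remote = local + "__" + "gX7bNIIBrs0AUNsR2yEG"; // _0.cfe__gX7bNIIBrs0AUNsR2yEG
String recovered = remote.substring(0, remote.indexOf("__"));
assert recovered.equals(local);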


public static final MetadataFilenameUtils.MetadataFilenameComparator METADATA_FILENAME_COMPARATOR =
new MetadataFilenameUtils.MetadataFilenameComparator();

/**
* remoteDataDirectory is used to store segment files at path: cluster_UUID/index_UUID/shardId/segments/data
*/
private final RemoteDirectory remoteDataDirectory;
/**
* remoteMetadataDirectory is used to store metadata files at path: cluster_UUID/index_UUID/shardId/segments/metadata
*/
private final RemoteDirectory remoteMetadataDirectory;

/**
* To prevent an explosion of refresh metadata files, we replace the refresh file for the given primary term and generation.
* This is achieved by uploading the refresh metadata file with the same UUID suffix.
*/
private String metadataFileUniqueSuffix;

Member: Do you mean collision here?

Member Author: No. The idea is to keep the count of metadata files in check, as we will be calling listAll on the metadata directory in order to get the list of segments.
Each commit is followed by many refreshes. We don't want to add one metadata file per refresh; we just replace the previous one. This works because we still have unique filenames per commit, and it scales because commits are not as frequent as refreshes.
Collaborator (on lines +61 to +65): Can we make the refresh metadata immutable as well? Are there remote cases where an earlier refresh metadata upload might override a more recent one based on thread scheduling, unless the metadata writes are strictly serialized?

Member Author: With the current implementation, there won't be any conflicts, as uploads are part of refresh and they are in sync.
But the scenario you mentioned can happen once we start perf optimizations by making segment upload async.

My only concern is a huge number of files getting created and their clean-up. Either we clean up stale files as part of commit, or delete stale files per refresh (keeping the last N files).

Member Author: The caller method is in a synchronized block: https://github.com/sachinpkale/OpenSearch/blob/remote_segment_failure_handling/server/src/main/java/org/opensearch/index/shard/RemoteStoreRefreshListener.java#L80
This makes sure that only one upload is in progress at a time.

Collaborator: Thanks. While it helps, it doesn't give a strong guarantee that there won't be multiple callers. What might help is having the synchronisation in the upload-metadata part of the directory.

Member Author: Makes sense. Made changes in the latest commit. Thanks @Bukhtawar
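A small worked example (values made up; MetadataFilenameUtils is the package-private helper defined further down) of why refreshes within one commit replace a single metadata file instead of accumulating:

// Two refreshes in the same primary term (5) and generation (10) with an unchanged
// metadataFileUniqueSuffix produce identical filenames, so the later upload replaces
// the earlier file. Generation is encoded in base 36, so 10 becomes "a".
String suffix = "ab12Cd34"; // stands in for metadataFileUniqueSuffix
String first = MetadataFilenameUtils.getMetadataFilename(5, 10, suffix);
String second = MetadataFilenameUtils.getMetadataFilename(5, 10, suffix);
assert first.equals(second); // both are "metadata__5__a__ab12Cd34"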


/**
* Keeps track of local segment filename to uploaded filename along with other attributes like checksum.
* This map acts as a cache layer for uploaded segment filenames which helps avoid calling listAll() each time.
* It is important to initialize this map on creation of RemoteSegmentStoreDirectory and update it on each upload and delete.
*/
private Map<String, UploadedSegmentMetadata> segmentsUploadedToRemoteStore;
Collaborator (on lines +67 to +72): Can we model tracking uploads separately, as part of an upload listener? An upload listener could in future be leveraged to track upload metrics as well.

Member Author: Yes, that can be done. I will create a separate issue to track it.

Member Author: Tracking issue: #4198


private static final Logger logger = LogManager.getLogger(RemoteSegmentStoreDirectory.class);

public RemoteSegmentStoreDirectory(RemoteDirectory remoteDataDirectory, RemoteDirectory remoteMetadataDirectory) throws IOException {
Collaborator (on lines +55 to +76): The only concern I see with having two RemoteDirectory instances is that they can fundamentally be two separate stores altogether, with varying consistency properties. Do we really need them to be exposed separately? Can we not keep the metadata repository internal to this class, to avoid possible misuse of this class?

Member Author: remoteMetadataDirectory will be internal to this class. Callers of RemoteSegmentStoreDirectory will not be aware of the existence of two different remote directories.

As for creating an instance of RemoteSegmentStoreDirectory, that will be handled by the factory. The implementation of the factory method will be part of a subsequent PR, but you can find the code in this commit: https://github.com/sachinpkale/OpenSearch/blob/remote_segment_failure_handling/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryFactory.java

Collaborator: Sure, but uploadMetadata is still public:

public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)

Member Author (@sachinpkale, Aug 8, 2022): Yes, I think you are suggesting making uploadMetadata private. That is the plan, but I haven't made that change yet, as I'm not sure how to decide the trigger for the metadata upload.
Today it is called by RemoteStoreRefreshListener, as RemoteStoreRefreshListener knows when the refresh is happening.

Member Author: @Bukhtawar Created a tracking issue for the same: #4171

super(remoteDataDirectory);
this.remoteDataDirectory = remoteDataDirectory;
this.remoteMetadataDirectory = remoteMetadataDirectory;
init();
}

/**
* Initializes the cache which keeps track of all the segment files uploaded to the remote segment store.
* As this cache is specific to an instance of RemoteSegmentStoreDirectory, it is possible for the cache to become stale
* if another instance of RemoteSegmentStoreDirectory is used to upload/delete segment files.
* It is the caller's responsibility to call init() again to ensure that the cache is properly updated.
* @throws IOException if there were any failures in reading the metadata file
*/
public void init() throws IOException {
this.metadataFileUniqueSuffix = UUIDs.base64UUID();
this.segmentsUploadedToRemoteStore = new ConcurrentHashMap<>(readLatestMetadataFile());
Member: We haven't really uploaded any segments to the remote store at this point, right? Or does this handle the cases where a replica gets promoted to primary?

Member Author: Yes, failover or node restart.

Collaborator: @sachinpkale It seems like there is no cleanup mechanism in place: if init() is called on an already-initialized but stale cache, the stale files have to be cleaned up (using the previous metadataFileUniqueSuffix).

Member Author: @reta We always read the latest metadata file, so even if there is more than one metadata file, it won't impact correctness.
But you are right about cleanup. We need cleanup of stale metadata files in order to keep the performance of init() in check. I will raise a follow-up PR containing the clean-up logic; the implementation is part of another branch: sachinpkale@b0d95c9#diff-f57628264c235a133d2d6db458404e07a43f18026e40ed5e956536c03add9925R273

}

/**
* Read the latest metadata file to get the list of segments uploaded to the remote segment store.
* We upload a metadata file per refresh, but its name is not unique per refresh; the refresh metadata filename is unique for a given commit.
* The format of the refresh metadata filename is: metadata__PrimaryTerm__Generation__UUID
* Refresh metadata files keep track of active segments for the shard at the time of refresh.
* In order to get the list of segment files uploaded to the remote segment store, we need to read the latest metadata file.
* Each metadata file contains a map where
* Key is - Segment local filename and
* Value is - local filename::uploaded filename::checksum
* @return Map of segment filename to uploaded filename with checksum
* @throws IOException if there were any failures in reading the metadata file
*/
private Map<String, UploadedSegmentMetadata> readLatestMetadataFile() throws IOException {
Map<String, UploadedSegmentMetadata> segmentMetadataMap = new HashMap<>();

Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
Optional<String> latestMetadataFile = metadataFiles.stream().max(METADATA_FILENAME_COMPARATOR);

if (latestMetadataFile.isPresent()) {
logger.info("Reading latest Metadata file {}", latestMetadataFile.get());
segmentMetadataMap = readMetadataFile(latestMetadataFile.get());
} else {
logger.info("No metadata file found, this can happen for new index with no data uploaded to remote segment store");
}

return segmentMetadataMap;
}

private Map<String, UploadedSegmentMetadata> readMetadataFile(String metadataFilename) throws IOException {
try (IndexInput indexInput = remoteMetadataDirectory.openInput(metadataFilename, IOContext.DEFAULT)) {
Map<String, String> segmentMetadata = indexInput.readMapOfStrings();
return segmentMetadata.entrySet()
.stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> UploadedSegmentMetadata.fromString(entry.getValue())));
}
}

/**
* Metadata of a segment that is uploaded to remote segment store.
*/
static class UploadedSegmentMetadata {
private static final String SEPARATOR = "::";
private final String originalFilename;
private final String uploadedFilename;
private final String checksum;

UploadedSegmentMetadata(String originalFilename, String uploadedFilename, String checksum) {
this.originalFilename = originalFilename;
Member: Should we add assertions here to be sure that these are valid values?

Member Author: UploadedSegmentMetadata is a POJO that contains 3 strings. IMO, it doesn't/shouldn't do any validation.

this.uploadedFilename = uploadedFilename;
this.checksum = checksum;
}

@Override
public String toString() {
return String.join(SEPARATOR, originalFilename, uploadedFilename, checksum);
}

public static UploadedSegmentMetadata fromString(String uploadedFilename) {
String[] values = uploadedFilename.split(SEPARATOR);
return new UploadedSegmentMetadata(values[0], values[1], values[2]);
Member (@ashking94, Jul 29, 2022): Should we handle exception use cases? Ideally a user should not be putting any file other than what the OpenSearch remote store puts under the concerned prefix. However, is there any defined behaviour if we see dubious files?

Member Author: Same as the above comment: UploadedSegmentMetadata should not validate this. The caller, on the other hand, should.

}
}
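A round-trip sketch of the "::"-separated format above (values made up); as discussed in the thread, fromString does no validation of its input:

// Serialize an entry as it would appear in a metadata file, then parse it back.
UploadedSegmentMetadata metadata = new UploadedSegmentMetadata("_0.cfe", "_0.cfe__gX7bNIIBrs0AUNsR2yEG", "3243545714");
String line = metadata.toString(); // _0.cfe::_0.cfe__gX7bNIIBrs0AUNsR2yEG::3243545714
UploadedSegmentMetadata parsed = UploadedSegmentMetadata.fromString(line);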

/**
* Contains utility methods that provide various parts of a metadata filename, along with a comparator.
* Each metadata filename is of the format: PREFIX__PrimaryTerm__Generation__UUID
*/
static class MetadataFilenameUtils {
public static final String SEPARATOR = "__";
public static final String METADATA_PREFIX = "metadata";

/**
* Comparator to sort the metadata filenames. The order of sorting is: Primary Term, Generation, UUID
* Even though UUID sort does not provide any info on recency, it provides a consistent way to sort the filenames.
*/
static class MetadataFilenameComparator implements Comparator<String> {
@Override
public int compare(String first, String second) {
String[] firstTokens = first.split(SEPARATOR);
String[] secondTokens = second.split(SEPARATOR);
if (!firstTokens[0].equals(secondTokens[0])) {
return firstTokens[0].compareTo(secondTokens[0]);
}
long firstPrimaryTerm = getPrimaryTerm(firstTokens);
long secondPrimaryTerm = getPrimaryTerm(secondTokens);
if (firstPrimaryTerm != secondPrimaryTerm) {
return firstPrimaryTerm > secondPrimaryTerm ? 1 : -1;
} else {
long firstGeneration = getGeneration(firstTokens);
long secondGeneration = getGeneration(secondTokens);
if (firstGeneration != secondGeneration) {
return firstGeneration > secondGeneration ? 1 : -1;
} else {
return getUuid(firstTokens).compareTo(getUuid(secondTokens));
}
}
}
}

// Visible for testing
static String getMetadataFilename(long primaryTerm, long generation, String uuid) {
return String.join(
SEPARATOR,
METADATA_PREFIX,
Long.toString(primaryTerm),
Long.toString(generation, Character.MAX_RADIX),
uuid
);
}

// Visible for testing
static long getPrimaryTerm(String[] filenameTokens) {
return Long.parseLong(filenameTokens[1]);
}

// Visible for testing
static long getGeneration(String[] filenameTokens) {
return Long.parseLong(filenameTokens[2], Character.MAX_RADIX);
}

// Visible for testing
static String getUuid(String[] filenameTokens) {
return filenameTokens[3];
}
}
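A quick sketch (filenames made up) of why the comparator parses primary term and generation numerically rather than comparing raw strings:

// Generation 35 is "z" in base 36 while 36 is "10", so plain string ordering would
// rank generation 36 below 35; the comparator parses the tokens and orders correctly.
String gen35 = MetadataFilenameUtils.getMetadataFilename(4, 35, "uuidA"); // metadata__4__z__uuidA
String gen36 = MetadataFilenameUtils.getMetadataFilename(4, 36, "uuidB"); // metadata__4__10__uuidB
assert gen36.compareTo(gen35) < 0; // lexicographic order: wrong
assert METADATA_FILENAME_COMPARATOR.compare(gen36, gen35) > 0; // numeric order: right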

/**
* Returns list of all the segment files uploaded to remote segment store till the last refresh checkpoint.
* Any segment file that is uploaded without a corresponding metadata file will not be visible as part of listAll().
* We chose not to return cache entries for listAll, as the cache can have entries for stale segments as well.
* Even if we plan to delete stale segments from the remote segment store, it will be a periodic operation.
* @return segment filenames stored in remote segment store
* @throws IOException if there were any failures in reading the metadata file
*/
@Override
public String[] listAll() throws IOException {
return readLatestMetadataFile().keySet().toArray(new String[0]);
}

Member: This method only returns the latest segment files from the last known commit (remotely) plus the incrementally refreshed segment files. The description "Returns list of all the segment files uploaded to remote segment store till the last refresh checkpoint" seems a bit misleading, since it sounds like all segment files ever.

Where do we foresee using this method?

Member Author: The commit metadata contains all the segment files up to that point.

This method will be used in the restore flow, where we get the list of segment files from the remote store and use it to copy them to the local directory.

/**
* Delete segment file from remote segment store.
* @param name the name of an existing segment file in local filesystem.
* @throws IOException if the file exists but could not be deleted.
*/
@Override
public void deleteFile(String name) throws IOException {
String remoteFilename = getExistingRemoteFilename(name);
if (remoteFilename != null) {
remoteDataDirectory.deleteFile(remoteFilename);
segmentsUploadedToRemoteStore.remove(name);
}
}
Collaborator (on lines +239 to +246): One rare race could be: we delete a file, segmentsUploadedToRemoteStore hasn't been updated yet, and then a concurrent upload checks containsFile, sees the stale entry saying the file is uploaded, and skips uploading the new segment file.
Can you confirm whether there is a valid scenario that might hit this race?

Member Author (@sachinpkale, Aug 12, 2022): I can't think of a scenario where this should happen, for the following reasons:

  1. A segment file with the same name is not created; even after segments are deleted as part of a merge, new segment files are always prefixed with the next generation number.
  2. Even if a segment file with the same name were re-created (which contradicts point 1), we make sure each segment file is uploaded with a unique suffix.


/**
* Returns the byte length of a segment file in the remote segment store.
* @param name the name of an existing segment file in local filesystem.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist in the cache or remote segment store
*/
@Override
public long fileLength(String name) throws IOException {
String remoteFilename = getExistingRemoteFilename(name);
if (remoteFilename != null) {
return remoteDataDirectory.fileLength(remoteFilename);
Member: remoteDataDirectory.fileLength throws NoSuchFileException(name). You might want to handle the exception here and not return the internal filename outside?

Member: There are more occurrences where this would be applicable. Please look into those as well.

Member Author: But we are not exposing the internal filename, right? name is what is passed to this method.

} else {
throw new NoSuchFileException(name);
}
}

/**
* Creates and returns a new instance of {@link RemoteIndexOutput} which will be used to copy files to the remote
* segment store.
* @param name the name of the file to create.
* @throws IOException in case of I/O error
*/
@Override
public IndexOutput createOutput(String name, IOContext context) throws IOException {
Member: Looks like this is not really throwing an IOException or any other exception. We can remove the throws clause where we are not really throwing exceptions.

Member Author: remoteDataDirectory.createOutput throws IOException.

return remoteDataDirectory.createOutput(getNewRemoteSegmentFilename(name), context);
}

/**
* Opens a stream for reading an existing file and returns {@link RemoteIndexInput} enclosing the stream.
* @param name the name of an existing file.
* @throws IOException in case of I/O error
* @throws NoSuchFileException if the file does not exist either in cache or remote segment store
*/
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
String remoteFilename = getExistingRemoteFilename(name);
if (remoteFilename != null) {
return remoteDataDirectory.openInput(remoteFilename, context);
} else {
throw new NoSuchFileException(name);
}
}

/**
* Copies an existing file {@code src} from the directory {@code from} to a non-existent file {@code dest} in this directory.
* Once the segment is uploaded to remote segment store, update the cache accordingly.
*/
@Override
public void copyFrom(Directory from, String src, String dest, IOContext context) throws IOException {
String remoteFilename = getNewRemoteSegmentFilename(dest);
remoteDataDirectory.copyFrom(from, src, remoteFilename, context);
String checksum = getChecksumOfLocalFile(from, src);
UploadedSegmentMetadata segmentMetadata = new UploadedSegmentMetadata(src, remoteFilename, checksum);
segmentsUploadedToRemoteStore.put(src, segmentMetadata);
}
Collaborator (on lines +295 to +302): Maybe we need to pass the computed checksum to the remote store when we upload too?

Member Author: We do that as part of the metadata upload. Each line in the metadata file is: original filename, uploaded filename, checksum.


/**
* Checks if the file exists in the uploadedSegments cache and the checksum matches.
* It is important to match the checksum as the same segment filename can be used for different
* segments due to a concurrency issue.
* @param localFilename filename of segment stored in local filesystem
* @param checksum checksum of the segment file
* @return true if file exists in cache and checksum matches.
*/
public boolean containsFile(String localFilename, String checksum) {
return segmentsUploadedToRemoteStore.containsKey(localFilename)
&& segmentsUploadedToRemoteStore.get(localFilename).checksum.equals(checksum);
}
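A sketch of the upload-skip check this cache enables (the caller shape is assumed; in this PR series the real caller is RemoteStoreRefreshListener):

// Hypothetical caller: recompute the local checksum and skip the upload only when
// both the name and the checksum match the cached remote entry.
String name = "_0.cfe";
String checksum;
try (IndexInput in = localDirectory.openInput(name, IOContext.DEFAULT)) {
    checksum = Long.toString(CodecUtil.retrieveChecksum(in));
}
if (remoteSegmentStoreDirectory.containsFile(name, checksum) == false) {
    remoteSegmentStoreDirectory.copyFrom(localDirectory, name, name, IOContext.DEFAULT);
}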

/**
* Upload metadata file
* @param segmentFiles segment files that are part of the shard at the time of the latest refresh
* @param storeDirectory instance of local directory to temporarily create metadata file before upload
* @param primaryTerm primary term to be used in the name of metadata file
* @param generation commit generation
* @throws IOException in case of I/O error while uploading the metadata file
*/
public void uploadMetadata(Collection<String> segmentFiles, Directory storeDirectory, long primaryTerm, long generation)
throws IOException {
synchronized (this) {
String metadataFilename = MetadataFilenameUtils.getMetadataFilename(primaryTerm, generation, this.metadataFileUniqueSuffix);
IndexOutput indexOutput = storeDirectory.createOutput(metadataFilename, IOContext.DEFAULT);
Map<String, String> uploadedSegments = new HashMap<>();
for (String file : segmentFiles) {
if (segmentsUploadedToRemoteStore.containsKey(file)) {
uploadedSegments.put(file, segmentsUploadedToRemoteStore.get(file).toString());
} else {
throw new NoSuchFileException(file);
}
}
indexOutput.writeMapOfStrings(uploadedSegments);
indexOutput.close();
storeDirectory.sync(Collections.singleton(metadataFilename));
remoteMetadataDirectory.copyFrom(storeDirectory, metadataFilename, metadataFilename, IOContext.DEFAULT);
storeDirectory.deleteFile(metadataFilename);
}
}
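A hypothetical refresh-time call (names made up; the real caller is RemoteStoreRefreshListener), assuming the listed segment files were already uploaded via copyFrom:

// Hypothetical call site; storeDirectory, primaryTerm and generation are assumed to
// come from the refresh listener. Throws NoSuchFileException if any listed file was
// never uploaded (i.e. it is missing from the cache).
Collection<String> segmentFiles = Arrays.asList("_0.cfe", "_0.cfs", "_0.si");
remoteSegmentStoreDirectory.uploadMetadata(segmentFiles, storeDirectory, primaryTerm, generation);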

private String getChecksumOfLocalFile(Directory directory, String file) throws IOException {
try (IndexInput indexInput = directory.openInput(file, IOContext.DEFAULT)) {
return Long.toString(CodecUtil.retrieveChecksum(indexInput));
}
}

private String getExistingRemoteFilename(String localFilename) {
if (segmentsUploadedToRemoteStore.containsKey(localFilename)) {
return segmentsUploadedToRemoteStore.get(localFilename).uploadedFilename;
} else {
return null;
Member: Could we have a fallback to the remote store here in the else part, in case the local cache, for whatever reason, does not contain the name in the map?

Member Author: I added this flow on purpose. A fallback to the remote store could result in a lot of load on the remote store itself.

}
}

private String getNewRemoteSegmentFilename(String localFilename) {
return localFilename + SEGMENT_NAME_UUID_SEPARATOR + UUIDs.base64UUID();
}

private String getLocalSegmentFilename(String remoteFilename) {
return remoteFilename.split(SEGMENT_NAME_UUID_SEPARATOR)[0];
}

// Visible for testing
Map<String, UploadedSegmentMetadata> getSegmentsUploadedToRemoteStore() {
return this.segmentsUploadedToRemoteStore;
}
}