-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use RemoteSegmentStoreDirectory instead of RemoteDirectory #4240
Changes from all commits
10b4954
aef0dfc
6681b2c
60d93c5
9e53228
5d30441
65300af
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -11,32 +11,54 @@ | |
import org.apache.logging.log4j.LogManager; | ||
import org.apache.logging.log4j.Logger; | ||
import org.apache.logging.log4j.message.ParameterizedMessage; | ||
import org.apache.lucene.codecs.CodecUtil; | ||
import org.apache.lucene.index.IndexFileNames; | ||
import org.apache.lucene.index.SegmentInfos; | ||
import org.apache.lucene.search.ReferenceManager; | ||
import org.apache.lucene.store.Directory; | ||
import org.apache.lucene.store.FilterDirectory; | ||
import org.apache.lucene.store.IOContext; | ||
import org.apache.lucene.store.IndexInput; | ||
import org.opensearch.common.concurrent.GatedCloseable; | ||
import org.opensearch.index.engine.EngineException; | ||
import org.opensearch.index.store.RemoteSegmentStoreDirectory; | ||
|
||
import java.io.IOException; | ||
import java.nio.file.NoSuchFileException; | ||
import java.util.Arrays; | ||
import java.util.HashSet; | ||
import java.util.Collection; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.atomic.AtomicBoolean; | ||
import java.util.stream.Collectors; | ||
|
||
/** | ||
* RefreshListener implementation to upload newly created segment files to the remote store | ||
* | ||
* @opensearch.internal | ||
*/ | ||
public class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { | ||
public final class RemoteStoreRefreshListener implements ReferenceManager.RefreshListener { | ||
// Visible for testing | ||
static final Set<String> EXCLUDE_FILES = Set.of("write.lock"); | ||
// Visible for testing | ||
static final int LAST_N_METADATA_FILES_TO_KEEP = 10; | ||
|
||
private final IndexShard indexShard; | ||
private final Directory storeDirectory; | ||
private final Directory remoteDirectory; | ||
// ToDo: This can be a map with metadata of the uploaded file as value of the map (GitHub #3398) | ||
private final Set<String> filesUploadedToRemoteStore; | ||
private final RemoteSegmentStoreDirectory remoteDirectory; | ||
private final Map<String, String> localSegmentChecksumMap; | ||
private long primaryTerm; | ||
private static final Logger logger = LogManager.getLogger(RemoteStoreRefreshListener.class); | ||
|
||
public RemoteStoreRefreshListener(Directory storeDirectory, Directory remoteDirectory) throws IOException { | ||
this.storeDirectory = storeDirectory; | ||
this.remoteDirectory = remoteDirectory; | ||
// ToDo: Handle failures in reading list of files (GitHub #3397) | ||
this.filesUploadedToRemoteStore = new HashSet<>(Arrays.asList(remoteDirectory.listAll())); | ||
public RemoteStoreRefreshListener(IndexShard indexShard) { | ||
this.indexShard = indexShard; | ||
this.storeDirectory = indexShard.store().directory(); | ||
this.remoteDirectory = (RemoteSegmentStoreDirectory) ((FilterDirectory) ((FilterDirectory) indexShard.remoteStore().directory()) | ||
.getDelegate()).getDelegate(); | ||
this.primaryTerm = indexShard.getOperationPrimaryTerm(); | ||
localSegmentChecksumMap = new HashMap<>(); | ||
} | ||
|
||
@Override | ||
|
@@ -46,42 +68,112 @@ public void beforeRefresh() throws IOException { | |
|
||
/** | ||
* Upload new segment files created as part of the last refresh to the remote segment store. | ||
* The method also deletes segment files from remote store which are not part of local filesystem. | ||
* This method also uploads remote_segments_metadata file which contains metadata of each segment file uploaded. | ||
* @param didRefresh true if the refresh opened a new reference | ||
* @throws IOException in case of I/O error in reading list of local files | ||
*/ | ||
@Override | ||
public void afterRefresh(boolean didRefresh) throws IOException { | ||
if (didRefresh) { | ||
Set<String> localFiles = Set.of(storeDirectory.listAll()); | ||
localFiles.stream().filter(file -> !filesUploadedToRemoteStore.contains(file)).forEach(file -> { | ||
try { | ||
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); | ||
filesUploadedToRemoteStore.add(file); | ||
} catch (NoSuchFileException e) { | ||
logger.info( | ||
() -> new ParameterizedMessage("The file {} does not exist anymore. It can happen in case of temp files", file), | ||
e | ||
); | ||
} catch (IOException e) { | ||
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) | ||
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); | ||
} | ||
}); | ||
public void afterRefresh(boolean didRefresh) { | ||
synchronized (this) { | ||
try { | ||
if (indexShard.shardRouting.primary()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. What happens if the index shard was demoted to replica from primary? Shouldn't There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Only primary should be uploading the segments. If replica is demoted, this check makes sure that we stop uploading segments from replica. Ideally, There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. So in this case the usage of There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If the shard is demoted and promoted again, we need to make sure that we call There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ok, I will try to go step by step:
Does it make sense? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, this is definitely a bug. Thanks for catching it. I will fix it and will add tests around it as you suggested in another comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @reta I changed the implementation and added tests. Please take a look again. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks good, thanks! |
||
if (this.primaryTerm != indexShard.getOperationPrimaryTerm()) { | ||
this.primaryTerm = indexShard.getOperationPrimaryTerm(); | ||
this.remoteDirectory.init(); | ||
} | ||
try { | ||
String lastCommittedLocalSegmentFileName = SegmentInfos.getLastCommitSegmentsFileName(storeDirectory); | ||
if (!remoteDirectory.containsFile( | ||
lastCommittedLocalSegmentFileName, | ||
getChecksumOfLocalFile(lastCommittedLocalSegmentFileName) | ||
)) { | ||
deleteStaleCommits(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Would cleanup stall the refresh? Can we do this in the background? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Clean up does not happen at each refresh. It would only be called when refresh is called during flush (as a new segments_N file gets created). We can definitely perform stale commit deletion in the background. I have created a tracking issue here: #4315 |
||
} | ||
try (GatedCloseable<SegmentInfos> segmentInfosGatedCloseable = indexShard.getSegmentInfosSnapshot()) { | ||
SegmentInfos segmentInfos = segmentInfosGatedCloseable.get(); | ||
Collection<String> refreshedLocalFiles = segmentInfos.files(true); | ||
|
||
List<String> segmentInfosFiles = refreshedLocalFiles.stream() | ||
.filter(file -> file.startsWith(IndexFileNames.SEGMENTS)) | ||
.collect(Collectors.toList()); | ||
Optional<String> latestSegmentInfos = segmentInfosFiles.stream() | ||
.max(Comparator.comparingLong(IndexFileNames::parseGeneration)); | ||
|
||
Set<String> remoteFilesToBeDeleted = new HashSet<>(); | ||
// ToDo: Instead of deleting files in sync, mark them and delete in async/periodic flow (GitHub #3142) | ||
filesUploadedToRemoteStore.stream().filter(file -> !localFiles.contains(file)).forEach(file -> { | ||
try { | ||
remoteDirectory.deleteFile(file); | ||
remoteFilesToBeDeleted.add(file); | ||
} catch (IOException e) { | ||
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) | ||
logger.warn(() -> new ParameterizedMessage("Exception while deleting file {} from the remote segment store", file), e); | ||
if (latestSegmentInfos.isPresent()) { | ||
refreshedLocalFiles.addAll(SegmentInfos.readCommit(storeDirectory, latestSegmentInfos.get()).files(true)); | ||
segmentInfosFiles.stream() | ||
.filter(file -> !file.equals(latestSegmentInfos.get())) | ||
.forEach(refreshedLocalFiles::remove); | ||
|
||
boolean uploadStatus = uploadNewSegments(refreshedLocalFiles); | ||
if (uploadStatus) { | ||
remoteDirectory.uploadMetadata( | ||
refreshedLocalFiles, | ||
storeDirectory, | ||
indexShard.getOperationPrimaryTerm(), | ||
segmentInfos.getGeneration() | ||
); | ||
localSegmentChecksumMap.keySet() | ||
.stream() | ||
.filter(file -> !refreshedLocalFiles.contains(file)) | ||
.collect(Collectors.toSet()) | ||
.forEach(localSegmentChecksumMap::remove); | ||
} | ||
} | ||
} catch (EngineException e) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since we swallow all exceptions, what happens if shard was promoted to primary but segments were not refreshed due to some failures? Should we go back to replica mode? Or should we schedule the retry later on? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. With the current implementation, we retry segment upload in the next refresh. If the upload fails, we don't want to demote primary as the failure could be due to issues with remote segment store itself. |
||
logger.warn("Exception while reading SegmentInfosSnapshot", e); | ||
} | ||
} catch (IOException e) { | ||
// We don't want to fail refresh if upload of new segments fails. The missed segments will be re-tried | ||
// in the next refresh. This should not affect durability of the indexed data after remote trans-log integration. | ||
logger.warn("Exception while uploading new segments to the remote segment store", e); | ||
} | ||
} | ||
}); | ||
} catch (Throwable t) { | ||
logger.error("Exception in RemoteStoreRefreshListener.afterRefresh()", t); | ||
} | ||
} | ||
} | ||
|
||
// Visible for testing | ||
boolean uploadNewSegments(Collection<String> localFiles) throws IOException { | ||
AtomicBoolean uploadSuccess = new AtomicBoolean(true); | ||
localFiles.stream().filter(file -> !EXCLUDE_FILES.contains(file)).filter(file -> { | ||
try { | ||
return !remoteDirectory.containsFile(file, getChecksumOfLocalFile(file)); | ||
} catch (IOException e) { | ||
logger.info( | ||
"Exception while reading checksum of local segment file: {}, ignoring the exception and re-uploading the file", | ||
file | ||
); | ||
return true; | ||
} | ||
}).forEach(file -> { | ||
try { | ||
remoteDirectory.copyFrom(storeDirectory, file, file, IOContext.DEFAULT); | ||
} catch (IOException e) { | ||
uploadSuccess.set(false); | ||
// ToDO: Handle transient and permanent un-availability of the remote store (GitHub #3397) | ||
logger.warn(() -> new ParameterizedMessage("Exception while uploading file {} to the remote segment store", file), e); | ||
} | ||
}); | ||
return uploadSuccess.get(); | ||
Comment on lines
+138
to
+159
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The upload is happening serially; can we either bulk upload or do concurrent async uploads? |
||
} | ||
|
||
private String getChecksumOfLocalFile(String file) throws IOException { | ||
if (!localSegmentChecksumMap.containsKey(file)) { | ||
try (IndexInput indexInput = storeDirectory.openInput(file, IOContext.DEFAULT)) { | ||
String checksum = Long.toString(CodecUtil.retrieveChecksum(indexInput)); | ||
localSegmentChecksumMap.put(file, checksum); | ||
} | ||
} | ||
return localSegmentChecksumMap.get(file); | ||
} | ||
|
||
remoteFilesToBeDeleted.forEach(filesUploadedToRemoteStore::remove); | ||
private void deleteStaleCommits() { | ||
try { | ||
remoteDirectory.deleteStaleSegments(LAST_N_METADATA_FILES_TO_KEEP); | ||
} catch (IOException e) { | ||
logger.info("Exception while deleting stale commits from remote segment store, will retry delete post next commit", e); | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,27 +27,37 @@ public class RemoteIndexInput extends IndexInput { | |
|
||
private final InputStream inputStream; | ||
private final long size; | ||
private long filePointer; | ||
|
||
public RemoteIndexInput(String name, InputStream inputStream, long size) { | ||
super(name); | ||
this.inputStream = inputStream; | ||
this.size = size; | ||
this.filePointer = 0; | ||
} | ||
|
||
@Override | ||
public byte readByte() throws IOException { | ||
byte[] buffer = new byte[1]; | ||
inputStream.read(buffer); | ||
int numberOfBytesRead = inputStream.read(buffer); | ||
if (numberOfBytesRead != -1) { | ||
filePointer += numberOfBytesRead; | ||
} | ||
return buffer[0]; | ||
} | ||
|
||
@Override | ||
public void readBytes(byte[] b, int offset, int len) throws IOException { | ||
int bytesRead = inputStream.read(b, offset, len); | ||
while (bytesRead > 0 && bytesRead < len) { | ||
len -= bytesRead; | ||
offset += bytesRead; | ||
bytesRead = inputStream.read(b, offset, len); | ||
if (bytesRead == len) { | ||
filePointer += bytesRead; | ||
} else { | ||
while (bytesRead > 0 && bytesRead < len) { | ||
filePointer += bytesRead; | ||
len -= bytesRead; | ||
offset += bytesRead; | ||
bytesRead = inputStream.read(b, offset, len); | ||
} | ||
} | ||
} | ||
|
||
|
@@ -61,22 +71,25 @@ public long length() { | |
return size; | ||
} | ||
|
||
@Override | ||
public void seek(long pos) throws IOException { | ||
inputStream.skip(pos); | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
* | ||
* @throws UnsupportedOperationException always | ||
*/ | ||
@Override | ||
public long getFilePointer() { | ||
public void seek(long pos) throws IOException { | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Just curious, why There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The implementation was incorrect. This was pointed out by @andrross earlier. seek() should provide functionality to go to the exact point provided by the |
||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
/** | ||
* Returns the current position in this file in terms of number of bytes read so far. | ||
*/ | ||
@Override | ||
public long getFilePointer() { | ||
return filePointer; | ||
} | ||
|
||
/** | ||
* Guaranteed to throw an exception and leave the RemoteIndexInput unmodified. | ||
* This method is not implemented as it is not used for the file transfer to/from the remote store. | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would have preferred individual dependencies as supplier rather than a full
IndexShard
dependency, to protect against misuse. There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I thought about that but there are different IndexShard methods (
getOperationPrimaryTerm
,getSegmentInfosSnapshot
) that are getting called in the afterRefresh()
flow. Created #4316 to track this.