Skip to content

Commit

Permalink
Add RemoteDirectory interface to copy segment files to/from remote store
Browse files Browse the repository at this point in the history
Signed-off-by: Sachin Kale <kalsac@amazon.com>
  • Loading branch information
Sachin Kale committed Apr 29, 2022
1 parent 52d2d82 commit 6c11fd6
Show file tree
Hide file tree
Showing 10 changed files with 493 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ public synchronized IndexShard createShard(
lock,
new StoreCloseListener(shardId, () -> eventListener.onStoreClosed(shardId))
);
if(this.indexSettings.isRemoteStoreEnabled()) {
if (this.indexSettings.isRemoteStoreEnabled()) {
Directory remoteDirectory = remoteDirectoryFactory.newDirectory(this.indexSettings, path, repositoriesService);
remoteStore = new Store(
shardId,
Expand Down
167 changes: 84 additions & 83 deletions server/src/main/java/org/opensearch/index/engine/InternalEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,6 @@ public class InternalEngine extends Engine {

private final ReplicationTracker replicationTracker;


public InternalEngine(EngineConfig engineConfig) {
this(engineConfig, IndexWriter.MAX_DOCS, LocalCheckpointTracker::new);
}
Expand Down Expand Up @@ -489,10 +488,10 @@ public int fillSeqNoGaps(long primaryTerm) throws IOException {
syncTranslog(); // to persist noops associated with the advancement of the local checkpoint
assert localCheckpointTracker.getPersistedCheckpoint() == maxSeqNo
: "persisted local checkpoint did not advance to max seq no; is ["
+ localCheckpointTracker.getPersistedCheckpoint()
+ "], max seq no ["
+ maxSeqNo
+ "]";
+ localCheckpointTracker.getPersistedCheckpoint()
+ "], max seq no ["
+ maxSeqNo
+ "]";
return numNoOpsAdded;
}
}
Expand Down Expand Up @@ -854,8 +853,8 @@ private VersionValue resolveDocVersion(final Operation op, boolean loadSeqNo) th
} else if (engineConfig.isEnableGcDeletes()
&& versionValue.isDelete()
&& (engineConfig.getThreadPool().relativeTimeInMillis() - ((DeleteVersionValue) versionValue).time) > getGcDeletesInMillis()) {
versionValue = null;
}
versionValue = null;
}
return versionValue;
}

Expand Down Expand Up @@ -1166,35 +1165,35 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
plan = IndexingStrategy.skipDueToVersionConflict(e, true, currentVersion);
} else if (index.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& (versionValue.seqNo != index.getIfSeqNo() || versionValue.term != index.getIfPrimaryTerm())) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index,
currentVersion,
currentNotFoundOrDeleted
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError);
} else {
plan = IndexingStrategy.processNormally(
currentNotFoundOrDeleted,
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()),
reservingDocs
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index.id(),
index.getIfSeqNo(),
index.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else if (index.versionType().isVersionConflictForWrites(currentVersion, index.version(), currentNotFoundOrDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
index,
currentVersion,
currentNotFoundOrDeleted
);
plan = IndexingStrategy.skipDueToVersionConflict(e, currentNotFoundOrDeleted, currentVersion);
} else {
final Exception reserveError = tryAcquireInFlightDocs(index, reservingDocs);
if (reserveError != null) {
plan = IndexingStrategy.failAsTooManyDocs(reserveError);
} else {
plan = IndexingStrategy.processNormally(
currentNotFoundOrDeleted,
canOptimizeAddDocument ? 1L : index.versionType().updateVersion(currentVersion, index.version()),
reservingDocs
);
}
}
}
}
return plan;
}
Expand Down Expand Up @@ -1320,10 +1319,10 @@ private IndexingStrategy(
: "use lucene update is set to true, but we're not indexing into lucene";
assert (indexIntoLucene && earlyResultOnPreFlightError != null) == false
: "can only index into lucene or have a preflight result but not both."
+ "indexIntoLucene: "
+ indexIntoLucene
+ " earlyResultOnPreFlightError:"
+ earlyResultOnPreFlightError;
+ "indexIntoLucene: "
+ indexIntoLucene
+ " earlyResultOnPreFlightError:"
+ earlyResultOnPreFlightError;
assert reservedDocs == 0 || indexIntoLucene || addStaleOpToLucene : reservedDocs;
this.currentNotFoundOrDeleted = currentNotFoundOrDeleted;
this.useLuceneUpdateDocument = useLuceneUpdateDocument;
Expand Down Expand Up @@ -1581,32 +1580,32 @@ private DeletionStrategy planDeletionAsPrimary(Delete delete) throws IOException
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, true);
} else if (delete.getIfSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO
&& (versionValue.seqNo != delete.getIfSeqNo() || versionValue.term != delete.getIfPrimaryTerm())) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete.id(),
delete.getIfSeqNo(),
delete.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete,
currentVersion,
currentlyDeleted
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
final Exception reserveError = tryAcquireInFlightDocs(delete, 1);
if (reserveError != null) {
plan = DeletionStrategy.failAsTooManyDocs(reserveError);
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete.id(),
delete.getIfSeqNo(),
delete.getIfPrimaryTerm(),
versionValue.seqNo,
versionValue.term
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else if (delete.versionType().isVersionConflictForWrites(currentVersion, delete.version(), currentlyDeleted)) {
final VersionConflictEngineException e = new VersionConflictEngineException(
shardId,
delete,
currentVersion,
currentlyDeleted
);
plan = DeletionStrategy.skipDueToVersionConflict(e, currentVersion, currentlyDeleted);
} else {
final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version());
plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1);
final Exception reserveError = tryAcquireInFlightDocs(delete, 1);
if (reserveError != null) {
plan = DeletionStrategy.failAsTooManyDocs(reserveError);
} else {
final long versionOfDeletion = delete.versionType().updateVersion(currentVersion, delete.version());
plan = DeletionStrategy.processNormally(currentlyDeleted, versionOfDeletion, 1);
}
}
}
return plan;
}

Expand Down Expand Up @@ -1666,10 +1665,10 @@ private DeletionStrategy(
) {
assert (deleteFromLucene && earlyResultOnPreflightError != null) == false
: "can only delete from lucene or have a preflight result but not both."
+ "deleteFromLucene: "
+ deleteFromLucene
+ " earlyResultOnPreFlightError:"
+ earlyResultOnPreflightError;
+ "deleteFromLucene: "
+ deleteFromLucene
+ " earlyResultOnPreFlightError:"
+ earlyResultOnPreflightError;
this.deleteFromLucene = deleteFromLucene;
this.addStaleOpToLucene = addStaleOpToLucene;
this.currentlyDeleted = currentlyDeleted;
Expand Down Expand Up @@ -1849,21 +1848,23 @@ final boolean refresh(String source, SearcherScope scope, boolean block) throws
store.decRef();
}
if (refreshed) {
if(remoteStore != null && this.replicationTracker.isPrimaryMode()) {
if (remoteStore != null && this.replicationTracker.isPrimaryMode()) {
// Get files from local directory
// Get files from remote directory
// Add (local - remote) to remote
// Delete (remote - local) from remote
Set<String> localFiles = Arrays.stream(store.directory().listAll()).collect(Collectors.toSet());
Set<String> remoteFiles = Arrays.stream(remoteStore.directory().listAll()).collect(Collectors.toSet());
for(String file: localFiles) {
if(!remoteFiles.contains(file)) {
RemoteDirectory remoteDirectory = (RemoteDirectory) FilterDirectory.unwrap(FilterDirectory.unwrap(remoteStore.directory()));
for (String file : localFiles) {
if (!remoteFiles.contains(file)) {
RemoteDirectory remoteDirectory = (RemoteDirectory) FilterDirectory.unwrap(
FilterDirectory.unwrap(remoteStore.directory())
);
remoteDirectory.copyFrom(store.directory(), file, file, IOContext.DEFAULT);
}
}
for(String file: remoteFiles) {
if(!localFiles.contains(file)) {
for (String file : remoteFiles) {
if (!localFiles.contains(file)) {
remoteStore.directory().deleteFile(file);
}
}
Expand Down Expand Up @@ -1972,8 +1973,8 @@ public void flush(boolean force, boolean waitIfOngoing) throws EngineException {
|| force
|| shouldPeriodicallyFlush
|| getProcessedLocalCheckpoint() > Long.parseLong(
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
)) {
lastCommittedSegmentInfos.userData.get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)
)) {
ensureCanFlush();
try {
translog.rollGeneration();
Expand Down Expand Up @@ -2294,12 +2295,12 @@ protected boolean maybeFailEngine(String source, Exception e) {
return failOnTragicEvent((AlreadyClosedException) e);
} else if (e != null
&& ((indexWriter.isOpen() == false && indexWriter.getTragicException() == e)
|| (translog.isOpen() == false && translog.getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
return true;
}
|| (translog.isOpen() == false && translog.getTragicException() == e))) {
// this spot on - we are handling the tragic event exception here so we have to fail the engine
// right away
failEngine(source, e);
return true;
}
return false;
}

Expand Down Expand Up @@ -3017,9 +3018,9 @@ private void restoreVersionMapAndCheckpointTracker(DirectoryReader directoryRead
final IndexSearcher searcher = new IndexSearcher(directoryReader);
searcher.setQueryCache(null);
final Query query = new BooleanQuery.Builder().add(
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE),
BooleanClause.Occur.MUST
)
LongPoint.newRangeQuery(SeqNoFieldMapper.NAME, getPersistedLocalCheckpoint() + 1, Long.MAX_VALUE),
BooleanClause.Occur.MUST
)
// exclude non-root nested documents
.add(Queries.newNonNestedFilter(), BooleanClause.Occur.MUST)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,7 @@ public RemoteDirectory(BlobContainer blobContainer) {
*/
@Override
public String[] listAll() throws IOException {
return blobContainer
.listBlobs()
.keySet()
.stream()
.sorted()
.toArray(String[]::new);
return blobContainer.listBlobs().keySet().stream().sorted().toArray(String[]::new);
}

/**
Expand Down Expand Up @@ -139,7 +134,7 @@ public void close() throws IOException {
public long fileLength(String name) throws IOException {
// ToDo: Instead of calling remote store each time, keep a cache with segment metadata
Map<String, BlobMetadata> metadata = blobContainer.listBlobsByPrefix(name);
if(metadata.containsKey(name)) {
if (metadata.containsKey(name)) {
return metadata.get(name).length();
}
throw new NoSuchFileException(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,14 @@ public RemoteIndexInput(String name, InputStream inputStream, long size) {

@Override
public byte readByte() throws IOException {
return inputStream.readNBytes(1)[0];
byte[] buffer = new byte[1];
inputStream.read(buffer);
return buffer[0];
}

@Override
public void readBytes(byte[] b, int offset, int len) throws IOException {
inputStream.readNBytes(b, offset, len);
inputStream.read(b, offset, len);
}

@Override
Expand All @@ -78,7 +80,7 @@ public long length() {

@Override
public void seek(long pos) throws IOException {
inputStream.skipNBytes(pos);
inputStream.skip(pos);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ public RemoteIndexOutput(String name, BlobContainer blobContainer) {

@Override
public void copyBytes(DataInput input, long numBytes) throws IOException {
assert numBytes >= 0: "numBytes=" + numBytes;
assert input instanceof IndexInput: "input should be instance of IndexInput";
assert input instanceof IndexInput : "input should be instance of IndexInput";
blobContainer.writeBlob(getName(), new InputStreamIndexInput((IndexInput) input, numBytes), numBytes, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ interface RemoteDirectoryFactory {
* @return a new lucene directory instance
* @throws IOException if an IOException occurs while opening the directory
*/
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, RepositoriesService repositoriesService) throws IOException;
Directory newDirectory(IndexSettings indexSettings, ShardPath shardPath, RepositoriesService repositoriesService)
throws IOException;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1522,10 +1522,10 @@ public long getRestoreThrottleTimeInNanos() {
}

protected void assertSnapshotOrGenericThread() {
// assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
// || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread ["
// + Thread.currentThread()
// + "] to be the snapshot or generic thread.";
// assert Thread.currentThread().getName().contains('[' + ThreadPool.Names.SNAPSHOT + ']')
// || Thread.currentThread().getName().contains('[' + ThreadPool.Names.GENERIC + ']') : "Expected current thread ["
// + Thread.currentThread()
// + "] to be the snapshot or generic thread.";
assert Boolean.TRUE;
}

Expand Down
Loading

0 comments on commit 6c11fd6

Please sign in to comment.