Commit fb6d28f

[Remote Store] Deletion of Remote Segments and Translog upon Index Deletion (opensearch-project#7682)

Signed-off-by: Gaurav Bafna <gbbafna@amazon.com>
Signed-off-by: Shivansh Arora <hishiv@amazon.com>
gbbafna authored and shiv0408 committed Apr 25, 2024
1 parent e693850 commit fb6d28f
Showing 28 changed files with 214 additions and 53 deletions.

@@ -126,7 +126,7 @@ public void accept(final long g, final Exception e) {
}

}, null);
shard.close("closed", randomBoolean());
shard.close("closed", randomBoolean(), false);
assertBusy(() -> assertTrue(invoked.get()));
}

@@ -620,7 +620,7 @@ public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable {
client().prepareIndex("test").setId("1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).setRefreshPolicy(IMMEDIATE).get();

CheckedFunction<DirectoryReader, DirectoryReader, IOException> wrapper = directoryReader -> directoryReader;
shard.close("simon says", false);
shard.close("simon says", false, false);
AtomicReference<IndexShard> shardRef = new AtomicReference<>();
List<Exception> failures = new ArrayList<>();
IndexingOperationListener listener = new IndexingOperationListener() {
@@ -658,7 +658,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul
try {
ExceptionsHelper.rethrowAndSuppress(failures);
} finally {
newShard.close("just do it", randomBoolean());
newShard.close("just do it", randomBoolean(), false);
}
}

@@ -18,7 +18,13 @@
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

@@ -27,6 +33,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase {
protected static final int SHARD_COUNT = 1;
protected static final int REPLICA_COUNT = 1;

protected Path absolutePath;

@Override
protected boolean addMockInternalEngine() {
return false;
@@ -73,7 +81,7 @@ protected Settings remoteTranslogIndexSettings(int numberOfReplicas) {
@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
@@ -84,4 +92,22 @@ public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}

public int getFileCount(Path path) throws Exception {
final AtomicInteger filesExisting = new AtomicInteger(0);
Files.walkFileTree(path, new SimpleFileVisitor<>() {
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException impossible) throws IOException {
return FileVisitResult.CONTINUE;
}

@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
filesExisting.incrementAndGet();
return FileVisitResult.CONTINUE;
}
});

return filesExisting.get();
}

}
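
For illustration only (not part of this commit), a subclass test could use the new getFileCount helper to check that a refresh has uploaded blobs into the fs repository registered by setup(). The class and test names below are hypothetical; INDEX_NAME and remoteStoreIndexSettings(int) are assumed to come from the remote-store test base classes touched in this change. Note that getFileCount counts regular files only, so empty directories do not affect the result.

public class RemoteStoreFileCountSketchIT extends RemoteStoreBaseIntegTestCase {

    public void testRepositoryHoldsBlobsAfterRefresh() throws Exception {
        internalCluster().startDataOnlyNodes(1);
        // Zero replicas keeps a single-data-node cluster green; the settings helper is assumed from the base class.
        createIndex(INDEX_NAME, remoteStoreIndexSettings(0));

        client().prepareIndex(INDEX_NAME).setId("1").setSource("foo", "bar").get();
        refresh(INDEX_NAME);

        // After a refresh, segment files should have been uploaded somewhere under the repository root.
        assertTrue(getFileCount(absolutePath) > 0);
    }
}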

@@ -9,6 +9,7 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.PlainActionFuture;
@@ -23,13 +24,15 @@
import org.opensearch.test.transport.MockTransportService;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.hamcrest.Matchers.comparesEqualTo;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@@ -241,4 +244,37 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() thro
public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception {
testPeerRecovery(true, randomIntBetween(2, 5), false);
}

private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception {
internalCluster().startDataOnlyNodes(3);
if (remoteTranslog) {
createIndex(INDEX_NAME, remoteTranslogIndexSettings(1));
} else {
createIndex(INDEX_NAME, remoteStoreIndexSettings(1));
}

indexData(5, randomBoolean());
String indexUUID = client().admin()
.indices()
.prepareGetSettings(INDEX_NAME)
.get()
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID);
assertTrue(getFileCount(indexPath) > 0);
assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get());
// Delete is async. Give time for it
assertBusy(() -> {
try {
assertThat(getFileCount(indexPath), comparesEqualTo(0));
} catch (Exception e) {}
}, 30, TimeUnit.SECONDS);
}

public void testRemoteSegmentCleanup() throws Exception {
verifyRemoteStoreCleanup(false);
}

public void testRemoteTranslogCleanup() throws Exception {
verifyRemoteStoreCleanup(true);
}
}
8 changes: 7 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexService.java
@@ -603,6 +603,7 @@ public synchronized void removeShard(int shardId, String reason) {
private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) {
final int shardId = sId.id();
final Settings indexSettings = this.getIndexSettings().getSettings();
Store remoteStore = indexShard.remoteStore();
if (store != null) {
store.beforeClose();
}
@@ -616,7 +617,7 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
try {
// only flush if we are closed (closed index or shutdown) and if we are not deleted
final boolean flushEngine = deleted.get() == false && closed.get();
indexShard.close(reason, flushEngine);
indexShard.close(reason, flushEngine, deleted.get());
} catch (Exception e) {
logger.debug(() -> new ParameterizedMessage("[{}] failed to close index shard", shardId), e);
// ignore
Expand All @@ -632,6 +633,11 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store
} else {
logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId);
}

if (remoteStore != null && indexShard.isPrimaryMode() && deleted.get()) {
remoteStore.close();
}

} catch (Exception e) {
logger.warn(
() -> new ParameterizedMessage("[{}] failed to close store on shard removal (reason: [{}])", shardId, reason),
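
To make the new behaviour in closeShard explicit (a condensed decision sketch, not OpenSearch code): the engine is flushed only for a close or shutdown, never for a delete, and the remote store is closed — which triggers the remote segment cleanup shown further down in RemoteSegmentStoreDirectory — only when the shard is a primary of a deleted remote-store index.

final class CloseShardDecisionSketch {

    // Mirrors: final boolean flushEngine = deleted.get() == false && closed.get();
    static boolean flushEngine(boolean deleted, boolean closedOrShuttingDown) {
        return deleted == false && closedOrShuttingDown;
    }

    // Mirrors: if (remoteStore != null && indexShard.isPrimaryMode() && deleted.get()) remoteStore.close();
    static boolean closeRemoteStore(boolean hasRemoteStore, boolean isPrimaryMode, boolean deleted) {
        return hasRemoteStore && isPrimaryMode && deleted;
    }

    public static void main(String[] args) {
        // Run with -ea to verify the decision table.
        assert flushEngine(false, true);                       // regular close/shutdown: flush
        assert flushEngine(true, true) == false;               // index deletion: skip the flush
        assert closeRemoteStore(true, true, true);             // deleted index, primary shard: purge remote store
        assert closeRemoteStore(true, false, true) == false;   // deleted index, replica: remote data left alone
    }
}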
30 changes: 21 additions & 9 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -1882,7 +1882,7 @@ public CacheHelper getReaderCacheHelper() {

}

public void close(String reason, boolean flushEngine) throws IOException {
public void close(String reason, boolean flushEngine, boolean deleted) throws IOException {
synchronized (engineMutex) {
try {
synchronized (mutex) {
@@ -1898,12 +1898,30 @@ public void close(String reason, boolean flushEngine) throws IOException {
// playing safe here and close the engine even if the above succeeds - close can be called multiple times
// Also closing refreshListeners to prevent us from accumulating any more listeners
IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions);

if (deleted && engine != null && isPrimaryMode()) {
// Translog Clean up
engine.translogManager().onDelete();
}

indexShardOperationPermits.close();
}
}
}
}

/*
ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003
*/
private RemoteSegmentStoreDirectory getRemoteDirectory() {
assert indexSettings.isRemoteStoreEnabled();
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
return ((RemoteSegmentStoreDirectory) remoteDirectory);
}

public void preRecovery() {
final IndexShardState currentState = this.state; // single volatile read
if (currentState == IndexShardState.CLOSED) {
@@ -4520,16 +4538,10 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re
throws IOException {
assert indexSettings.isRemoteStoreEnabled();
logger.info("Downloading segments from remote segment store");
assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory";
FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory();
assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory
: "Store.directory is not enclosing an instance of FilterDirectory";
FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate();
final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate();
RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory();
// We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that
// are uploaded to the remote segment store.
assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory";
RemoteSegmentMetadata remoteSegmentMetadata = ((RemoteSegmentStoreDirectory) remoteDirectory).init();
RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init();
Map<String, RemoteSegmentStoreDirectory.UploadedSegmentMetadata> uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory)
.getSegmentsUploadedToRemoteStore();
store.incRef();
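
Two things happen in IndexShard above. First, when close is called with deleted == true on a shard in primary mode, engine.translogManager().onDelete() purges the remote translog right after the engine is closed. Second, the new getRemoteDirectory() helper centralizes the FilterDirectory unwrapping that syncSegmentsFromRemoteSegmentStore previously did inline: Store.directory() wraps a byte-size-caching FilterDirectory whose delegate is the RemoteSegmentStoreDirectory. Below is a minimal stand-alone sketch of that unwrapping pattern (not the OpenSearch code; Lucene's FilterDirectory.unwrap performs the same loop in one call).

import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;

final class DirectoryUnwrapSketch {

    // Walks through nested FilterDirectory wrappers and returns the innermost delegate,
    // which for a remote-store-enabled shard is expected to be the RemoteSegmentStoreDirectory.
    static Directory innermostDelegate(Directory directory) {
        Directory current = directory;
        while (current instanceof FilterDirectory) {
            current = ((FilterDirectory) current).getDelegate();
        }
        return current;
    }
}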

@@ -210,4 +210,8 @@ public void rename(String source, String dest) throws IOException {
public Lock obtainLock(String name) throws IOException {
throw new UnsupportedOperationException();
}

public void delete() throws IOException {
blobContainer.delete();
}
}

@@ -20,24 +20,24 @@
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.opensearch.common.UUIDs;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.common.lucene.store.ByteArrayIndexInput;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager;
import org.opensearch.index.store.lockmanager.RemoteStoreLockManager;
import org.opensearch.index.store.lockmanager.FileLockInfo;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata;
import org.opensearch.common.io.VersionedCodecStreamWrapper;
import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler;

import java.io.IOException;
import java.nio.file.NoSuchFileException;
import java.util.Map;
import java.util.HashSet;
import java.util.Optional;
import java.util.HashMap;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -617,4 +617,32 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException
}
}
}

/*
Tries to delete shard level directory if it is empty
Return true if it deleted it successfully
*/
private boolean deleteIfEmpty() throws IOException {
Collection<String> metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX);
if (metadataFiles.size() != 0) {
logger.info("Remote directory still has files , not deleting the path");
return false;
}

try {
remoteDataDirectory.delete();
remoteMetadataDirectory.delete();
mdLockManager.delete();
} catch (Exception e) {
logger.error("Exception occurred while deleting directory", e);
return false;
}

return true;
}

public void close() throws IOException {
deleteStaleSegments(0);
deleteIfEmpty();
}
}
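
The ordering in close() matters: deleteStaleSegments(0) first treats every uploaded generation as stale (zero metadata files are retained), and only then does deleteIfEmpty() remove the shard-level data, metadata and lock paths — and only if no metadata file survived, presumably so a concurrent upload is never pulled out from under its writer. A schematic, self-contained sketch of that contract follows; the types and method names are illustrative, not the OpenSearch API.

import java.io.IOException;
import java.util.Collection;

final class ShardRemoteCleanupSketch {

    interface ShardRemotePaths {
        void purgeStaleSegments(int metadataGenerationsToKeep) throws IOException;  // like deleteStaleSegments(n)
        Collection<String> listMetadataFiles() throws IOException;
        void deleteDataPath() throws IOException;       // RemoteDirectory#delete -> BlobContainer#delete
        void deleteMetadataPath() throws IOException;
        void deleteLockPath() throws IOException;       // lock-manager cleanup
    }

    // Returns true only if the shard-level paths were actually removed.
    static boolean closeAndCleanup(ShardRemotePaths paths) throws IOException {
        paths.purgeStaleSegments(0);                    // keep zero generations: every upload becomes stale
        if (paths.listMetadataFiles().isEmpty() == false) {
            return false;                               // metadata still present; leave the paths in place
        }
        paths.deleteDataPath();
        paths.deleteMetadataPath();
        paths.deleteLockPath();
        return true;
    }
}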

@@ -22,7 +22,7 @@ public interface RemoteStoreLockManager {
* @param lockInfo lock info instance for which we need to acquire lock.
* @throws IOException throws exception in case there is a problem with acquiring lock.
*/
public void acquire(LockInfo lockInfo) throws IOException;
void acquire(LockInfo lockInfo) throws IOException;

/**
*
Expand All @@ -38,4 +38,9 @@ public interface RemoteStoreLockManager {
* @throws IOException throws exception in case there is a problem in checking if a given file is locked or not.
*/
Boolean isAcquired(LockInfo lockInfo) throws IOException;

/*
Deletes all lock related files and directories
*/
void delete() throws IOException;
}

@@ -83,4 +83,8 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException {
Collection<String> lockFiles = lockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getLockPrefix());
return !lockFiles.isEmpty();
}

public void delete() throws IOException {
lockDirectory.delete();
}
}

@@ -296,6 +296,10 @@ public void setMinSeqNoToKeep(long seqNo) {
translog.setMinSeqNoToKeep(seqNo);
}

public void onDelete() {
translog.onDelete();
}

/**
* Reads operations from the translog
* @param location location of translog

@@ -120,4 +120,6 @@ public Translog.Location add(Translog.Operation operation) throws IOException {
public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException {
throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs");
}

public void onDelete() {}
}

@@ -422,4 +422,14 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() {
translogTransferManager.deleteStaleTranslogMetadataFilesAsync();
}
}

protected void onDelete() {
if (primaryModeSupplier.getAsBoolean() == false) {
logger.trace("skipped delete translog");
// NO-OP
return;
}
// clean up all remote translog files
translogTransferManager.delete();
}
}
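
Because the primary owns the remote translog uploads, only the primary deletes them: onDelete consults primaryModeSupplier and becomes a no-op on replicas, otherwise it hands the whole remote translog path to the transfer layer. A small stand-alone sketch of that guard is below; the types are illustrative, and the real code delegates to translogTransferManager.delete().

import java.util.function.BooleanSupplier;

final class RemoteTranslogDeleteSketch {

    interface TransferManager {
        void deleteRemoteTranslogFiles();   // stands in for the transfer manager's delete call
    }

    private final BooleanSupplier primaryModeSupplier;
    private final TransferManager transferManager;

    RemoteTranslogDeleteSketch(BooleanSupplier primaryModeSupplier, TransferManager transferManager) {
        this.primaryModeSupplier = primaryModeSupplier;
        this.transferManager = transferManager;
    }

    void onDelete() {
        if (primaryModeSupplier.getAsBoolean() == false) {
            return;                         // replica: the primary owns remote translog cleanup
        }
        transferManager.deleteRemoteTranslogFiles();
    }
}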

@@ -1807,6 +1807,8 @@ protected long getMinReferencedGen() throws IOException {
*/
protected void setMinSeqNoToKeep(long seqNo) {}

protected void onDelete() {}

/**
* deletes all files associated with a reader. package-private to be able to simulate node failures at this point
*/

@@ -126,4 +126,9 @@ public interface TranslogManager {
* This might be required when segments are persisted via other mechanism than flush.
*/
void setMinSeqNoToKeep(long seqNo);

/*
Clean up if any needed on deletion of index
*/
void onDelete();
}