diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java index a2c5c0333bbfe..76ff2f809cb83 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/GlobalCheckpointListenersIT.java @@ -126,7 +126,7 @@ public void accept(final long g, final Exception e) { } }, null); - shard.close("closed", randomBoolean()); + shard.close("closed", randomBoolean(), false); assertBusy(() -> assertTrue(invoked.get())); } diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index ba567c125c6e9..1524acc4e7dd7 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -620,7 +620,7 @@ public void testShardHasMemoryBufferOnTranslogRecover() throws Throwable { client().prepareIndex("test").setId("1").setSource("{\"foo\" : \"bar\"}", XContentType.JSON).setRefreshPolicy(IMMEDIATE).get(); CheckedFunction wrapper = directoryReader -> directoryReader; - shard.close("simon says", false); + shard.close("simon says", false, false); AtomicReference shardRef = new AtomicReference<>(); List failures = new ArrayList<>(); IndexingOperationListener listener = new IndexingOperationListener() { @@ -658,7 +658,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Engine.DeleteResul try { ExceptionsHelper.rethrowAndSuppress(failures); } finally { - newShard.close("just do it", randomBoolean()); + newShard.close("just do it", randomBoolean(), false); } } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java index 42850fc59c8ad..0ffa5ab23e0b6 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java @@ -18,7 +18,13 @@ import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.OpenSearchIntegTestCase; +import java.io.IOException; +import java.nio.file.FileVisitResult; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.concurrent.atomic.AtomicInteger; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; @@ -27,6 +33,8 @@ public class RemoteStoreBaseIntegTestCase extends OpenSearchIntegTestCase { protected static final int SHARD_COUNT = 1; protected static final int REPLICA_COUNT = 1; + protected Path absolutePath; + @Override protected boolean addMockInternalEngine() { return false; @@ -73,7 +81,7 @@ protected Settings remoteTranslogIndexSettings(int numberOfReplicas) { @Before public void setup() { internalCluster().startClusterManagerOnlyNode(); - Path absolutePath = randomRepoPath().toAbsolutePath(); + absolutePath = randomRepoPath().toAbsolutePath(); assertAcked( clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath)) ); @@ -84,4 +92,22 @@ public void teardown() { assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME)); } + public int getFileCount(Path path) throws Exception { + final AtomicInteger filesExisting = new AtomicInteger(0); + Files.walkFileTree(path, new SimpleFileVisitor<>() { + @Override + public FileVisitResult postVisitDirectory(Path dir, IOException impossible) throws IOException { + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) { + filesExisting.incrementAndGet(); + return FileVisitResult.CONTINUE; + } + }); + + return filesExisting.get(); + } + } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 290f0df591c64..70a41d74a57c5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -9,6 +9,7 @@ package org.opensearch.remotestore; import org.opensearch.action.admin.cluster.remotestore.restore.RestoreRemoteStoreRequest; +import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.PlainActionFuture; @@ -23,6 +24,7 @@ import org.opensearch.test.transport.MockTransportService; import java.io.IOException; +import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; import java.util.HashMap; @@ -30,6 +32,7 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; +import static org.hamcrest.Matchers.comparesEqualTo; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; @@ -241,4 +244,37 @@ public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogNoDataRefresh() thro public void testPeerRecoveryWithRemoteStoreAndRemoteTranslogRefresh() throws Exception { testPeerRecovery(true, randomIntBetween(2, 5), false); } + + private void verifyRemoteStoreCleanup(boolean remoteTranslog) throws Exception { + internalCluster().startDataOnlyNodes(3); + if (remoteTranslog) { + createIndex(INDEX_NAME, remoteTranslogIndexSettings(1)); + } else { + createIndex(INDEX_NAME, remoteStoreIndexSettings(1)); + } + + indexData(5, randomBoolean()); + String indexUUID = client().admin() + .indices() + .prepareGetSettings(INDEX_NAME) + .get() + .getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID); + Path indexPath = Path.of(String.valueOf(absolutePath), indexUUID); + assertTrue(getFileCount(indexPath) > 0); + assertAcked(client().admin().indices().delete(new DeleteIndexRequest(INDEX_NAME)).get()); + // Delete is async. Give time for it + assertBusy(() -> { + try { + assertThat(getFileCount(indexPath), comparesEqualTo(0)); + } catch (Exception e) {} + }, 30, TimeUnit.SECONDS); + } + + public void testRemoteSegmentCleanup() throws Exception { + verifyRemoteStoreCleanup(false); + } + + public void testRemoteTranslogCleanup() throws Exception { + verifyRemoteStoreCleanup(true); + } } diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index 4e808ebb838e7..73797106bb66f 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -603,6 +603,7 @@ public synchronized void removeShard(int shardId, String reason) { private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store store, IndexEventListener listener) { final int shardId = sId.id(); final Settings indexSettings = this.getIndexSettings().getSettings(); + Store remoteStore = indexShard.remoteStore(); if (store != null) { store.beforeClose(); } @@ -616,7 +617,7 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store try { // only flush if we are closed (closed index or shutdown) and if we are not deleted final boolean flushEngine = deleted.get() == false && closed.get(); - indexShard.close(reason, flushEngine); + indexShard.close(reason, flushEngine, deleted.get()); } catch (Exception e) { logger.debug(() -> new ParameterizedMessage("[{}] failed to close index shard", shardId), e); // ignore @@ -632,6 +633,11 @@ private void closeShard(String reason, ShardId sId, IndexShard indexShard, Store } else { logger.trace("[{}] store not initialized prior to closing shard, nothing to close", shardId); } + + if (remoteStore != null && indexShard.isPrimaryMode() && deleted.get()) { + remoteStore.close(); + } + } catch (Exception e) { logger.warn( () -> new ParameterizedMessage("[{}] failed to close store on shard removal (reason: [{}])", shardId, reason), diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index ed38c561c7e29..946488bf441bc 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -1882,7 +1882,7 @@ public CacheHelper getReaderCacheHelper() { } - public void close(String reason, boolean flushEngine) throws IOException { + public void close(String reason, boolean flushEngine, boolean deleted) throws IOException { synchronized (engineMutex) { try { synchronized (mutex) { @@ -1898,12 +1898,30 @@ public void close(String reason, boolean flushEngine) throws IOException { // playing safe here and close the engine even if the above succeeds - close can be called multiple times // Also closing refreshListeners to prevent us from accumulating any more listeners IOUtils.close(engine, globalCheckpointListeners, refreshListeners, pendingReplicationActions); + + if (deleted && engine != null && isPrimaryMode()) { + // Translog Clean up + engine.translogManager().onDelete(); + } + indexShardOperationPermits.close(); } } } } + /* + ToDo : Fix this https://github.com/opensearch-project/OpenSearch/issues/8003 + */ + private RemoteSegmentStoreDirectory getRemoteDirectory() { + assert indexSettings.isRemoteStoreEnabled(); + assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; + FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); + FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); + final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + return ((RemoteSegmentStoreDirectory) remoteDirectory); + } + public void preRecovery() { final IndexShardState currentState = this.state; // single volatile read if (currentState == IndexShardState.CLOSED) { @@ -4520,16 +4538,10 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, boolean re throws IOException { assert indexSettings.isRemoteStoreEnabled(); logger.info("Downloading segments from remote segment store"); - assert remoteStore.directory() instanceof FilterDirectory : "Store.directory is not an instance of FilterDirectory"; - FilterDirectory remoteStoreDirectory = (FilterDirectory) remoteStore.directory(); - assert remoteStoreDirectory.getDelegate() instanceof FilterDirectory - : "Store.directory is not enclosing an instance of FilterDirectory"; - FilterDirectory byteSizeCachingStoreDirectory = (FilterDirectory) remoteStoreDirectory.getDelegate(); - final Directory remoteDirectory = byteSizeCachingStoreDirectory.getDelegate(); + RemoteSegmentStoreDirectory remoteDirectory = getRemoteDirectory(); // We need to call RemoteSegmentStoreDirectory.init() in order to get latest metadata of the files that // are uploaded to the remote segment store. - assert remoteDirectory instanceof RemoteSegmentStoreDirectory : "remoteDirectory is not an instance of RemoteSegmentStoreDirectory"; - RemoteSegmentMetadata remoteSegmentMetadata = ((RemoteSegmentStoreDirectory) remoteDirectory).init(); + RemoteSegmentMetadata remoteSegmentMetadata = remoteDirectory.init(); Map uploadedSegments = ((RemoteSegmentStoreDirectory) remoteDirectory) .getSegmentsUploadedToRemoteStore(); store.incRef(); diff --git a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java index 5192fd49b91f6..be4b4e910bb4d 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteDirectory.java @@ -210,4 +210,8 @@ public void rename(String source, String dest) throws IOException { public Lock obtainLock(String name) throws IOException { throw new UnsupportedOperationException(); } + + public void delete() throws IOException { + blobContainer.delete(); + } } diff --git a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java index 93fa4b7eff7b7..ca6834438ed61 100644 --- a/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java +++ b/server/src/main/java/org/opensearch/index/store/RemoteSegmentStoreDirectory.java @@ -20,24 +20,24 @@ import org.apache.lucene.store.IndexInput; import org.apache.lucene.store.IndexOutput; import org.opensearch.common.UUIDs; +import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.common.lucene.store.ByteArrayIndexInput; +import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.lockmanager.RemoteStoreCommitLevelLockManager; import org.opensearch.index.store.lockmanager.RemoteStoreLockManager; -import org.opensearch.index.store.lockmanager.FileLockInfo; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadata; -import org.opensearch.common.io.VersionedCodecStreamWrapper; import org.opensearch.index.store.remote.metadata.RemoteSegmentMetadataHandler; import java.io.IOException; import java.nio.file.NoSuchFileException; -import java.util.Map; -import java.util.HashSet; -import java.util.Optional; -import java.util.HashMap; import java.util.Collection; import java.util.Collections; import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -617,4 +617,32 @@ public void deleteStaleSegments(int lastNMetadataFilesToKeep) throws IOException } } } + + /* + Tries to delete shard level directory if it is empty + Return true if it deleted it successfully + */ + private boolean deleteIfEmpty() throws IOException { + Collection metadataFiles = remoteMetadataDirectory.listFilesByPrefix(MetadataFilenameUtils.METADATA_PREFIX); + if (metadataFiles.size() != 0) { + logger.info("Remote directory still has files , not deleting the path"); + return false; + } + + try { + remoteDataDirectory.delete(); + remoteMetadataDirectory.delete(); + mdLockManager.delete(); + } catch (Exception e) { + logger.error("Exception occurred while deleting directory", e); + return false; + } + + return true; + } + + public void close() throws IOException { + deleteStaleSegments(0); + deleteIfEmpty(); + } } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java index ce657627fcfc6..c30be082b4795 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreLockManager.java @@ -22,7 +22,7 @@ public interface RemoteStoreLockManager { * @param lockInfo lock info instance for which we need to acquire lock. * @throws IOException throws exception in case there is a problem with acquiring lock. */ - public void acquire(LockInfo lockInfo) throws IOException; + void acquire(LockInfo lockInfo) throws IOException; /** * @@ -38,4 +38,9 @@ public interface RemoteStoreLockManager { * @throws IOException throws exception in case there is a problem in checking if a given file is locked or not. */ Boolean isAcquired(LockInfo lockInfo) throws IOException; + + /* + Deletes all lock related files and directories + */ + void delete() throws IOException; } diff --git a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java index 41665ebe47600..7df20cae10664 100644 --- a/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java +++ b/server/src/main/java/org/opensearch/index/store/lockmanager/RemoteStoreMetadataLockManager.java @@ -83,4 +83,8 @@ public Boolean isAcquired(LockInfo lockInfo) throws IOException { Collection lockFiles = lockDirectory.listFilesByPrefix(((FileLockInfo) lockInfo).getLockPrefix()); return !lockFiles.isEmpty(); } + + public void delete() throws IOException { + lockDirectory.delete(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java index 0eb133eb464d2..7eaab67ddb5a5 100644 --- a/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/InternalTranslogManager.java @@ -296,6 +296,10 @@ public void setMinSeqNoToKeep(long seqNo) { translog.setMinSeqNoToKeep(seqNo); } + public void onDelete() { + translog.onDelete(); + } + /** * Reads operations from the translog * @param location location of translog diff --git a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java index cea38b4fbc781..58ee8c0fd39e7 100644 --- a/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/NoOpTranslogManager.java @@ -120,4 +120,6 @@ public Translog.Location add(Translog.Operation operation) throws IOException { public Translog.Snapshot newChangesSnapshot(long fromSeqNo, long toSeqNo, boolean requiredFullRange) throws IOException { throw new UnsupportedOperationException("Translog snapshot unsupported with no-op translogs"); } + + public void onDelete() {} } diff --git a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java index 190ca6948f42a..939402058e048 100644 --- a/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java +++ b/server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java @@ -422,4 +422,14 @@ private void deleteStaleRemotePrimaryTermsAndMetadataFiles() { translogTransferManager.deleteStaleTranslogMetadataFilesAsync(); } } + + protected void onDelete() { + if (primaryModeSupplier.getAsBoolean() == false) { + logger.trace("skipped delete translog"); + // NO-OP + return; + } + // clean up all remote translog files + translogTransferManager.delete(); + } } diff --git a/server/src/main/java/org/opensearch/index/translog/Translog.java b/server/src/main/java/org/opensearch/index/translog/Translog.java index 422864219745b..5816fdfaff754 100644 --- a/server/src/main/java/org/opensearch/index/translog/Translog.java +++ b/server/src/main/java/org/opensearch/index/translog/Translog.java @@ -1807,6 +1807,8 @@ protected long getMinReferencedGen() throws IOException { */ protected void setMinSeqNoToKeep(long seqNo) {} + protected void onDelete() {} + /** * deletes all files associated with a reader. package-private to be able to simulate node failures at this point */ diff --git a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java index e2818dd702d87..420d6cdc43bbf 100644 --- a/server/src/main/java/org/opensearch/index/translog/TranslogManager.java +++ b/server/src/main/java/org/opensearch/index/translog/TranslogManager.java @@ -126,4 +126,9 @@ public interface TranslogManager { * This might be required when segments are persisted via other mechanism than flush. */ void setMinSeqNoToKeep(long seqNo); + + /* + Clean up if any needed on deletion of index + */ + void onDelete(); } diff --git a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java index 489c81f802695..58aca00d2e9d3 100644 --- a/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java +++ b/server/src/main/java/org/opensearch/index/translog/transfer/TranslogTransferManager.java @@ -56,6 +56,7 @@ public class TranslogTransferManager { private final TransferService transferService; private final BlobPath remoteDataTransferPath; private final BlobPath remoteMetadataTransferPath; + private final BlobPath remoteBaseTransferPath; private final FileTransferTracker fileTransferTracker; private static final long TRANSFER_TIMEOUT_IN_MILLIS = 30000; @@ -74,13 +75,14 @@ public class TranslogTransferManager { public TranslogTransferManager( ShardId shardId, TransferService transferService, - BlobPath remoteDataTransferPath, + BlobPath remoteBaseTransferPath, FileTransferTracker fileTransferTracker ) { this.shardId = shardId; this.transferService = transferService; - this.remoteDataTransferPath = remoteDataTransferPath.add(DATA_DIR); - this.remoteMetadataTransferPath = remoteDataTransferPath.add(METADATA_DIR); + this.remoteBaseTransferPath = remoteBaseTransferPath; + this.remoteDataTransferPath = remoteBaseTransferPath.add(DATA_DIR); + this.remoteMetadataTransferPath = remoteBaseTransferPath.add(METADATA_DIR); this.fileTransferTracker = fileTransferTracker; } @@ -324,6 +326,21 @@ public void onFailure(Exception e) { ); } + public void delete() { + // cleans up all the translog contents in async fashion + transferService.deleteAsync(ThreadPool.Names.REMOTE_PURGE, remoteBaseTransferPath, new ActionListener<>() { + @Override + public void onResponse(Void unused) { + logger.info("Deleted all remote translog data for {}", shardId); + } + + @Override + public void onFailure(Exception e) { + logger.error("Exception occurred while cleaning translog ", e); + } + }); + } + public void deleteStaleTranslogMetadataFilesAsync() { transferService.listAllAsync(ThreadPool.Names.REMOTE_PURGE, remoteMetadataTransferPath, new ActionListener<>() { @Override diff --git a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java index 3162f7915c994..d8b55815b5b05 100644 --- a/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java +++ b/server/src/test/java/org/opensearch/index/engine/NoOpEngineRecoveryTests.java @@ -50,7 +50,7 @@ public void testRecoverFromNoOp() throws IOException { for (int i = 0; i < nbDocs; i++) { indexDoc(indexShard, "_doc", String.valueOf(i)); } - indexShard.close("test", true); + indexShard.close("test", true, false); final ShardRouting shardRouting = indexShard.routingEntry(); IndexShard primary = reinitShard( diff --git a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java index 5ac10824e14e2..d73cdfd3fe93f 100644 --- a/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java +++ b/server/src/test/java/org/opensearch/index/replication/RecoveryDuringReplicationTests.java @@ -161,10 +161,10 @@ public void testRecoveryToReplicaThatReceivedExtraDocument() throws Exception { new SourceToParse("index", "replica", new BytesArray("{}"), XContentType.JSON) ); shards.promoteReplicaToPrimary(promotedReplica).get(); - oldPrimary.close("demoted", randomBoolean()); + oldPrimary.close("demoted", randomBoolean(), false); oldPrimary.store().close(); shards.removeReplica(remainingReplica); - remainingReplica.close("disconnected", false); + remainingReplica.close("disconnected", false, false); remainingReplica.store().close(); // randomly introduce a conflicting document final boolean extra = randomBoolean(); @@ -289,7 +289,7 @@ public void testRecoveryAfterPrimaryPromotion() throws Exception { newPrimary.flush(new FlushRequest()); } - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); @@ -335,7 +335,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { shards.promoteReplicaToPrimary(newPrimary).get(); // Recover a replica should rollback the stale documents shards.removeReplica(replica); - replica.close("recover replica - first time", false); + replica.close("recover replica - first time", false, false); replica.store().close(); replica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(replica); @@ -346,7 +346,7 @@ public void testReplicaRollbackStaleDocumentsInPeerRecovery() throws Exception { assertThat(replica.getLastSyncedGlobalCheckpoint(), equalTo(replica.seqNoStats().getMaxSeqNo())); // Recover a replica again should also rollback the stale documents. shards.removeReplica(replica); - replica.close("recover replica - second time", false); + replica.close("recover replica - second time", false, false); replica.store().close(); IndexShard anotherReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(anotherReplica); diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index eb7ff360ec5d9..67f149c3cb5ae 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -291,7 +291,7 @@ public void testFailShard() throws Exception { assertNotNull(shardPath); // fail shard shard.failShard("test shard fail", new CorruptIndexException("", "")); - shard.close("do not assert history", false); + shard.close("do not assert history", false, false); shard.store().close(); // check state file still exists ShardStateMetadata shardStateMetadata = load(logger, shardPath.getShardStatePath()); @@ -1614,7 +1614,7 @@ public void testSnapshotStore() throws IOException { snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); - newShard.close("test", false); + newShard.close("test", false, false); snapshot = newShard.snapshotStoreMetadata(); assertThat(snapshot.getSegmentsFile().name(), equalTo("segments_3")); @@ -1874,7 +1874,7 @@ public void testIndexingOperationsListeners() throws IOException { AtomicInteger preDelete = new AtomicInteger(); AtomicInteger postDelete = new AtomicInteger(); AtomicInteger postDeleteException = new AtomicInteger(); - shard.close("simon says", true); + shard.close("simon says", true, false); shard = reinitShard(shard, new IndexingOperationListener() { @Override public Engine.Index preIndex(ShardId shardId, Engine.Index operation) { @@ -1961,7 +1961,7 @@ public void postDelete(ShardId shardId, Engine.Delete delete, Exception ex) { assertEquals(1, postDelete.get()); assertEquals(0, postDeleteException.get()); - shard.close("Unexpected close", true); + shard.close("Unexpected close", true, false); shard.state = IndexShardState.STARTED; // It will generate exception try { @@ -4362,7 +4362,7 @@ public void onBeginTranslogRecovery() { Thread closeShardThread = new Thread(() -> { try { readyToCloseLatch.await(); - shard.close("testing", false); + shard.close("testing", false, false); // in integration tests, this is done as a listener on IndexService. MockFSDirectoryFactory.checkIndex(logger, shard.store(), shard.shardId); } catch (InterruptedException | IOException e) { @@ -4811,7 +4811,7 @@ public void testCloseShardWhileEngineIsWarming() throws Exception { recoveryThread.start(); try { warmerStarted.await(); - shard.close("testing", false); + shard.close("testing", false, false); assertThat(shard.state, equalTo(IndexShardState.CLOSED)); } finally { warmerBlocking.countDown(); diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index 0ee3e81678511..aba04e9d30159 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -579,7 +579,7 @@ public void testReplicaReceivesLowerGeneration() throws Exception { assertEqualCommittedSegments(primary, replica_1); shards.promoteReplicaToPrimary(replica_2).get(); - primary.close("demoted", false); + primary.close("demoted", false, false); primary.store().close(); IndexShard oldPrimary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); @@ -618,7 +618,7 @@ public void testReplicaRestarts() throws Exception { // randomly resetart a replica final IndexShard replicaToRestart = getRandomReplica(shards); - replicaToRestart.close("restart", false); + replicaToRestart.close("restart", false, false); replicaToRestart.store().close(); shards.removeReplica(replicaToRestart); final IndexShard newReplica = shards.addReplicaWithExistingPath( @@ -716,7 +716,7 @@ private void testNRTReplicaWithRemoteStorePromotedAsPrimary(boolean performFlush shards.promoteReplicaToPrimary(nextPrimary).get(); // close oldPrimary. - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); assertEquals(InternalEngine.class, nextPrimary.getEngine().getClass()); @@ -783,7 +783,7 @@ public void testNRTReplicaPromotedAsPrimary() throws Exception { shards.promoteReplicaToPrimary(nextPrimary); // close and start the oldPrimary as a replica. - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); @@ -866,7 +866,7 @@ public void onFailure(Exception e) { assertEquals(nextPrimary.getEngine().getClass(), InternalEngine.class); nextPrimary.refresh("test"); - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); IndexShard newReplica = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); @@ -1074,7 +1074,7 @@ private IndexShard failAndPromoteRandomReplica(ReplicationGroup shards) throws I IndexShard primary = shards.getPrimary(); final IndexShard newPrimary = getRandomReplica(shards); shards.promoteReplicaToPrimary(newPrimary); - primary.close("demoted", true); + primary.close("demoted", true, false); primary.store().close(); primary = shards.addReplicaWithExistingPath(primary.shardPath(), primary.routingEntry().currentNodeId()); shards.recoverReplica(primary); diff --git a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java index f5f24402c1646..fec9b04d6e371 100644 --- a/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java +++ b/server/src/test/java/org/opensearch/index/store/RemoteSegmentStoreDirectoryTests.java @@ -84,7 +84,7 @@ public void setup() throws IOException { @After public void tearDown() throws Exception { - indexShard.close("test tearDown", true); + indexShard.close("test tearDown", true, false); super.tearDown(); } diff --git a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java index 3e824c0afee25..3bf7781fb909f 100644 --- a/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java +++ b/server/src/test/java/org/opensearch/index/store/remote/metadata/RemoteSegmentMetadataHandlerTests.java @@ -51,7 +51,7 @@ public void setup() throws IOException { @After public void tearDown() throws Exception { - indexShard.close("test tearDown", true); + indexShard.close("test tearDown", true, false); super.tearDown(); } diff --git a/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java b/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java index 7a362ce8ded74..4a9f15f7128ad 100644 --- a/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java +++ b/server/src/test/java/org/opensearch/indices/IndexingMemoryControllerTests.java @@ -369,7 +369,7 @@ public void testTranslogRecoveryWorksWithIMC() throws IOException { for (int i = 0; i < 100; i++) { indexDoc(shard, Integer.toString(i), "{\"foo\" : \"bar\"}", XContentType.JSON, null); } - shard.close("simon says", false); + shard.close("simon says", false, false); AtomicReference shardRef = new AtomicReference<>(); Settings settings = Settings.builder().put("indices.memory.index_buffer_size", "50kb").build(); Iterable iterable = () -> (shardRef.get() == null) diff --git a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java index 730d9b4215b73..9faa8ddff8183 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/PeerRecoveryTargetServiceTests.java @@ -380,7 +380,7 @@ public void testResetStartRequestIfTranslogIsCorrupted() throws Exception { ); IndexShard shard = newStartedShard(false); final SeqNoStats seqNoStats = populateRandomData(shard); - shard.close("test", false); + shard.close("test", false, false); if (randomBoolean()) { shard.store().associateIndexWithNewTranslog(UUIDs.randomBase64UUID()); } else if (randomBoolean()) { diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java index 73caa611dbcdb..3038d11e6ad91 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryStatusTests.java @@ -94,7 +94,7 @@ public void testRenameTempFiles() throws IOException { } } assertNotNull(expectedFile); - indexShard.close("foo", false);// we have to close it here otherwise rename fails since the write.lock is held by the engine + indexShard.close("foo", false, false);// we have to close it here otherwise rename fails since the write.lock is held by the engine multiFileWriter.renameAllTempFiles(); strings = Sets.newHashSet(indexShard.store().directory().listAll()); assertTrue(strings.toString(), strings.contains("foo.bar")); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index eae070b98c4a1..97772564acc88 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -267,7 +267,7 @@ public void testDifferentHistoryUUIDDisablesOPsRecovery() throws Exception { final String historyUUID = replica.getHistoryUUID(); Translog.TranslogGeneration translogGeneration = getTranslog(replica).getGeneration(); shards.removeReplica(replica); - replica.close("test", false); + replica.close("test", false, false); IndexWriterConfig iwc = new IndexWriterConfig(null).setCommitOnClose(false) // we don't want merges to happen here - we call maybe merge on the engine // later once we stared it up otherwise we would need to wait for it here @@ -391,7 +391,7 @@ public void testSequenceBasedRecoveryKeepsTranslog() throws Exception { if (randomBoolean()) { shards.flush(); } - replica.close("test", randomBoolean()); + replica.close("test", randomBoolean(), false); replica.store().close(); final IndexShard newReplica = shards.addReplicaWithExistingPath(replica.shardPath(), replica.routingEntry().currentNodeId()); shards.recoverReplica(newReplica); @@ -509,7 +509,7 @@ public void testRecoveryTrimsLocalTranslog() throws Exception { } shards.syncGlobalCheckpoint(); shards.promoteReplicaToPrimary(randomFrom(shards.getReplicas())).get(); - oldPrimary.close("demoted", false); + oldPrimary.close("demoted", false, false); oldPrimary.store().close(); oldPrimary = shards.addReplicaWithExistingPath(oldPrimary.shardPath(), oldPrimary.routingEntry().currentNodeId()); shards.recoverReplica(oldPrimary); diff --git a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java index c3bd4dcaf530d..6797a1db23b2d 100644 --- a/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java +++ b/server/src/test/java/org/opensearch/repositories/blobstore/BlobStoreRepositoryRestoreTests.java @@ -162,7 +162,7 @@ public void testRestoreSnapshotWithExistingFiles() throws IOException { } finally { if (shard != null && shard.state() != IndexShardState.CLOSED) { try { - shard.close("test", false); + shard.close("test", false, false); } finally { IOUtils.close(shard.store()); } @@ -228,7 +228,7 @@ public void testSnapshotWithConflictingName() throws Exception { } finally { if (shard != null && shard.state() != IndexShardState.CLOSED) { try { - shard.close("test", false); + shard.close("test", false, false); } finally { IOUtils.close(shard.store()); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 5a01fcaa1ddaf..d3d81083e72cc 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -806,7 +806,7 @@ protected void closeShard(IndexShard shard, boolean assertConsistencyBetweenTran EngineTestCase.assertAtMostOneLuceneDocumentPerSequenceNumber(engine); } } finally { - IOUtils.close(() -> shard.close("test", false), shard.store()); + IOUtils.close(() -> shard.close("test", false, false), shard.store()); } }