Integrate async deletion in the snapshot interactions
Signed-off-by: Ashish Singh <ssashish@amazon.com>
ashking94 committed Sep 11, 2024
1 parent 932a03e commit af78aab
Showing 2 changed files with 57 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
- Making _cat/allocation API use indexLevelStats ([#15292](https://github.com/opensearch-project/OpenSearch/pull/15292))
- Memory optimisations in _cluster/health API ([#15492](https://github.com/opensearch-project/OpenSearch/pull/15492))
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))

### Dependencies
- Bump `com.azure:azure-identity` from 1.13.0 to 1.13.2 ([#15578](https://github.com/opensearch-project/OpenSearch/pull/15578))
60 changes: 56 additions & 4 deletions server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java
@@ -50,6 +50,7 @@
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.PlainActionFuture;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateUpdateTask;
import org.opensearch.cluster.RepositoryCleanupInProgress;
@@ -69,6 +70,7 @@
import org.opensearch.common.Randomness;
import org.opensearch.common.SetOnce;
import org.opensearch.common.UUIDs;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
@@ -180,6 +182,7 @@
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -353,6 +356,16 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
Setting.Property.Final
);

/**
* Controls whether snapshot blob deletions use the asynchronous deletion flow when the blob container supports it. Dynamic setting {@code cluster.snapshot.async-deletion.enable}, enabled by default.
*/
public static final Setting<Boolean> SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING = Setting.boolSetting(
"cluster.snapshot.async-deletion.enable",
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
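
Because the setting is registered as both `NodeScope` and `Dynamic` (and consumed through a settings update consumer in the constructor below), it can be flipped at runtime without a restart. A minimal sketch of doing that from Java follows; the helper class, method name, and the assumption of an injected `Client` handle are illustrative, not part of this commit.

```java
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.client.Client;
import org.opensearch.common.settings.Settings;

// Illustrative helper (not part of this change): toggles the dynamic setting cluster-wide.
final class SnapshotAsyncDeletionToggle {
    static void setAsyncDeletion(Client client, boolean enabled) {
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest();
        // Persist the new value; BlobStoreRepository picks it up via its settings update consumer.
        request.persistentSettings(Settings.builder().put("cluster.snapshot.async-deletion.enable", enabled));
        client.admin().cluster().updateSettings(request).actionGet();
    }
}
```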

protected volatile boolean supportURLRepo;

private volatile int maxShardBlobDeleteBatch;
@@ -446,6 +459,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp

private final String snapshotShardPathPrefix;

private volatile boolean enableAsyncDeletion;

/**
* Flag that is set to {@code true} if this instance is started with {@link #metadata} that has a higher value for
* {@link RepositoryMetadata#pendingGeneration()} than for {@link RepositoryMetadata#generation()} indicating a full cluster restart
@@ -498,6 +513,8 @@ protected BlobStoreRepository(
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = new RemoteStoreSettings(clusterService.getSettings(), clusterService.getClusterSettings());
this.snapshotShardPathPrefix = SNAPSHOT_SHARD_PATH_PREFIX_SETTING.get(clusterService.getSettings());
this.enableAsyncDeletion = SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING.get(clusterService.getSettings());
clusterService.getClusterSettings().addSettingsUpdateConsumer(SNAPSHOT_ASYNC_DELETION_ENABLE_SETTING, this::setEnableAsyncDeletion);
}

@Override
@@ -2082,7 +2099,7 @@ private void executeOneStaleIndexDelete(
}

// Finally, we delete the [base_path]/indexId folder
deleteResult = deleteResult.add(indexEntry.getValue().delete()); // Deleting the index folder
deleteResult = deleteResult.add(deleteContainer(indexEntry.getValue())); // Deleting the index folder
logger.debug("[{}] Cleaned up stale index [{}]", metadata.name(), indexSnId);
return deleteResult;
} catch (IOException e) {
@@ -2115,6 +2132,21 @@ private void executeOneStaleIndexDelete(
}));
}

private DeleteResult deleteContainer(BlobContainer container) throws IOException {
long startTime = System.nanoTime();
DeleteResult deleteResult;
if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) {
// Use deleteAsync and wait for the result
PlainActionFuture<DeleteResult> future = new PlainActionFuture<>();
((AsyncMultiStreamBlobContainer) container).deleteAsync(future);
deleteResult = future.actionGet();
} else {
deleteResult = container.delete();
}
logger.debug(() -> new ParameterizedMessage("[{}] Deleted {} in {}ns", metadata.name(), container.path(), System.nanoTime() - startTime));
return deleteResult;
}

/**
* Cleans up the remote store directory if needed.
* <p> This method cleans up segments in the remote store directory for deleted indices.
@@ -2318,7 +2350,7 @@ void releaseRemoteStoreLocksAndCleanup(
* @return A DeleteResult object representing the result of the deletion operation.
* @throws IOException If an I/O error occurs during the deletion process.
*/
private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException {
private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException, ExecutionException, InterruptedException {
// If the provided ShardInfo is null, return a zero DeleteResult
if (shardInfo == null) {
return DeleteResult.ZERO;
@@ -2330,7 +2362,7 @@ private DeleteResult deleteShardData(ShardInfo shardInfo) throws IOException {
// Iterate over the shards and delete each shard's data
for (int i = 0; i < shardInfo.getShardCount(); i++) {
// Call the delete method on the shardContainer and accumulate the result
deleteResult = deleteResult.add(shardContainer(shardInfo.getIndexId(), i).delete());
deleteResult = deleteResult.add(deleteContainer(shardContainer(shardInfo.getIndexId(), i)));
}

// Return the accumulated DeleteResult
@@ -2714,7 +2746,23 @@ public IndexMetadata getSnapshotIndexMetaData(RepositoryData repositoryData, Sna

private void deleteFromContainer(BlobContainer container, List<String> blobs) throws IOException {
logger.trace(() -> new ParameterizedMessage("[{}] Deleting {} from [{}]", metadata.name(), blobs, container.path()));
container.deleteBlobsIgnoringIfNotExists(blobs);
long startTime = System.nanoTime();
if (enableAsyncDeletion && container instanceof AsyncMultiStreamBlobContainer) {
PlainActionFuture<Void> future = new PlainActionFuture<>();
((AsyncMultiStreamBlobContainer) container).deleteBlobsAsyncIgnoringIfNotExists(blobs, future);
future.actionGet();
} else {
container.deleteBlobsIgnoringIfNotExists(blobs);
}
logger.debug(
() -> new ParameterizedMessage(
"[{}] Deletion {} from [{}] took {}ns",
metadata.name(),
blobs,
container.path(),
System.nanoTime() - startTime
)
);
}

private BlobPath indicesPath() {
@@ -4565,4 +4613,8 @@ public String toString() {
return name;
}
}

public void setEnableAsyncDeletion(boolean enableAsyncDeletion) {
this.enableAsyncDeletion = enableAsyncDeletion;
}
}
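
Stepping back from the diff, the core of this change is a dispatch pattern: when async deletion is enabled and the blob container implements the async multi-stream interface, issue a listener-based delete and block on its future so existing callers keep their synchronous contract; otherwise fall back to the plain synchronous delete. Below is a self-contained sketch of that pattern with deliberately simplified stand-in types; nothing here is the real OpenSearch API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;

// Simplified stand-ins for BlobContainer / AsyncMultiStreamBlobContainer; illustration only.
interface SimpleContainer {
    long delete() throws Exception; // synchronous delete, returns number of blobs removed
}

interface SimpleAsyncContainer extends SimpleContainer {
    // Listener-style async delete: exactly one of (result, error) is non-null on completion.
    void deleteAsync(BiConsumer<Long, Exception> onCompletion);
}

final class DeletionDispatch {
    private volatile boolean enableAsyncDeletion = true; // mirrors the dynamic cluster setting

    void setEnableAsyncDeletion(boolean enabled) {
        this.enableAsyncDeletion = enabled;
    }

    long deleteContainer(SimpleContainer container) throws Exception {
        final long startTime = System.nanoTime();
        final long deleted;
        if (enableAsyncDeletion && container instanceof SimpleAsyncContainer) {
            // Bridge async completion back to a blocking call, as the repository does with PlainActionFuture.
            CompletableFuture<Long> future = new CompletableFuture<>();
            ((SimpleAsyncContainer) container).deleteAsync((result, error) -> {
                if (error != null) {
                    future.completeExceptionally(error);
                } else {
                    future.complete(result);
                }
            });
            deleted = future.get();
        } else {
            deleted = container.delete();
        }
        System.out.printf("deleted %d blobs in %dns%n", deleted, System.nanoTime() - startTime);
        return deleted;
    }
}
```

Callers still block on the result, but the deletion itself can be batched and parallelized inside the container implementation (for example, the S3 container support referenced in the CHANGELOG entry), which is where the speedup comes from.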
