diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncDeleteHelper.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncDeleteHelper.java new file mode 100644 index 0000000000000..c3934f3544a9e --- /dev/null +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3AsyncDeleteHelper.java @@ -0,0 +1,123 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.repositories.s3; + +import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.Delete; +import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest; +import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse; +import software.amazon.awssdk.services.s3.model.ObjectIdentifier; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.message.ParameterizedMessage; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.stream.Collectors; + +public class S3AsyncDeleteHelper { + private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class); + + static CompletableFuture executeDeleteChain( + S3AsyncClient s3AsyncClient, + S3BlobStore blobStore, + List objectsToDelete, + CompletableFuture currentChain, + boolean ignoreIfNotExists, + Runnable afterDeleteAction + ) { + List> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize()); + CompletableFuture newChain = currentChain.thenCompose( + v -> executeDeleteBatches(s3AsyncClient, blobStore, batches, ignoreIfNotExists) + ); + if (afterDeleteAction != null) { + newChain = newChain.thenRun(afterDeleteAction); + } + return newChain; + } + + static List> createDeleteBatches(List keys, int bulkDeleteSize) { + List> batches = new ArrayList<>(); + for (int i = 0; i < keys.size(); i += bulkDeleteSize) { + batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize))); + } + return batches; + } + + private static CompletableFuture executeDeleteBatches( + S3AsyncClient s3AsyncClient, + S3BlobStore blobStore, + List> batches, + boolean ignoreIfNotExists + ) { + CompletableFuture allDeletesFuture = CompletableFuture.completedFuture(null); + + for (List batch : batches) { + allDeletesFuture = allDeletesFuture.thenCompose( + v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch, ignoreIfNotExists) + ); + } + + return allDeletesFuture; + } + + private static CompletableFuture executeSingleDeleteBatch( + S3AsyncClient s3AsyncClient, + S3BlobStore blobStore, + List batch, + boolean ignoreIfNotExists + ) { + DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore); + return s3AsyncClient.deleteObjects(deleteRequest) + .thenApply(response -> processDeleteResponse(response, ignoreIfNotExists)) + .exceptionally(e -> { + if (!ignoreIfNotExists) { + throw new CompletionException(e); + } + logger.warn("Error during batch deletion", e); + return null; + }); + } + + private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) { + if (!deleteObjectsResponse.errors().isEmpty()) { + if (ignoreIfNotExists) { + logger.warn( + () -> new ParameterizedMessage( + "Failed to delete some blobs {}", + deleteObjectsResponse.errors() + .stream() + .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]") + .collect(Collectors.toList()) + ) + ); + } else { + throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors())); + } + } + return null; + } + + private static DeleteObjectsRequest bulkDelete(String bucket, List blobs, S3BlobStore blobStore) { + return DeleteObjectsRequest.builder() + .bucket(bucket) + .delete( + Delete.builder() + .objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList())) + .quiet(true) + .build() + ) + .overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher)) + .build(); + } +} diff --git a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java index 848c76fb8b68a..c5f98903db492 100644 --- a/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java +++ b/plugins/repository-s3/src/main/java/org/opensearch/repositories/s3/S3BlobContainer.java @@ -106,7 +106,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Function; import java.util.stream.Collectors; @@ -887,7 +886,6 @@ public void deleteAsync(ActionListener completionListener) { S3AsyncClient s3AsyncClient = asyncClientReference.get().client(); ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build(); - ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest); AtomicLong deletedBlobs = new AtomicLong(); @@ -922,8 +920,9 @@ public void onNext(ListObjectsV2Response response) { List batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete)); objectsToDelete.subList(0, itemsToDelete).clear(); - deletionChain = executeDeleteChain( + deletionChain = S3AsyncDeleteHelper.executeDeleteChain( s3AsyncClient, + blobStore, batchToDelete, deletionChain, false, @@ -942,9 +941,15 @@ public void onError(Throwable t) { @Override public void onComplete() { if (!objectsToDelete.isEmpty()) { - deletionChain = executeDeleteChain(s3AsyncClient, objectsToDelete, deletionChain, false, null); + deletionChain = S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + objectsToDelete, + deletionChain, + false, + null + ); } - deletionChain.whenComplete((v, throwable) -> { if (throwable != null) { listingFuture.completeExceptionally(throwable); @@ -983,87 +988,22 @@ public void deleteBlobsAsyncIgnoringIfNotExists(List blobNames, ActionLi List keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList()); - executeDeleteChain(s3AsyncClient, keysToDelete, CompletableFuture.completedFuture(null), true, null).whenComplete( - (v, throwable) -> { - if (throwable != null) { - completionListener.onFailure(new IOException("Failed to delete blobs", throwable)); - } else { - completionListener.onResponse(null); - } + S3AsyncDeleteHelper.executeDeleteChain( + s3AsyncClient, + blobStore, + keysToDelete, + CompletableFuture.completedFuture(null), + true, + null + ).whenComplete((v, throwable) -> { + if (throwable != null) { + completionListener.onFailure(new IOException("Failed to delete blobs", throwable)); + } else { + completionListener.onResponse(null); } - ); + }); } catch (Exception e) { completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e)); } } - - private CompletableFuture executeDeleteChain( - S3AsyncClient s3AsyncClient, - List objectsToDelete, - CompletableFuture currentChain, - boolean ignoreIfNotExists, - Runnable afterDeleteAction - ) { - List> batches = createDeleteBatches(objectsToDelete); - CompletableFuture newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, batches, ignoreIfNotExists)); - if (afterDeleteAction != null) { - newChain = newChain.thenRun(afterDeleteAction); - } - return newChain; - } - - private List> createDeleteBatches(List keys) { - int bulkDeleteSize = blobStore.getBulkDeletesSize(); - List> batches = new ArrayList<>(); - for (int i = 0; i < keys.size(); i += bulkDeleteSize) { - batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize))); - } - return batches; - } - - private CompletableFuture executeDeleteBatches( - S3AsyncClient s3AsyncClient, - List> batches, - boolean ignoreIfNotExists - ) { - CompletableFuture allDeletesFuture = CompletableFuture.completedFuture(null); - - for (List batch : batches) { - allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, batch, ignoreIfNotExists)); - } - - return allDeletesFuture; - } - - private CompletableFuture executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, List batch, boolean ignoreIfNotExists) { - DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch); - return s3AsyncClient.deleteObjects(deleteRequest) - .thenApply(response -> processDeleteResponse(response, ignoreIfNotExists)) - .exceptionally(e -> { - if (!ignoreIfNotExists) { - throw new CompletionException(e); - } - logger.warn("Error during batch deletion", e); - return null; - }); - } - - private Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) { - if (!deleteObjectsResponse.errors().isEmpty()) { - if (ignoreIfNotExists) { - logger.warn( - () -> new ParameterizedMessage( - "Failed to delete some blobs {}", - deleteObjectsResponse.errors() - .stream() - .map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]") - .collect(Collectors.toList()) - ) - ); - } else { - throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors())); - } - } - return null; - } }