Skip to content

Commit

Permalink
Move helper methods to helper class
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 5, 2024
1 parent 639257a commit 1f5a9cb
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.repositories.s3;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.DeleteObjectsResponse;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class S3AsyncDeleteHelper {
private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class);

static CompletableFuture<Void> executeDeleteChain(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> objectsToDelete,
CompletableFuture<Void> currentChain,
boolean ignoreIfNotExists,
Runnable afterDeleteAction
) {
List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
CompletableFuture<Void> newChain = currentChain.thenCompose(
v -> executeDeleteBatches(s3AsyncClient, blobStore, batches, ignoreIfNotExists)
);
if (afterDeleteAction != null) {
newChain = newChain.thenRun(afterDeleteAction);
}
return newChain;
}

static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteSize) {
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
}
return batches;
}

private static CompletableFuture<Void> executeDeleteBatches(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<List<String>> batches,
boolean ignoreIfNotExists
) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
allDeletesFuture = allDeletesFuture.thenCompose(
v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch, ignoreIfNotExists)
);
}

return allDeletesFuture;
}

private static CompletableFuture<Void> executeSingleDeleteBatch(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> batch,
boolean ignoreIfNotExists
) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest)
.thenApply(response -> processDeleteResponse(response, ignoreIfNotExists))
.exceptionally(e -> {
if (!ignoreIfNotExists) {
throw new CompletionException(e);
}
logger.warn("Error during batch deletion", e);
return null;
});
}

private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) {
if (!deleteObjectsResponse.errors().isEmpty()) {
if (ignoreIfNotExists) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
} else {
throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors()));
}
}
return null;
}

private static DeleteObjectsRequest bulkDelete(String bucket, List<String> blobs, S3BlobStore blobStore) {
return DeleteObjectsRequest.builder()
.bucket(bucket)
.delete(
Delete.builder()
.objects(blobs.stream().map(blob -> ObjectIdentifier.builder().key(blob).build()).collect(Collectors.toList()))
.quiet(true)
.build()
)
.overrideConfiguration(o -> o.addMetricPublisher(blobStore.getStatsMetricPublisher().deleteObjectsMetricPublisher))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -887,7 +886,6 @@ public void deleteAsync(ActionListener<DeleteResult> completionListener) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();

ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
Expand Down Expand Up @@ -922,8 +920,9 @@ public void onNext(ListObjectsV2Response response) {
List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
objectsToDelete.subList(0, itemsToDelete).clear();

deletionChain = executeDeleteChain(
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
batchToDelete,
deletionChain,
false,
Expand All @@ -942,9 +941,15 @@ public void onError(Throwable t) {
@Override
public void onComplete() {
if (!objectsToDelete.isEmpty()) {
deletionChain = executeDeleteChain(s3AsyncClient, objectsToDelete, deletionChain, false, null);
deletionChain = S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
objectsToDelete,
deletionChain,
false,
null
);
}

deletionChain.whenComplete((v, throwable) -> {
if (throwable != null) {
listingFuture.completeExceptionally(throwable);
Expand Down Expand Up @@ -983,87 +988,22 @@ public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionLi

List<String> keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList());

executeDeleteChain(s3AsyncClient, keysToDelete, CompletableFuture.completedFuture(null), true, null).whenComplete(
(v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
} else {
completionListener.onResponse(null);
}
S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
keysToDelete,
CompletableFuture.completedFuture(null),
true,
null
).whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
} else {
completionListener.onResponse(null);
}
);
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
}
}

private CompletableFuture<Void> executeDeleteChain(
S3AsyncClient s3AsyncClient,
List<String> objectsToDelete,
CompletableFuture<Void> currentChain,
boolean ignoreIfNotExists,
Runnable afterDeleteAction
) {
List<List<String>> batches = createDeleteBatches(objectsToDelete);
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, batches, ignoreIfNotExists));
if (afterDeleteAction != null) {
newChain = newChain.thenRun(afterDeleteAction);
}
return newChain;
}

private List<List<String>> createDeleteBatches(List<String> keys) {
int bulkDeleteSize = blobStore.getBulkDeletesSize();
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
}
return batches;
}

private CompletableFuture<Void> executeDeleteBatches(
S3AsyncClient s3AsyncClient,
List<List<String>> batches,
boolean ignoreIfNotExists
) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, batch, ignoreIfNotExists));
}

return allDeletesFuture;
}

private CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, List<String> batch, boolean ignoreIfNotExists) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch);
return s3AsyncClient.deleteObjects(deleteRequest)
.thenApply(response -> processDeleteResponse(response, ignoreIfNotExists))
.exceptionally(e -> {
if (!ignoreIfNotExists) {
throw new CompletionException(e);
}
logger.warn("Error during batch deletion", e);
return null;
});
}

private Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) {
if (!deleteObjectsResponse.errors().isEmpty()) {
if (ignoreIfNotExists) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
} else {
throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors()));
}
}
return null;
}
}

0 comments on commit 1f5a9cb

Please sign in to comment.