Skip to content

Commit

Permalink
Minor refactor
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 9, 2024
1 parent 5e74661 commit ffc81c0
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.core.common.unit.ByteSizeUnit;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.repositories.s3.async.S3AsyncDeleteHelper;
import org.opensearch.repositories.s3.async.SizeBasedBlockingQ;
import org.opensearch.repositories.s3.async.UploadRequest;
import org.opensearch.repositories.s3.utils.HttpRangeUtils;
Expand Down Expand Up @@ -893,7 +894,7 @@ public void deleteAsync(ActionListener<DeleteResult> completionListener) {

CompletableFuture<Void> listingFuture = new CompletableFuture<>();

listPublisher.subscribe(new Subscriber<ListObjectsV2Response>() {
listPublisher.subscribe(new Subscriber<>() {
private Subscription subscription;
private final List<String> objectsToDelete = new ArrayList<>();
private CompletableFuture<Void> deletionChain = CompletableFuture.completedFuture(null);
Expand Down Expand Up @@ -925,7 +926,6 @@ public void onNext(ListObjectsV2Response response) {
blobStore,
batchToDelete,
deletionChain,
false,
() -> subscription.request(1)
);
} else {
Expand All @@ -946,7 +946,6 @@ public void onComplete() {
blobStore,
objectsToDelete,
deletionChain,
false,
null
);
}
Expand Down Expand Up @@ -988,20 +987,14 @@ public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionLi

List<String> keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList());

S3AsyncDeleteHelper.executeDeleteChain(
s3AsyncClient,
blobStore,
keysToDelete,
CompletableFuture.completedFuture(null),
true,
null
).whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
} else {
completionListener.onResponse(null);
}
});
S3AsyncDeleteHelper.executeDeleteChain(s3AsyncClient, blobStore, keysToDelete, CompletableFuture.completedFuture(null), null)
.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
} else {
completionListener.onResponse(null);
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
import static org.opensearch.repositories.s3.S3Repository.STORAGE_CLASS_SETTING;
import static org.opensearch.repositories.s3.S3Repository.UPLOAD_RETRY_ENABLED;

class S3BlobStore implements BlobStore {
public class S3BlobStore implements BlobStore {

private static final Logger logger = LogManager.getLogger(S3BlobStore.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
* compatible open source license.
*/

package org.opensearch.repositories.s3;
package org.opensearch.repositories.s3.async;

import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.Delete;
Expand All @@ -17,29 +17,25 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.repositories.s3.S3BlobStore;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.stream.Collectors;

public class S3AsyncDeleteHelper {
private static final Logger logger = LogManager.getLogger(S3AsyncDeleteHelper.class);

static CompletableFuture<Void> executeDeleteChain(
public static CompletableFuture<Void> executeDeleteChain(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> objectsToDelete,
CompletableFuture<Void> currentChain,
boolean ignoreIfNotExists,
Runnable afterDeleteAction
) {
List<List<String>> batches = createDeleteBatches(objectsToDelete, blobStore.getBulkDeletesSize());
CompletableFuture<Void> newChain = currentChain.thenCompose(
v -> executeDeleteBatches(s3AsyncClient, blobStore, batches, ignoreIfNotExists)
);
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, blobStore, batches));
if (afterDeleteAction != null) {
newChain = newChain.thenRun(afterDeleteAction);
}
Expand All @@ -57,15 +53,12 @@ static List<List<String>> createDeleteBatches(List<String> keys, int bulkDeleteS
private static CompletableFuture<Void> executeDeleteBatches(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<List<String>> batches,
boolean ignoreIfNotExists
List<List<String>> batches
) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
allDeletesFuture = allDeletesFuture.thenCompose(
v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch, ignoreIfNotExists)
);
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, blobStore, batch));
}

return allDeletesFuture;
Expand All @@ -74,36 +67,23 @@ private static CompletableFuture<Void> executeDeleteBatches(
private static CompletableFuture<Void> executeSingleDeleteBatch(
S3AsyncClient s3AsyncClient,
S3BlobStore blobStore,
List<String> batch,
boolean ignoreIfNotExists
List<String> batch
) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch, blobStore);
return s3AsyncClient.deleteObjects(deleteRequest)
.thenApply(response -> processDeleteResponse(response, ignoreIfNotExists))
.exceptionally(e -> {
if (!ignoreIfNotExists) {
throw new CompletionException(e);
}
logger.warn("Error during batch deletion", e);
return null;
});
return s3AsyncClient.deleteObjects(deleteRequest).thenApply(S3AsyncDeleteHelper::processDeleteResponse);
}

private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) {
private static Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse) {
if (!deleteObjectsResponse.errors().isEmpty()) {
if (ignoreIfNotExists) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
} else {
throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors()));
}
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
}
return null;
}
Expand Down

0 comments on commit ffc81c0

Please sign in to comment.