Skip to content

Commit

Permalink
Add support for async deletion in S3BlobContainer
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 3, 2024
1 parent c62626d commit 7822739
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import software.amazon.awssdk.services.s3.model.UploadPartRequest;
import software.amazon.awssdk.services.s3.model.UploadPartResponse;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable;
import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Publisher;
import software.amazon.awssdk.utils.CollectionUtils;

import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -105,10 +106,14 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE;
import static org.opensearch.repositories.s3.S3Repository.MAX_FILE_SIZE_USING_MULTIPART;
import static org.opensearch.repositories.s3.S3Repository.MIN_PART_SIZE_USING_MULTIPART;
Expand Down Expand Up @@ -875,4 +880,190 @@ CompletableFuture<GetObjectAttributesResponse> getBlobMetadata(S3AsyncClient s3A

return SocketAccess.doPrivileged(() -> s3AsyncClient.getObjectAttributes(getObjectAttributesRequest));
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

ListObjectsV2Request listRequest = ListObjectsV2Request.builder().bucket(blobStore.bucket()).prefix(keyPath).build();

ListObjectsV2Publisher listPublisher = s3AsyncClient.listObjectsV2Paginator(listRequest);

AtomicLong deletedBlobs = new AtomicLong();
AtomicLong deletedBytes = new AtomicLong();

CompletableFuture<Void> listingFuture = new CompletableFuture<>();

listPublisher.subscribe(new Subscriber<ListObjectsV2Response>() {
private Subscription subscription;
private final List<String> objectsToDelete = new ArrayList<>();
private CompletableFuture<Void> deletionChain = CompletableFuture.completedFuture(null);

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}

@Override
public void onNext(ListObjectsV2Response response) {
response.contents().forEach(s3Object -> {
deletedBlobs.incrementAndGet();
deletedBytes.addAndGet(s3Object.size());
objectsToDelete.add(s3Object.key());
});

int bulkDeleteSize = blobStore.getBulkDeletesSize();
if (objectsToDelete.size() >= bulkDeleteSize) {
int fullBatchesCount = objectsToDelete.size() / bulkDeleteSize;
int itemsToDelete = fullBatchesCount * bulkDeleteSize;

List<String> batchToDelete = new ArrayList<>(objectsToDelete.subList(0, itemsToDelete));
objectsToDelete.subList(0, itemsToDelete).clear();

deletionChain = executeDeleteChain(
s3AsyncClient,
batchToDelete,
deletionChain,
false,
() -> subscription.request(1)
);
} else {
subscription.request(1);
}
}

@Override
public void onError(Throwable t) {
listingFuture.completeExceptionally(new IOException("Failed to list objects for deletion", t));
}

@Override
public void onComplete() {
if (!objectsToDelete.isEmpty()) {
deletionChain = executeDeleteChain(s3AsyncClient, objectsToDelete, deletionChain, false, null);
}

deletionChain.whenComplete((v, throwable) -> {
if (throwable != null) {
listingFuture.completeExceptionally(throwable);
} else {
listingFuture.complete(null);
}
});
}
});

listingFuture.whenComplete((v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(
throwable instanceof Exception
? (Exception) throwable
: new IOException("Unexpected error during async deletion", throwable)
);
} else {
completionListener.onResponse(new DeleteResult(deletedBlobs.get(), deletedBytes.get()));
}
});
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async deletion", e));
}
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
if (blobNames.isEmpty()) {
completionListener.onResponse(null);
return;
}

try (AmazonAsyncS3Reference asyncClientReference = blobStore.asyncClientReference()) {
S3AsyncClient s3AsyncClient = asyncClientReference.get().client();

List<String> keysToDelete = blobNames.stream().map(this::buildKey).collect(Collectors.toList());

executeDeleteChain(s3AsyncClient, keysToDelete, CompletableFuture.completedFuture(null), true, null).whenComplete(
(v, throwable) -> {
if (throwable != null) {
completionListener.onFailure(new IOException("Failed to delete blobs", throwable));
} else {
completionListener.onResponse(null);
}
}
);
} catch (Exception e) {
completionListener.onFailure(new IOException("Failed to initiate async blob deletion", e));
}
}

private CompletableFuture<Void> executeDeleteChain(
S3AsyncClient s3AsyncClient,
List<String> objectsToDelete,
CompletableFuture<Void> currentChain,
boolean ignoreIfNotExists,
Runnable afterDeleteAction
) {
List<List<String>> batches = createDeleteBatches(objectsToDelete);
CompletableFuture<Void> newChain = currentChain.thenCompose(v -> executeDeleteBatches(s3AsyncClient, batches, ignoreIfNotExists));
if (afterDeleteAction != null) {
newChain = newChain.thenRun(afterDeleteAction);
}
return newChain;
}

private List<List<String>> createDeleteBatches(List<String> keys) {
int bulkDeleteSize = blobStore.getBulkDeletesSize();
List<List<String>> batches = new ArrayList<>();
for (int i = 0; i < keys.size(); i += bulkDeleteSize) {
batches.add(keys.subList(i, Math.min(keys.size(), i + bulkDeleteSize)));
}
return batches;
}

private CompletableFuture<Void> executeDeleteBatches(
S3AsyncClient s3AsyncClient,
List<List<String>> batches,
boolean ignoreIfNotExists
) {
CompletableFuture<Void> allDeletesFuture = CompletableFuture.completedFuture(null);

for (List<String> batch : batches) {
allDeletesFuture = allDeletesFuture.thenCompose(v -> executeSingleDeleteBatch(s3AsyncClient, batch, ignoreIfNotExists));
}

return allDeletesFuture;
}

private CompletableFuture<Void> executeSingleDeleteBatch(S3AsyncClient s3AsyncClient, List<String> batch, boolean ignoreIfNotExists) {
DeleteObjectsRequest deleteRequest = bulkDelete(blobStore.bucket(), batch);
return s3AsyncClient.deleteObjects(deleteRequest)
.thenApply(response -> processDeleteResponse(response, ignoreIfNotExists))
.exceptionally(e -> {
if (!ignoreIfNotExists) {
throw new CompletionException(e);
}
logger.warn("Error during batch deletion", e);
return null;
});
}

private Void processDeleteResponse(DeleteObjectsResponse deleteObjectsResponse, boolean ignoreIfNotExists) {
if (!deleteObjectsResponse.errors().isEmpty()) {
if (ignoreIfNotExists) {
logger.warn(
() -> new ParameterizedMessage(
"Failed to delete some blobs {}",
deleteObjectsResponse.errors()
.stream()
.map(s3Error -> "[" + s3Error.key() + "][" + s3Error.code() + "][" + s3Error.message() + "]")
.collect(Collectors.toList())
)
);
} else {
throw new CompletionException(new IOException("Failed to delete some blobs: " + deleteObjectsResponse.errors()));
}
}
return null;
}
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@

/*
* SPDX-License-Identifier: Apache-2.0
*
Expand All @@ -12,6 +13,7 @@
import org.opensearch.common.StreamContext;
import org.opensearch.common.blobstore.AsyncMultiStreamBlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand Down Expand Up @@ -146,4 +148,14 @@ public boolean remoteIntegrityCheckSupported() {
private boolean isSegmentFile(String filename) {
return !filename.endsWith(".tlog") && !filename.endsWith(".ckp");
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
throw new UnsupportedOperationException("deleteAsync");
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
throw new UnsupportedOperationException("deleteBlobsAsyncIgnoringIfNotExists");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.opensearch.core.action.ActionListener;

import java.io.IOException;
import java.util.List;

/**
* An extension of {@link BlobContainer} that adds {@link AsyncMultiStreamBlobContainer#asyncBlobUpload} to allow
Expand Down Expand Up @@ -48,4 +49,8 @@ public interface AsyncMultiStreamBlobContainer extends BlobContainer {
* by underlying blobContainer. In this case, caller doesn't need to ensure integrity of data.
*/
boolean remoteIntegrityCheckSupported();

void deleteAsync(ActionListener<DeleteResult> completionListener);

void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener);
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,4 +171,14 @@ private InputStreamContainer decryptInputStreamContainer(InputStreamContainer in
return new InputStreamContainer(decryptedStream, adjustedLength, adjustedPos);
}
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
blobContainer.deleteAsync(completionListener);
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
blobContainer.deleteBlobsAsyncIgnoringIfNotExists(blobNames, completionListener);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand Down Expand Up @@ -51,6 +52,7 @@
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -340,5 +342,15 @@ public boolean remoteIntegrityCheckSupported() {
public BlobContainer getDelegate() {
return delegate;
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
throw new RuntimeException("deleteBlobsAsyncIgnoringIfNotExists not supported");
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
throw new RuntimeException("deleteAsync not supported");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.common.blobstore.BlobMetadata;
import org.opensearch.common.blobstore.BlobPath;
import org.opensearch.common.blobstore.BlobStore;
import org.opensearch.common.blobstore.DeleteResult;
import org.opensearch.common.blobstore.fs.FsBlobContainer;
import org.opensearch.common.blobstore.fs.FsBlobStore;
import org.opensearch.common.blobstore.stream.read.ReadContext;
Expand All @@ -63,6 +64,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

Expand Down Expand Up @@ -335,5 +337,15 @@ public boolean remoteIntegrityCheckSupported() {
public BlobContainer getDelegate() {
return delegate;
}

@Override
public void deleteAsync(ActionListener<DeleteResult> completionListener) {
throw new RuntimeException("deleteAsync not supported");
}

@Override
public void deleteBlobsAsyncIgnoringIfNotExists(List<String> blobNames, ActionListener<Void> completionListener) {
throw new RuntimeException("deleteBlobsAsyncIgnoringIfNotExists not supported");
}
}
}

0 comments on commit 7822739

Please sign in to comment.