Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(s3stream): implement AcknowledgmentService #1470

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ public class AwsObjectStorage extends AbstractObjectStorage {
private final S3AsyncClient writeS3Client;
private boolean deleteObjectsReturnSuccessKeys;


public AwsObjectStorage(String endpoint, Map<String, String> tagging, String region, String bucket, boolean forcePathStyle,
public AwsObjectStorage(String endpoint, Map<String, String> tagging, String region, String bucket,
boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders,
AsyncNetworkBandwidthLimiter networkInboundBandwidthLimiter,
AsyncNetworkBandwidthLimiter networkOutboundBandwidthLimiter,
Expand Down Expand Up @@ -105,6 +105,77 @@ public static Builder builder() {
return new Builder();
}

static void handleDeleteObjectsResponse(DeleteObjectsResponse response,
boolean deleteObjectsReturnSuccessKeys) throws Exception {
int errDeleteCount = 0;
ArrayList<String> failedKeys = new ArrayList<>();
ArrayList<String> errorsMessages = new ArrayList<>();
if (deleteObjectsReturnSuccessKeys) {
// expect NoSuchKey is not response because s3 api won't return this in errors.
for (S3Error error : response.errors()) {
if (errDeleteCount < 30) {
LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
error.key(), error.code(), error.message());
}
failedKeys.add(error.key());
errorsMessages.add(error.message());
errDeleteCount++;

}
} else {
for (S3Error error : response.errors()) {
if (S3_API_NO_SUCH_KEY.equals(error.code())) {
// ignore for delete objects.
continue;
}
if (errDeleteCount < 30) {
LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
error.key(), error.code(), error.message());
}
failedKeys.add(error.key());
errorsMessages.add(error.message());
errDeleteCount++;
}
}
if (errDeleteCount > 0) {
throw new DeleteObjectsException("Failed to delete objects", failedKeys, errorsMessages);
}
}

static boolean checkIfDeleteObjectsWillReturnSuccessDeleteKeys(List<String> path, DeleteObjectsResponse resp) {
// BOS S3 API works as quiet mode
// in this mode success delete objects won't be returned.
// which could cause object not deleted in metadata.
//
// BOS doc: https://cloud.baidu.com/doc/BOS/s/tkc5twspg
// S3 doc: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestBody

boolean hasDeleted = resp.hasDeleted() && !resp.deleted().isEmpty();
boolean hasErrors = resp.hasErrors() && !resp.errors().isEmpty();
boolean hasErrorsWithoutNoSuchKey = resp.errors().stream().filter(s3Error -> !S3_API_NO_SUCH_KEY.equals(s3Error.code())).count() != 0;
boolean allDeleteKeyMatch = resp.deleted().stream().map(DeletedObject::key).sorted().collect(Collectors.toList()).equals(path);

if (hasDeleted && !hasErrors && allDeleteKeyMatch) {
LOGGER.info("call deleteObjects deleteObjectKeys returned.");

return true;

} else if (!hasDeleted && !hasErrorsWithoutNoSuchKey) {
LOGGER.info("call deleteObjects but deleteObjectKeys not returned. set deleteObjectsReturnSuccessKeys = false");

return false;
}

IllegalStateException exception = new IllegalStateException();

LOGGER.error("error when check if delete objects will return success." +
" delete keys {} resp {}, requestId {},httpCode {} httpText {}",
path, resp, resp.responseMetadata().requestId(),
resp.sdkHttpResponse().statusCode(), resp.sdkHttpResponse().statusText(), exception);

throw exception;
}

@Override
void doRangeRead(String path, long start, long end,
Consumer<Throwable> failHandler, Consumer<CompositeByteBuf> successHandler) {
Expand Down Expand Up @@ -183,7 +254,8 @@ void doUploadPart(String path, String uploadId, int partNumber, ByteBuf part,
}

@Override
void doUploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber, long apiCallAttemptTimeout,
void doUploadPartCopy(String sourcePath, String path, long start, long end, String uploadId, int partNumber,
long apiCallAttemptTimeout,
Consumer<Throwable> failHandler, Consumer<ObjectStorageCompletedPart> successHandler) {
UploadPartCopyRequest request = UploadPartCopyRequest.builder().sourceBucket(bucket).sourceKey(sourcePath)
.destinationBucket(bucket).destinationKey(path).copySourceRange(range(start, end)).uploadId(uploadId).partNumber(partNumber)
Expand Down Expand Up @@ -263,46 +335,13 @@ void doClose() {
@Override
CompletableFuture<List<ObjectInfo>> doList(String prefix) {
return readS3Client.listObjectsV2(builder -> builder.bucket(bucket).prefix(prefix))
.thenApply(resp -> resp.contents().stream().map(object -> new ObjectInfo((short) 0, object.key(), object.lastModified().toEpochMilli())).collect(Collectors.toList()));
.thenApply(resp ->
resp.contents()
.stream()
.map(object -> new ObjectInfo((short) 0, object.key(), object.lastModified().toEpochMilli(), object.size()))
.collect(Collectors.toList()));
}

static void handleDeleteObjectsResponse(DeleteObjectsResponse response, boolean deleteObjectsReturnSuccessKeys) throws Exception {
int errDeleteCount = 0;
ArrayList<String> failedKeys = new ArrayList<>();
ArrayList<String> errorsMessages = new ArrayList<>();
if (deleteObjectsReturnSuccessKeys) {
// expect NoSuchKey is not response because s3 api won't return this in errors.
for (S3Error error : response.errors()) {
if (errDeleteCount < 30) {
LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
error.key(), error.code(), error.message());
}
failedKeys.add(error.key());
errorsMessages.add(error.message());
errDeleteCount++;

}
} else {
for (S3Error error : response.errors()) {
if (S3_API_NO_SUCH_KEY.equals(error.code())) {
// ignore for delete objects.
continue;
}
if (errDeleteCount < 30) {
LOGGER.error("Delete objects for key [{}] error code [{}] message [{}]",
error.key(), error.code(), error.message());
}
failedKeys.add(error.key());
errorsMessages.add(error.message());
errDeleteCount++;
}
}
if (errDeleteCount > 0) {
throw new DeleteObjectsException("Failed to delete objects", failedKeys, errorsMessages);
}
}


private String range(long start, long end) {
if (end == -1L) {
return "bytes=" + start + "-";
Expand Down Expand Up @@ -344,40 +383,6 @@ private CompletableFuture<Boolean> asyncCheckDeleteObjectsReturnSuccessDeleteKey
.thenApply(resp -> checkIfDeleteObjectsWillReturnSuccessDeleteKeys(path, resp));
}

static boolean checkIfDeleteObjectsWillReturnSuccessDeleteKeys(List<String> path, DeleteObjectsResponse resp) {
// BOS S3 API works as quiet mode
// in this mode success delete objects won't be returned.
// which could cause object not deleted in metadata.
//
// BOS doc: https://cloud.baidu.com/doc/BOS/s/tkc5twspg
// S3 doc: https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html#API_DeleteObjects_RequestBody

boolean hasDeleted = resp.hasDeleted() && !resp.deleted().isEmpty();
boolean hasErrors = resp.hasErrors() && !resp.errors().isEmpty();
boolean hasErrorsWithoutNoSuchKey = resp.errors().stream().filter(s3Error -> !S3_API_NO_SUCH_KEY.equals(s3Error.code())).count() != 0;
boolean allDeleteKeyMatch = resp.deleted().stream().map(DeletedObject::key).sorted().collect(Collectors.toList()).equals(path);

if (hasDeleted && !hasErrors && allDeleteKeyMatch) {
LOGGER.info("call deleteObjects deleteObjectKeys returned.");

return true;

} else if (!hasDeleted && !hasErrorsWithoutNoSuchKey) {
LOGGER.info("call deleteObjects but deleteObjectKeys not returned. set deleteObjectsReturnSuccessKeys = false");

return false;
}

IllegalStateException exception = new IllegalStateException();

LOGGER.error("error when check if delete objects will return success." +
" delete keys {} resp {}, requestId {},httpCode {} httpText {}",
path, resp, resp.responseMetadata().requestId(),
resp.sdkHttpResponse().statusCode(), resp.sdkHttpResponse().statusText(), exception);

throw exception;
}

private S3AsyncClient newS3Client(String endpoint, String region, boolean forcePathStyle,
List<AwsCredentialsProvider> credentialsProviders, int maxConcurrency) {
S3AsyncClientBuilder builder = S3AsyncClient.builder().region(Region.of(region));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,11 @@ void doClose() {

@Override
CompletableFuture<List<ObjectInfo>> doList(String prefix) {
return CompletableFuture.completedFuture(storage.keySet().stream().filter(key -> key.startsWith(prefix)).map(s -> new ObjectInfo((short) 0, s, 0L)).collect(Collectors.toList()));
return CompletableFuture.completedFuture(storage.entrySet()
.stream()
.filter(entry -> entry.getKey().startsWith(prefix))
.map(entry -> new ObjectInfo((short) 0, entry.getKey(), 0L, entry.getValue().readableBytes()))
.collect(Collectors.toList()));
}

public ByteBuf get() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public interface ObjectStorage {
*/
CompletableFuture<ByteBuf> rangeRead(ReadOptions options, String objectPath, long start, long end);


// Low level API
CompletableFuture<Void> write(WriteOptions options, String objectPath, ByteBuf buf);

Expand Down Expand Up @@ -59,15 +58,21 @@ public String key() {

class ObjectInfo extends ObjectPath {
private final long timestamp;
private final long size;

public ObjectInfo(short bucket, String key, long timestamp) {
public ObjectInfo(short bucket, String key, long timestamp, long size) {
super(bucket, key);
this.timestamp = timestamp;
this.size = size;
}

public long timestamp() {
return timestamp;
}

public long size() {
return size;
}
}

class WriteOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.impl;
package com.automq.stream.s3.wal.common;

import com.automq.stream.s3.wal.AppendResult;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.impl;
package com.automq.stream.s3.wal.common;

import com.automq.stream.s3.wal.RecoverResult;
import io.netty.buffer.ByteBuf;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package com.automq.stream.s3.wal.exception;

public class UnmarshalException extends Exception {
public class UnmarshalException extends RuntimeException {
public UnmarshalException(String message) {
super(message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright 2024, AutoMQ CO.,LTD.
*
* Use of this software is governed by the Business Source License
* included in the file BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

package com.automq.stream.s3.wal.exception;

public class WALFencedException extends RuntimeException {
public WALFencedException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@
import com.automq.stream.s3.trace.context.TraceContext;
import com.automq.stream.s3.wal.AppendResult;
import com.automq.stream.s3.wal.RecoverResult;
import com.automq.stream.s3.wal.WriteAheadLog;
import com.automq.stream.s3.wal.common.AppendResultImpl;
import com.automq.stream.s3.wal.common.RecordHeader;
import com.automq.stream.s3.wal.common.RecoverResultImpl;
import com.automq.stream.s3.wal.common.ShutdownType;
import com.automq.stream.s3.wal.common.WALMetadata;
import com.automq.stream.s3.wal.exception.OverCapacityException;
import com.automq.stream.s3.wal.exception.UnmarshalException;
import com.automq.stream.s3.wal.common.WALMetadata;
import com.automq.stream.s3.wal.WriteAheadLog;
import com.automq.stream.s3.wal.impl.AppendResultImpl;
import com.automq.stream.s3.wal.impl.RecoverResultImpl;
import com.automq.stream.s3.wal.util.WALCachedChannel;
import com.automq.stream.s3.wal.util.WALChannel;
import com.automq.stream.s3.wal.util.WALUtil;
Expand Down Expand Up @@ -706,21 +706,21 @@ public BlockWALService build() {
@Override
public String toString() {
return "BlockWALServiceBuilder{"
+ "blockDevicePath='" + blockDevicePath
+ ", blockDeviceCapacityWant=" + blockDeviceCapacityWant
+ ", direct=" + direct
+ ", initBufferSize=" + initBufferSize
+ ", maxBufferSize=" + maxBufferSize
+ ", ioThreadNums=" + ioThreadNums
+ ", slidingWindowInitialSize=" + slidingWindowInitialSize
+ ", slidingWindowUpperLimit=" + slidingWindowUpperLimit
+ ", slidingWindowScaleUnit=" + slidingWindowScaleUnit
+ ", blockSoftLimit=" + blockSoftLimit
+ ", writeRateLimit=" + writeRateLimit
+ ", nodeId=" + nodeId
+ ", epoch=" + epoch
+ ", recoveryMode=" + recoveryMode
+ '}';
+ "blockDevicePath='" + blockDevicePath
+ ", blockDeviceCapacityWant=" + blockDeviceCapacityWant
+ ", direct=" + direct
+ ", initBufferSize=" + initBufferSize
+ ", maxBufferSize=" + maxBufferSize
+ ", ioThreadNums=" + ioThreadNums
+ ", slidingWindowInitialSize=" + slidingWindowInitialSize
+ ", slidingWindowUpperLimit=" + slidingWindowUpperLimit
+ ", slidingWindowScaleUnit=" + slidingWindowScaleUnit
+ ", blockSoftLimit=" + blockSoftLimit
+ ", writeRateLimit=" + writeRateLimit
+ ", nodeId=" + nodeId
+ ", epoch=" + epoch
+ ", recoveryMode=" + recoveryMode
+ '}';
}
}

Expand Down Expand Up @@ -749,7 +749,7 @@ public boolean equals(Object obj) {
}
var that = (InvalidRecoverResult) obj;
return Objects.equals(this.detail, that.detail) &&
super.equals(obj);
super.equals(obj);
}

@Override
Expand Down
Loading
Loading