Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added fix for NPE when reading from S3 #5514

Merged
merged 1 commit into from
May 22, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;

/**
Expand Down Expand Up @@ -92,12 +91,6 @@ public boolean equals(Object obj) {

private static final Logger log = LoggerFactory.getLogger(S3Request.class);

private static final int REQUEST_NOT_SENT = 0;
private static final int REQUEST_SENT = 1;
private volatile int requestSent = REQUEST_NOT_SENT;
private static final AtomicIntegerFieldUpdater<S3Request> REQUEST_SENT_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(S3Request.class, "requestSent");

private final S3Uri s3Uri;
private final ID id;
private final S3Instructions instructions;
Expand All @@ -106,7 +99,7 @@ public boolean equals(Object obj) {
private final long from;
private final long to;
private final Instant createdAt;
private CompletableFuture<Boolean> consumerFuture;
private volatile CompletableFuture<Boolean> consumerFuture;
private volatile CompletableFuture<Boolean> producerFuture;
private int fillCount;
private long fillBytes;
Expand Down Expand Up @@ -169,14 +162,18 @@ AcquiredRequest tryAcquire() {
* Send the request to the S3 service. This method is idempotent and can be called multiple times.
*/
void sendRequest() {
if (!REQUEST_SENT_UPDATER.compareAndSet(this, REQUEST_NOT_SENT, REQUEST_SENT)) {
return;
}
if (log.isDebugEnabled()) {
log.debug().append("Sending: ").append(requestStr()).endl();
if (consumerFuture == null) {
synchronized (this) {
if (consumerFuture == null) {
if (log.isDebugEnabled()) {
log.debug().append("Sending: ").append(requestStr()).endl();
}
final CompletableFuture<Boolean> ret = client.getObject(getObjectRequest(), this);
ret.whenComplete(this);
consumerFuture = ret;
}
}
}
consumerFuture = client.getObject(getObjectRequest(), this);
consumerFuture.whenComplete(this);
}

boolean isDone() {
Expand Down
Loading