From bc006dcd9202a918a7b69035e20fecb679806dfd Mon Sep 17 00:00:00 2001 From: Shivam Malhotra Date: Wed, 22 May 2024 11:02:05 -0500 Subject: [PATCH] Added fix for NPE when reading from S3 --- .../io/deephaven/extensions/s3/S3Request.java | 27 +++++++++---------- 1 file changed, 12 insertions(+), 15 deletions(-) diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java index e2a79ba296d..d031b5c8c11 100644 --- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java +++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3Request.java @@ -29,7 +29,6 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.function.BiConsumer; /** @@ -92,12 +91,6 @@ public boolean equals(Object obj) { private static final Logger log = LoggerFactory.getLogger(S3Request.class); - private static final int REQUEST_NOT_SENT = 0; - private static final int REQUEST_SENT = 1; - private volatile int requestSent = REQUEST_NOT_SENT; - private static final AtomicIntegerFieldUpdater REQUEST_SENT_UPDATER = - AtomicIntegerFieldUpdater.newUpdater(S3Request.class, "requestSent"); - private final S3Uri s3Uri; private final ID id; private final S3Instructions instructions; @@ -106,7 +99,7 @@ public boolean equals(Object obj) { private final long from; private final long to; private final Instant createdAt; - private CompletableFuture consumerFuture; + private volatile CompletableFuture consumerFuture; private volatile CompletableFuture producerFuture; private int fillCount; private long fillBytes; @@ -169,14 +162,18 @@ AcquiredRequest tryAcquire() { * Send the request to the S3 service. This method is idempotent and can be called multiple times. */ void sendRequest() { - if (!REQUEST_SENT_UPDATER.compareAndSet(this, REQUEST_NOT_SENT, REQUEST_SENT)) { - return; - } - if (log.isDebugEnabled()) { - log.debug().append("Sending: ").append(requestStr()).endl(); + if (consumerFuture == null) { + synchronized (this) { + if (consumerFuture == null) { + if (log.isDebugEnabled()) { + log.debug().append("Sending: ").append(requestStr()).endl(); + } + final CompletableFuture ret = client.getObject(getObjectRequest(), this); + ret.whenComplete(this); + consumerFuture = ret; + } + } } - consumerFuture = client.getObject(getObjectRequest(), this); - consumerFuture.whenComplete(this); } boolean isDone() {