Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added a soft reference based shared cache for S3 reads #5357

Merged
merged 29 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
82bc5a1
Added a concurrent hash map based shared cache
malhotrashivam Apr 4, 2024
61b70ba
Moved to a soft reference based cache
malhotrashivam Apr 4, 2024
03dd512
Added KeyedLongObjectHashMap based cache
malhotrashivam Apr 8, 2024
c3394ce
Merge branch 'main' into sm-s3-cache
malhotrashivam Apr 23, 2024
c03c7f9
Review comments
malhotrashivam Apr 24, 2024
6fcb50f
Moved from a KeyedIntObjectHashMap to KeyedObjectHashMap
malhotrashivam Apr 24, 2024
e0d4025
Renamed the class
malhotrashivam Apr 24, 2024
a8994c4
Merge branch 'main' into sm-s3-cache
malhotrashivam Apr 24, 2024
0da90b5
Merge branch 'main' into sm-s3-cache
malhotrashivam May 3, 2024
322681b
Review comments
malhotrashivam May 3, 2024
b35db77
Removing unused methods
malhotrashivam May 3, 2024
db09a4a
Improved comments
malhotrashivam May 3, 2024
89d1238
Minor tweaks
malhotrashivam May 3, 2024
ca7c74d
ParquetFileReader will use a single use context
malhotrashivam May 3, 2024
ebbe505
Added debug log messages
malhotrashivam May 3, 2024
9e72124
Merge branch 'main' into sm-s3-cache
malhotrashivam May 8, 2024
f65fbf7
Review comments part 1
malhotrashivam May 10, 2024
8a607a1
Review with Ryan Part 2
malhotrashivam May 10, 2024
364f30f
Review with Ryan Part 3
malhotrashivam May 10, 2024
57b5285
Review with Ryan Part 4
malhotrashivam May 13, 2024
60a4e23
Review with Ryan Part 5
malhotrashivam May 13, 2024
06b0226
Updated the defaults for S3 instructions
malhotrashivam May 13, 2024
b716c67
Renaming some variables
malhotrashivam May 13, 2024
8173578
Removed some unnecessary includes
malhotrashivam May 13, 2024
1a6eaae
Slightly tweaked the caching algorithm
malhotrashivam May 13, 2024
7624442
Python review
malhotrashivam May 13, 2024
20c3245
Review with Ryan contd.
malhotrashivam May 13, 2024
5fb2586
More review comments
malhotrashivam May 14, 2024
c0f4192
Some more review comments
malhotrashivam May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public boolean equals(Object obj) {
private final long from;
private final long to;
private final Instant createdAt;
private volatile CompletableFuture<Boolean> consumerFuture;
private CompletableFuture<Boolean> consumerFuture;
private volatile CompletableFuture<Boolean> producerFuture;
private int fillCount;
private long fillBytes;
Expand Down Expand Up @@ -433,7 +433,7 @@ public void cleanup() {
public void accept(final Boolean isComplete, final Throwable throwable) {
if (log.isDebugEnabled()) {
final Instant completedAt = Instant.now();
if (isComplete) {
if (Boolean.TRUE.equals(isComplete)) {
log.debug().append("Send complete: ").append(requestStr()).append(' ')
.append(Duration.between(createdAt, completedAt).toString()).endl();
} else {
Expand Down Expand Up @@ -474,12 +474,12 @@ private ByteBuffer getFullFragment() throws ExecutionException, InterruptedExcep
// apiCallTimeout.
final long readNanos = instructions.readTimeout().plusMillis(100).toNanos();
final Boolean isComplete = consumerFuture.get(readNanos, TimeUnit.NANOSECONDS);
if (!isComplete) {
if (Boolean.FALSE.equals(isComplete)) {
throw new UncheckedDeephavenException(String.format("Failed to complete request %s", requestStr()));
}
final ByteBuffer result = get();
if (result == null) {
throw new IllegalStateException(
throw new UncheckedDeephavenException(
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
String.format("Failed to acquire buffer after completion, %s", requestStr()));
}
if (result.position() != 0 || result.limit() != result.capacity() || result.limit() != requestLength()) {
Expand Down Expand Up @@ -532,6 +532,7 @@ private final class Sub implements Subscriber<ByteBuffer> {
return;
}
if (buffer.position() != 0) {
// We don't change the buffer position while writing to it, so this should never happen
localProducer.completeExceptionally(new IllegalStateException(
String.format("Buffer not empty for new subscriber, %s", requestStr())));
}
Expand All @@ -554,15 +555,15 @@ public void onSubscribe(Subscription s) {
}

@Override
public void onNext(final ByteBuffer byteBuffer) {
final ByteBuffer buffer = Request.this.get();
if (buffer == null) {
public void onNext(final ByteBuffer dataBuffer) {
final ByteBuffer resultBuffer = Request.this.get();
if (resultBuffer == null) {
localProducer.completeExceptionally(new IllegalStateException(
String.format("Failed to acquire buffer for data, %s", requestStr())));
return;
}
final int numBytes = byteBuffer.remaining();
buffer.duplicate().position(offset).put(byteBuffer);
final int numBytes = dataBuffer.remaining();
resultBuffer.duplicate().position(offset).put(dataBuffer);
offset += numBytes;
subscription.request(1);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,36 +57,37 @@ int getFragmentSize() {
* @return the request
*/
@NotNull
S3ChannelContext.Request.AcquiredRequest getOrCreateRequest(@NotNull final S3Uri uri, final long fragmentIndex,
Request.AcquiredRequest getOrCreateRequest(@NotNull final S3Uri uri, final long fragmentIndex,
@NotNull final S3ChannelContext context) {
final Request.ID key = new Request.ID(uri, fragmentIndex);
Request.AcquiredRequest newAcquiredRequest = null;
Request existingRequest = requests.get(key);
for (int retryCount = 0; retryCount < Integer.MAX_VALUE; retryCount++) {
while (true) {
final Request existingRequest = requests.get(key);
if (existingRequest != null) {
final Request.AcquiredRequest acquired = existingRequest.tryAcquire();
if (acquired != null) {
return acquired;
} else {
remove(existingRequest);
}
}
if (newAcquiredRequest == null) {
newAcquiredRequest = Request.createAndAcquire(fragmentIndex, context);
}
existingRequest = requests.putIfAbsent(key, newAcquiredRequest.request);
final boolean added;
if (existingRequest == null) {
// Ideally, we could have used ".replace" in this case as well, but KeyedObjectHashMap.replace currently
// has a bug when the key is not present in the map.
added = requests.putIfAbsent(key, newAcquiredRequest.request) == null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this fails, we can avoid get.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You found this more complicated. I still prefer to avoid an extra lookup. Maybe moot anyway, since we can't seem to trust replace right now.

        Request existingRequest = requests.get(key);
        while (true) {
            if (existingRequest != null) {
                final Request.AcquiredRequest acquired = existingRequest.tryAcquire();
                if (acquired != null) {
                    return acquired;
                }
            }
            if (newAcquiredRequest == null) {
                newAcquiredRequest = Request.createAndAcquire(fragmentIndex, context);
            }
            final boolean added;
            if (existingRequest == null) {
                added = (existingRequest = requests.putIfAbsent(key, newAcquiredRequest.request)) == null;
            } else {
                if (!(added = requests.replace(key, existingRequest, newAcquiredRequest.request))) {
                    existingRequest = requests.get(key);
                }
            }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reverted back to remove + putIfAbsent pattern and added a TODO for this with issue #5486.

} else {
added = requests.replace(key, existingRequest, newAcquiredRequest.request);
malhotrashivam marked this conversation as resolved.
Show resolved Hide resolved
}
if (added) {
if (log.isDebugEnabled()) {
log.debug().append("Added new request to cache: ").append(String.format("ctx=%d ",
System.identityHashCode(context))).append(newAcquiredRequest.request.requestStr()).endl();
}
return newAcquiredRequest;
}
}
// We have tried to add the request to the cache too many times
throw new IllegalStateException(
String.format("Failed to add request to cache: ctx=%d, uri=%s, fragmentIndex=%d",
System.identityHashCode(context), uri, fragmentIndex));
}

/**
Expand Down
Loading