Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BREAKING] Fix for OOM error when reading from S3 #5613

Merged
merged 11 commits
Jun 21, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,13 @@
import io.deephaven.util.channel.SeekableChannelContext;
import io.deephaven.util.channel.BaseSeekableChannelContext;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.S3Uri;
import software.amazon.awssdk.services.s3.model.HeadObjectRequest;
import software.amazon.awssdk.services.s3.model.HeadObjectResponse;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
Expand All @@ -32,7 +30,7 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
static final long UNINITIALIZED_SIZE = -1;
private static final long UNINITIALIZED_NUM_FRAGMENTS = -1;

final S3SeekableChannelProvider provider;
private final S3SeekableChannelProvider provider;
final S3AsyncClient client;
final S3Instructions instructions;

Expand All @@ -48,12 +46,6 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
*/
final S3RequestCache sharedCache;

/**
* Used to cache recently fetched fragments as well as the ownership token for the request. This cache is local to
* the context and is used to keep the requests alive as long as the context is alive.
*/
private final S3Request.AcquiredRequest[] localCache;

/**
* The size of the object in bytes, stored in context to avoid fetching multiple times
*/
Expand All @@ -72,7 +64,6 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
this.provider = Objects.requireNonNull(provider);
this.client = Objects.requireNonNull(client);
this.instructions = Objects.requireNonNull(instructions);
this.localCache = new S3Request.AcquiredRequest[instructions.maxCacheSize()];
this.sharedCache = sharedCache;
if (sharedCache.getFragmentSize() != instructions.fragmentSize()) {
throw new IllegalArgumentException("Fragment size mismatch between shared cache and instructions, "
Expand Down Expand Up @@ -121,32 +112,31 @@ int fill(final long position, final ByteBuffer dest) throws IOException {
final int impliedReadAhead = (int) (lastFragmentIx - firstFragmentIx);
final int desiredReadAhead = instructions.readAheadCount();
final long totalRemainingFragments = numFragments - firstFragmentIx - 1;
final int maxReadAhead = instructions.maxCacheSize() - 1;
readAhead = Math.min(
Math.max(impliedReadAhead, desiredReadAhead),
(int) Math.min(maxReadAhead, totalRemainingFragments));
readAhead = Math.min(Math.max(impliedReadAhead, desiredReadAhead), totalRemainingFragments);
}
final S3Request firstRequest = getOrCreateRequest(firstFragmentIx);
for (int i = 0; i < readAhead; ++i) {
getOrCreateRequest(firstFragmentIx + i + 1);
int filled;
{
// Hold a reference to the first request to ensure it is not evicted from the cache
final S3Request.Acquired acquiredRequest = getOrCreateRequest(firstFragmentIx);
for (int i = 0; i < readAhead; ++i) {
// Do not hold references to the read-ahead requests
getOrCreateRequest(firstFragmentIx + i + 1);
}
// blocking
filled = acquiredRequest.fill(position, dest);
}
// blocking
int filled = firstRequest.fill(position, dest);
for (int i = 0; dest.hasRemaining(); ++i) {
// Since we have already created requests for read ahead fragments, we can retrieve them from the local
// cache
final S3Request request = getRequestFromLocalCache(firstFragmentIx + i + 1);
if (request == null || !request.isDone()) {
final S3Request.Acquired readAheadRequest = sharedCache.getRequest(uri, firstFragmentIx + i + 1);
if (readAheadRequest == null || !readAheadRequest.isDone()) {
break;
}
// non-blocking since we know isDone
filled += request.fill(position + filled, dest);
filled += readAheadRequest.fill(position + filled, dest);
}
return filled;
}

private void reset() {
releaseOutstanding();
// Reset the internal state
uri = null;
size = UNINITIALIZED_SIZE;
Expand All @@ -162,49 +152,16 @@ public void close() {
if (log.isDebugEnabled()) {
log.debug().append("Closing context: ").append(ctxStr()).endl();
}
releaseOutstanding();
}

/**
 * Release all outstanding requests associated with this context by clearing the local cache. The requests
 * themselves are not canceled here; they are eventually canceled when the request objects are garbage collected.
 */
private void releaseOutstanding() {
// Dropping the cached ownership tokens is sufficient; cancellation happens lazily via GC.
Arrays.fill(localCache, null);
}

// --------------------------------------------------------------------------------------------------

// Convenience overload: looks up the fragment in the context-local cache, deriving the cache slot
// from the fragment index. Returns null when the slot is empty or holds a different fragment.
@Nullable
private S3Request getRequestFromLocalCache(final long fragmentIndex) {
return getRequestFromLocalCache(fragmentIndex, cacheIndex(fragmentIndex));
}

/**
 * Look up the request for {@code fragmentIndex} in the context-local cache at slot {@code cacheIdx}.
 *
 * @param fragmentIndex the fragment being sought
 * @param cacheIdx the local-cache slot to inspect
 * @return the cached request, or {@code null} if the slot is empty or holds a different fragment
 */
@Nullable
private S3Request getRequestFromLocalCache(final long fragmentIndex, final int cacheIdx) {
    final S3Request.AcquiredRequest slot = localCache[cacheIdx];
    if (slot == null || !slot.request.isFragment(fragmentIndex)) {
        // Slot is vacant, or has been reused for another fragment that maps to the same index.
        return null;
    }
    return slot.request;
}

@NotNull
private S3Request getOrCreateRequest(final long fragmentIndex) {
final int cacheIdx = cacheIndex(fragmentIndex);
final S3Request locallyCached = getRequestFromLocalCache(fragmentIndex, cacheIdx);
if (locallyCached != null) {
return locallyCached;
}
final S3Request.AcquiredRequest sharedCacheRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this);
// Cache the request and the ownership token locally
localCache[cacheIdx] = sharedCacheRequest;
private S3Request.Acquired getOrCreateRequest(final long fragmentIndex) {
final S3Request.Acquired cachedRequest = sharedCache.getOrCreateRequest(uri, fragmentIndex, this);
// Send the request, if not sent already. The following method is idempotent, so we always call it.
sharedCacheRequest.request.sendRequest();
return sharedCacheRequest.request;
}

private int cacheIndex(final long fragmentIndex) {
return (int) (fragmentIndex % instructions.maxCacheSize());
cachedRequest.send();
return cachedRequest;
}

private long fragmentIndex(final long pos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ public abstract class S3Instructions implements LogOutputAppendable {
private final static int DEFAULT_READ_AHEAD_COUNT = 32;
private final static int DEFAULT_FRAGMENT_SIZE = 1 << 16; // 64 KiB
private final static int MIN_FRAGMENT_SIZE = 8 << 10; // 8 KiB
private final static int DEFAULT_MAX_CACHE_SIZE = 256;
private final static Duration DEFAULT_CONNECTION_TIMEOUT = Duration.ofSeconds(2);
private final static Duration DEFAULT_READ_TIMEOUT = Duration.ofSeconds(2);

Expand Down Expand Up @@ -73,17 +72,6 @@ public int fragmentSize() {
return DEFAULT_FRAGMENT_SIZE;
}

/**
 * The maximum number of fragments to cache in memory at the deephaven layer, giving faster access to
 * recently read fragments. Defaults to {@code Math.max(1 + readAheadCount(), DEFAULT_MAX_CACHE_SIZE)},
 * so it is at least {@value #DEFAULT_MAX_CACHE_SIZE}. Must be greater than or equal to
 * {@code 1 + readAheadCount()}.
 */
@Default
public int maxCacheSize() {
    // The cache must hold the current fragment plus every read-ahead fragment.
    final int requiredFloor = 1 + readAheadCount();
    return Math.max(requiredFloor, DEFAULT_MAX_CACHE_SIZE);
}

/**
* The amount of time to wait when initially establishing a connection before giving up and timing out, defaults to
* 2 seconds.
Expand Down Expand Up @@ -133,8 +121,6 @@ public interface Builder {

Builder fragmentSize(int fragmentSize);

Builder maxCacheSize(int maxCacheSize);

Builder connectionTimeout(Duration connectionTimeout);

Builder readTimeout(Duration connectionTimeout);
Expand All @@ -152,13 +138,10 @@ default Builder endpointOverride(String endpointOverride) {

abstract S3Instructions withReadAheadCount(int readAheadCount);

abstract S3Instructions withMaxCacheSize(int maxCacheSize);

@Lazy
S3Instructions singleUse() {
final int readAheadCount = Math.min(DEFAULT_READ_AHEAD_COUNT, readAheadCount());
return withReadAheadCount(readAheadCount)
.withMaxCacheSize(readAheadCount + 1);
return withReadAheadCount(readAheadCount);
}

@Check
Expand All @@ -183,14 +166,6 @@ final void boundsCheckMinFragmentSize() {
}
}

// Validation: the cache must be large enough for the current fragment plus all read-ahead fragments.
@Check
final void boundsCheckMaxCacheSize() {
    final int minimumSize = readAheadCount() + 1;
    if (maxCacheSize() < minimumSize) {
        throw new IllegalArgumentException("maxCacheSize(=" + maxCacheSize() + ") must be >= 1 + " +
                "readAheadCount(=" + readAheadCount() + ")");
    }
}

@Check
final void awsSdkV2Credentials() {
if (!(credentials() instanceof AwsSdkV2Credentials)) {
Expand Down
Loading
Loading