Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache size of files fetched from S3 #5545

Merged
merged 1 commit into from
May 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
static final long UNINITIALIZED_SIZE = -1;
private static final long UNINITIALIZED_NUM_FRAGMENTS = -1;

final S3SeekableChannelProvider provider;
final S3AsyncClient client;
final S3Instructions instructions;

Expand Down Expand Up @@ -63,8 +64,12 @@ final class S3ChannelContext extends BaseSeekableChannelContext implements Seeka
*/
private long numFragments;

S3ChannelContext(@NotNull final S3AsyncClient client, @NotNull final S3Instructions instructions,
S3ChannelContext(
@NotNull final S3SeekableChannelProvider provider,
@NotNull final S3AsyncClient client,
@NotNull final S3Instructions instructions,
@NotNull final S3RequestCache sharedCache) {
this.provider = Objects.requireNonNull(provider);
this.client = Objects.requireNonNull(client);
this.instructions = Objects.requireNonNull(instructions);
this.localCache = new S3Request.AcquiredRequest[instructions.maxCacheSize()];
Expand All @@ -88,7 +93,7 @@ void setURI(@NotNull final S3Uri uri) {
this.uri = uri;
}

void verifyOrSetSize(long size) {
void verifyOrSetSize(final long size) {
if (this.size == UNINITIALIZED_SIZE) {
setSize(size);
} else if (this.size != size) {
Expand Down Expand Up @@ -255,10 +260,12 @@ private void ensureSize() throws IOException {
} catch (final InterruptedException | ExecutionException | TimeoutException | CancellationException e) {
throw handleS3Exception(e, String.format("fetching HEAD for file %s, %s", uri, ctxStr()), instructions);
}
setSize(headObjectResponse.contentLength());
final long fileSize = headObjectResponse.contentLength();
setSize(fileSize);
provider.updateFileSizeCache(uri.uri(), fileSize);
}

private void setSize(long size) {
private void setSize(final long size) {
this.size = size;
// ceil(size / fragmentSize)
this.numFragments = (size + instructions.fragmentSize() - 1) / instructions.fragmentSize();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,12 @@ public InputStream getInputStream(final SeekableByteChannel channel) {

@Override
public SeekableChannelContext makeContext() {
return new S3ChannelContext(s3AsyncClient, s3Instructions, sharedCache);
return new S3ChannelContext(this, s3AsyncClient, s3Instructions, sharedCache);
}

@Override
public SeekableChannelContext makeSingleUseContext() {
return new S3ChannelContext(s3AsyncClient, s3Instructions.singleUse(), sharedCache);
return new S3ChannelContext(this, s3AsyncClient, s3Instructions.singleUse(), sharedCache);
}

@Override
Expand Down Expand Up @@ -208,7 +208,7 @@ private void fetchNextBatch() throws IOException {
+ s3Object.key() + " and bucket " + bucketName + " inside directory "
+ directory, e);
}
updateFileSizeCache(getFileSizeCache(), uri, s3Object.size());
updateFileSizeCache(uri, s3Object.size());
return uri;
}).iterator();
// The following token is null when the last batch is fetched.
Expand All @@ -235,12 +235,10 @@ private Map<URI, FileSizeInfo> getFileSizeCache() {
}

/**
* Update the given file size cache with the given URI and size.
* Cache the file size for the given URI.
*/
private static void updateFileSizeCache(
@NotNull final Map<URI, FileSizeInfo> fileSizeCache,
@NotNull final URI uri,
final long size) {
void updateFileSizeCache(@NotNull final URI uri, final long size) {
final Map<URI, FileSizeInfo> fileSizeCache = getFileSizeCache();
fileSizeCache.compute(uri, (key, existingInfo) -> {
if (existingInfo == null) {
return new FileSizeInfo(uri, size);
Expand Down
Loading