Skip to content

Commit

Permalink
Move doPriv call up to handle cache evictions on scripted query
Browse files Browse the repository at this point in the history
Signed-off-by: Finn Carroll <carrofin@amazon.com>
  • Loading branch information
finnegancarroll committed Jun 17, 2024
1 parent 21d3aaa commit d7a782c
Showing 1 changed file with 34 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,20 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio

final Path key = blobFetchRequest.getFilePath();

final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequest);
} else {
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
// We need to do a privileged action here in order to fetch from remote
// and write/evict from local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
final CachedIndexInput cacheEntry = AccessController.doPrivileged((PrivilegedAction<CachedIndexInput>) () -> {
return fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, blobContainer, blobFetchRequest);
} else {
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});
});

// Cache entry was either retrieved from the cache or newly added, either
Expand All @@ -78,36 +84,30 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio

@SuppressWarnings("removal")
private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobContainer blobContainer, BlobFetchRequest request) {
// We need to do a privileged action here in order to fetch from remote
// and write to the local file cache in case this is invoked as a side
// effect of a plugin (such as a scripted search) that doesn't have the
// necessary permissions.
return AccessController.doPrivileged((PrivilegedAction<FileCachedIndexInput>) () -> {
try {
if (Files.exists(request.getFilePath()) == false) {
try (
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
try {
if (Files.exists(request.getFilePath()) == false) {
try (
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
) {
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
try (
InputStream snapshotFileInputStream = blobContainer.readBlob(
blobPart.getBlobName(),
blobPart.getPosition(),
blobPart.getLength()
);
) {
snapshotFileInputStream.transferTo(localFileOutputStream);
}
}
}
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
});
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
return new FileCachedIndexInput(fileCache, request.getFilePath(), luceneIndexInput);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/**
Expand Down

0 comments on commit d7a782c

Please sign in to comment.