
Verify the file presence for cached directory lister and retry #20414

Merged · 2 commits · Feb 22, 2024
```diff
@@ -513,24 +513,45 @@ private List<TrinoFileStatus> listBucketFiles(TrinoFileSystem fs, Location location

     @VisibleForTesting
     Iterator<InternalHiveSplit> buildManifestFileIterator(InternalHiveSplitFactory splitFactory, Location location, List<Location> paths, boolean splittable)
     {
+        return createInternalHiveSplitIterator(splitFactory, splittable, Optional.empty(), verifiedFileStatusesStream(location, paths));
+    }
+
+    private Stream<TrinoFileStatus> verifiedFileStatusesStream(Location location, List<Location> paths)
+    {
         TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
+        // Check if location is cached BEFORE using the directoryLister
+        boolean isCached = directoryLister.isCached(location);

         Map<String, TrinoFileStatus> fileStatuses = new HashMap<>();
         Iterator<TrinoFileStatus> fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, RECURSE);
         if (!fileStatusIterator.hasNext()) {
             checkPartitionLocationExists(trinoFileSystem, location);
         }
         fileStatusIterator.forEachRemaining(status -> fileStatuses.put(Location.of(status.getPath()).path(), status));
-        Stream<TrinoFileStatus> fileStream = paths.stream()
```
```diff
+
+        // If file statuses came from cache verify that all are present
+        if (isCached) {
```

> **Contributor:** It doesn't feel right to me to ask each time whether the location is cached. You are adding corner-case handling to the happy path this way.
>
> Maybe it would be better to add a procedure to clear the directory listing cache for a specified location.

> **Member (author):** The code under this `if` verifies that all the listed files are present in the directory listing. We know the discrepancy could be caused by a stale cache, and in that case there is a way to handle it (invalidate the cache and retry). There is no point in doing that when the location is not cached: invalidation is a no-op, and a retry would return the same results.
>
> I did add an `invalidate(Location)` call to the directory lister, so the conditional code works in any case. The `isCached` check is just a performance optimization: it avoids verifying and retrying when they would not change anything anyway.

```diff
+            boolean missing = paths.stream()
```

> **Member:** Rather than fully reloading the whole cache, it'd be nice if we could just check any missing paths directly.

> **Member (author):** Yes, that would be great. Unfortunately, the directory lister can't list files individually; it lists by folder and caches the results the same way. We invalidate the cache for the parent folder (not the whole cache!), which causes its contents to be reloaded.
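To make the folder-granularity point concrete, here is a minimal self-contained sketch — illustrative only, not the PR's code — of a listing cache keyed by directory path, in the spirit of the Guava `Cache` that `CachingDirectoryLister` uses. The class and method names here are invented:

```java
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

import java.util.List;

// Sketch of directory-granular caching: listings are cached per directory
// key, so a stale entry can be dropped for one folder without touching
// entries for any other folder.
class DirectoryListingCache
{
    // Key is the directory path; value is the cached file listing for it.
    private final Cache<String, List<String>> cache = CacheBuilder.newBuilder()
            .maximumSize(10_000)
            .build();

    void put(String directory, List<String> files)
    {
        cache.put(directory, files);
    }

    // Drops only this one directory's listing; the next lookup for this key
    // reloads from the file system, while all other keys stay cached.
    void invalidateDirectory(String directory)
    {
        cache.invalidate(directory);
    }
}
```

Invalidating one key and re-listing that folder is essentially the retry the PR performs, only expressed against the real `DirectoryLister` chain.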

```diff
+                    .anyMatch(path -> !fileStatuses.containsKey(path.path()));
+            // Invalidate the cache and reload
+            if (missing) {
+                directoryLister.invalidate(location);
```

> **Member (@alexjo2144, Feb 6, 2024):** I think the general approach here is fine; I would suggest changing the interfaces in a slightly different way, though.
>
> Exposing `isCached` via `TableInvalidationCallback` seems fine to me.
>
> What I'd do differently: rather than exposing `invalidate(Location)`, can we try adding an additional parameter to the `HiveFileIterator` constructor, something like `boolean invalidateCaches`, that gets passed through `DirectoryLister#listFilesRecursively` to force a hard load when set to true?
>
> My reason for that is that you're assuming here that the cache key is a `Location`, but that's an internal detail of the caching `DirectoryLister`s that's not guaranteed to be stable.

> **Member (author):** Hi @alexjo2144.
>
> I looked into your proposal to remove `invalidate(Location)` from the interface. That doesn't look right to me. Directory listers are chained in a delegate model; some of them cache (keyed by `Location`), and some do not (they have a no-op `invalidate`). If we remove `invalidate` from the interface, we won't be able to push the invalidation down the chain.
>
> We only call `invalidate(location)` if `isCached(location)` is true, which in a way verifies that the particular directory lister supports caching by location.
>
> What do you think?
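For comparison, a rough sketch of the reviewer's alternative: a force-refresh flag threaded through the listing call instead of a `Location`-keyed `invalidate`. Everything below is hypothetical — `DirectoryListerSketch`, `forceRefresh`, and the rest are invented names, not Trino APIs:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical shape of the suggestion: the caller asks for a hard load and
// never needs to know what the cache is keyed by.
interface DirectoryListerSketch
{
    // When forceRefresh is true, a caching implementation bypasses and
    // refreshes its cached entry instead of serving it.
    Iterable<String> listFilesRecursively(String location, boolean forceRefresh);
}

class CachingListerSketch implements DirectoryListerSketch
{
    private final Map<String, Iterable<String>> cache = new ConcurrentHashMap<>();
    private final DirectoryListerSketch delegate;

    CachingListerSketch(DirectoryListerSketch delegate)
    {
        this.delegate = delegate;
    }

    @Override
    public Iterable<String> listFilesRecursively(String location, boolean forceRefresh)
    {
        if (forceRefresh) {
            // Hard load: replace whatever is cached with a fresh listing.
            Iterable<String> fresh = delegate.listFilesRecursively(location, true);
            cache.put(location, fresh);
            return fresh;
        }
        return cache.computeIfAbsent(location, path -> delegate.listFilesRecursively(path, false));
    }
}
```

As the author notes above, the merged design instead pushes `invalidate(Location)` down the delegate chain, guarded by `isCached(Location)`.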


```diff
+                fileStatuses.clear();
+                fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, RECURSE);
+                fileStatusIterator.forEachRemaining(status -> fileStatuses.put(Location.of(status.getPath()).path(), status));
+            }
+        }
+
+        return paths.stream()
                 .map(path -> {
                     TrinoFileStatus status = fileStatuses.get(path.path());
                     if (status == null) {
                         throw new TrinoException(HIVE_FILE_NOT_FOUND, "Manifest file from the location [%s] contains non-existent path: %s".formatted(location, path));
                     }
                     return status;
                 });
-        return createInternalHiveSplitIterator(splitFactory, splittable, Optional.empty(), fileStream);
     }

     private ListenableFuture<Void> getTransactionalSplits(Location path, boolean splittable, Optional<BucketConversion> bucketConversion, InternalHiveSplitFactory splitFactory)
```
```diff
@@ -13,12 +13,20 @@
  */
 package io.trino.plugin.hive;

+import io.trino.filesystem.Location;
 import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Table;

 public interface TableInvalidationCallback
 {
-    void invalidate(Partition partition);
+    default boolean isCached(Location location)
+    {
+        return false;
+    }
+
+    default void invalidate(Location location) {}

-    void invalidate(Table table);
+    default void invalidate(Partition partition) {}
+
+    default void invalidate(Table table) {}
 }
```
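The interface now gives every method a safe default, so implementations override only what they actually cache. A sketch of two hypothetical implementations (class names invented; assumes the Trino types above are on the classpath):

```java
import io.trino.filesystem.Location;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// A lister that caches nothing needs no overrides at all: isCached()
// reports false and every invalidate() is a no-op by default.
class NonCachingCallback implements TableInvalidationCallback {}

// A Location-keyed cache overrides just the two methods it can answer.
class LocationCachingCallback implements TableInvalidationCallback
{
    private final Set<Location> cached = ConcurrentHashMap.newKeySet();

    @Override
    public boolean isCached(Location location)
    {
        return cached.contains(location);
    }

    @Override
    public void invalidate(Location location)
    {
        cached.remove(location);
    }
}
```

This is also why the empty `invalidate(Partition)` and `invalidate(Table)` overrides disappear from `FileSystemDirectoryLister` at the end of this diff: the defaults now cover them.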
```diff
@@ -13,7 +13,6 @@
  */
 package io.trino.plugin.hive.fs;

-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.Weigher;
 import com.google.common.collect.ImmutableList;
@@ -117,6 +116,12 @@ private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs
         return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
     }

+    @Override
+    public void invalidate(Location location)
+    {
+        cache.invalidate(location);
+    }
+
     @Override
     public void invalidate(Table table)
     {
@@ -205,8 +210,8 @@ public long getRequestCount()
         return cache.stats().requestCount();
     }

-    @VisibleForTesting
-    boolean isCached(Location location)
+    @Override
+    public boolean isCached(Location location)
     {
         ValueHolder cached = cache.getIfPresent(location);
         return cached != null && cached.getFiles().isPresent();
```
```diff
@@ -97,6 +97,13 @@ private RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSystem fs
         return delegate.listFilesRecursively(fs, table, cacheKey.getPath());
     }

+    @Override
+    public void invalidate(Location location)
+    {
+        cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, location));
+        delegate.invalidate(location);
+    }
+
     @Override
     public void invalidate(Table table)
     {
@@ -157,10 +164,10 @@ public TrinoFileStatus next()
         };
     }

-    @VisibleForTesting
-    boolean isCached(Location location)
+    @Override
+    public boolean isCached(Location location)
     {
-        return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
+        return isCached(new TransactionDirectoryListingCacheKey(transactionId, location)) || delegate.isCached(location);
     }

     @VisibleForTesting
```
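To make the delegate-chain behavior discussed earlier concrete, here is a minimal self-contained sketch of the pattern — invented names, plain `String` keys instead of `Location`. Each caching layer answers `isCached` for itself or anyone below it, and pushes `invalidate` down the chain, mirroring the `cache.invalidate(...)` followed by `delegate.invalidate(location)` in the hunk above:

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of the chained-delegate pattern: each caching layer drops its own
// entry, then forwards the invalidation so every layer below stays
// consistent. A non-caching tail lister inherits the default no-ops.
interface Lister
{
    default boolean isCached(String location)
    {
        return false;
    }

    default void invalidate(String location) {}
}

class TransactionScopedLister implements Lister
{
    private final Set<String> cachedInTransaction = new HashSet<>();
    private final Lister delegate;

    TransactionScopedLister(Lister delegate)
    {
        this.delegate = delegate;
    }

    @Override
    public boolean isCached(String location)
    {
        // Cached here, or anywhere further down the chain.
        return cachedInTransaction.contains(location) || delegate.isCached(location);
    }

    @Override
    public void invalidate(String location)
    {
        cachedInTransaction.remove(location); // drop the local entry
        delegate.invalidate(location);        // push down the chain
    }
}
```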
```diff
@@ -856,6 +856,60 @@ public void testBuildManifestFileIteratorNestedDirectory()
         assertThat(splits.get(1).getPath()).isEqualTo(directoryPath.toString());
     }

+    @Test
+    public void testBuildManifestFileIteratorWithCacheInvalidation()
+            throws IOException
+    {
+        CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(5, TimeUnit.MINUTES), DataSize.of(1, MEGABYTE), List.of("*"));
+        Map<String, String> schema = ImmutableMap.<String, String>builder()
+                .put(FILE_INPUT_FORMAT, SYMLINK_TEXT_INPUT_FORMAT_CLASS)
+                .put(SERIALIZATION_LIB, AVRO.getSerde())
+                .buildOrThrow();
+
+        InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(
+                "partition",
+                AVRO,
+                schema,
+                List.of(),
+                TupleDomain.all(),
+                () -> true,
+                ImmutableMap.of(),
+                Optional.empty(),
+                Optional.empty(),
+                DataSize.of(512, MEGABYTE),
+                false,
+                Optional.empty());
+
+        Location firstFilePath = Location.of("memory:///db_name/table_name/file1");
+        List<Location> locations1 = List.of(firstFilePath);
+        BackgroundHiveSplitLoader backgroundHiveSplitLoader1 = backgroundHiveSplitLoader(
+                locations1,
+                directoryLister);
+        Iterator<InternalHiveSplit> splitIterator1 = backgroundHiveSplitLoader1.buildManifestFileIterator(
+                splitFactory,
+                Location.of(TABLE_PATH),
+                locations1,
+                true);
+        List<InternalHiveSplit> splits1 = ImmutableList.copyOf(splitIterator1);
+        assertThat(splits1.size()).isEqualTo(1);
+        assertThat(splits1.get(0).getPath()).isEqualTo(firstFilePath.toString());
+
+        Location secondFilePath = Location.of("memory:///db_name/table_name/file2");
+        List<Location> locations2 = List.of(firstFilePath, secondFilePath);
+        BackgroundHiveSplitLoader backgroundHiveSplitLoader2 = backgroundHiveSplitLoader(
+                locations2,
+                directoryLister);
+        Iterator<InternalHiveSplit> splitIterator2 = backgroundHiveSplitLoader2.buildManifestFileIterator(
+                splitFactory,
+                Location.of(TABLE_PATH),
+                locations2,
+                true);
+        List<InternalHiveSplit> splits2 = ImmutableList.copyOf(splitIterator2);
+        assertThat(splits2.size()).isEqualTo(2);
+        assertThat(splits2.get(0).getPath()).isEqualTo(firstFilePath.toString());
+        assertThat(splits2.get(1).getPath()).isEqualTo(secondFilePath.toString());
+    }
+
     @Test
     public void testMaxPartitions()
             throws Exception
```
```diff
@@ -15,7 +15,6 @@

 import io.trino.filesystem.Location;
 import io.trino.filesystem.TrinoFileSystem;
-import io.trino.plugin.hive.metastore.Partition;
 import io.trino.plugin.hive.metastore.Table;

 import java.io.IOException;
@@ -29,14 +28,4 @@ public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs,
     {
         return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
     }
-
-    @Override
-    public void invalidate(Partition partition)
-    {
-    }
-
-    @Override
-    public void invalidate(Table table)
-    {
-    }
 }
```