Skip to content

Commit

Permalink
Handle new files in symlink tables with listing caching
Browse files Browse the repository at this point in the history
The cached directory listings may not contain files that were added to
the symlink manifest files after the directory listing was cached.
  • Loading branch information
i-93 authored Feb 22, 2024
1 parent 9f462f0 commit 0324da7
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -513,24 +513,45 @@ private List<TrinoFileStatus> listBucketFiles(TrinoFileSystem fs, Location locat

@VisibleForTesting
Iterator<InternalHiveSplit> buildManifestFileIterator(InternalHiveSplitFactory splitFactory, Location location, List<Location> paths, boolean splittable)
{
    // Resolve every manifest path against the (possibly cached) directory listing,
    // then hand the verified statuses to the split iterator.
    Stream<TrinoFileStatus> verifiedStatuses = verifiedFileStatusesStream(location, paths);
    return createInternalHiveSplitIterator(splitFactory, splittable, Optional.empty(), verifiedStatuses);
}

private Stream<TrinoFileStatus> verifiedFileStatusesStream(Location location, List<Location> paths)
{
TrinoFileSystem trinoFileSystem = fileSystemFactory.create(session);
// Check if location is cached BEFORE using the directoryLister
boolean isCached = directoryLister.isCached(location);

Map<String, TrinoFileStatus> fileStatuses = new HashMap<>();
Iterator<TrinoFileStatus> fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, RECURSE);
if (!fileStatusIterator.hasNext()) {
checkPartitionLocationExists(trinoFileSystem, location);
}
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(Location.of(status.getPath()).path(), status));
Stream<TrinoFileStatus> fileStream = paths.stream()

// If file statuses came from cache verify that all are present
if (isCached) {
boolean missing = paths.stream()
.anyMatch(path -> !fileStatuses.containsKey(path.path()));
// Invalidate the cache and reload
if (missing) {
directoryLister.invalidate(location);

fileStatuses.clear();
fileStatusIterator = new HiveFileIterator(table, location, trinoFileSystem, directoryLister, RECURSE);
fileStatusIterator.forEachRemaining(status -> fileStatuses.put(Location.of(status.getPath()).path(), status));
}
}

return paths.stream()
.map(path -> {
TrinoFileStatus status = fileStatuses.get(path.path());
if (status == null) {
throw new TrinoException(HIVE_FILE_NOT_FOUND, "Manifest file from the location [%s] contains non-existent path: %s".formatted(location, path));
}
return status;
});
return createInternalHiveSplitIterator(splitFactory, splittable, Optional.empty(), fileStream);
}

private ListenableFuture<Void> getTransactionalSplits(Location path, boolean splittable, Optional<BucketConversion> bucketConversion, InternalHiveSplitFactory splitFactory)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,20 @@
*/
package io.trino.plugin.hive;

import io.trino.filesystem.Location;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;

/**
 * Callback for invalidating cached file-system state associated with metastore objects.
 * All methods default to no-ops so that listers without caching need not override them.
 */
public interface TableInvalidationCallback
{
    /**
     * Returns whether a directory listing for the given location is currently cached.
     * Default implementation reports nothing cached.
     */
    default boolean isCached(Location location)
    {
        return false;
    }

    // Drop any cached listing for the given location
    default void invalidate(Location location) {}

    // Drop any cached state for the given partition
    default void invalidate(Partition partition) {}

    // Drop any cached state for the given table
    default void invalidate(Table table) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package io.trino.plugin.hive.fs;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.Weigher;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -117,6 +116,12 @@ private static RemoteIterator<TrinoFileStatus> createListingRemoteIterator(Trino
return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
}

@Override
public void invalidate(Location location)
{
    // Drop the cached directory listing for this location so the next
    // access re-lists it from the file system
    cache.invalidate(location);
}

@Override
public void invalidate(Table table)
{
Expand Down Expand Up @@ -205,8 +210,8 @@ public long getRequestCount()
return cache.stats().requestCount();
}

@VisibleForTesting
boolean isCached(Location location)
@Override
public boolean isCached(Location location)
{
ValueHolder cached = cache.getIfPresent(location);
return cached != null && cached.getFiles().isPresent();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,13 @@ private RemoteIterator<TrinoFileStatus> createListingRemoteIterator(TrinoFileSys
return delegate.listFilesRecursively(fs, table, cacheKey.getPath());
}

@Override
public void invalidate(Location location)
{
    // Invalidate this transaction's own cache entry for the location,
    // then propagate to the shared (cross-transaction) delegate cache
    cache.invalidate(new TransactionDirectoryListingCacheKey(transactionId, location));
    delegate.invalidate(location);
}

@Override
public void invalidate(Table table)
{
Expand Down Expand Up @@ -157,10 +164,10 @@ public TrinoFileStatus next()
};
}

@VisibleForTesting
boolean isCached(Location location)
@Override
public boolean isCached(Location location)
{
return isCached(new TransactionDirectoryListingCacheKey(transactionId, location));
return isCached(new TransactionDirectoryListingCacheKey(transactionId, location)) || delegate.isCached(location);
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -856,6 +856,60 @@ public void testBuildManifestFileIteratorNestedDirectory()
assertThat(splits.get(1).getPath()).isEqualTo(directoryPath.toString());
}

@Test
public void testBuildManifestFileIteratorWithCacheInvalidation()
        throws IOException
{
    // Long TTL and ample size so the first listing stays cached for the second call
    CachingDirectoryLister directoryLister = new CachingDirectoryLister(new Duration(5, TimeUnit.MINUTES), DataSize.of(1, MEGABYTE), List.of("*"));
    Map<String, String> schema = ImmutableMap.<String, String>builder()
            .put(FILE_INPUT_FORMAT, SYMLINK_TEXT_INPUT_FORMAT_CLASS)
            .put(SERIALIZATION_LIB, AVRO.getSerde())
            .buildOrThrow();

    InternalHiveSplitFactory splitFactory = new InternalHiveSplitFactory(
            "partition",
            AVRO,
            schema,
            List.of(),
            TupleDomain.all(),
            () -> true,
            ImmutableMap.of(),
            Optional.empty(),
            Optional.empty(),
            DataSize.of(512, MEGABYTE),
            false,
            Optional.empty());

    // First pass: one manifest file; this populates the directory-listing cache
    Location firstFilePath = Location.of("memory:///db_name/table_name/file1");
    List<Location> locations1 = List.of(firstFilePath);
    BackgroundHiveSplitLoader backgroundHiveSplitLoader1 = backgroundHiveSplitLoader(
            locations1,
            directoryLister);
    Iterator<InternalHiveSplit> splitIterator1 = backgroundHiveSplitLoader1.buildManifestFileIterator(
            splitFactory,
            Location.of(TABLE_PATH),
            locations1,
            true);
    List<InternalHiveSplit> splits1 = ImmutableList.copyOf(splitIterator1);
    assertThat(splits1.size()).isEqualTo(1);
    assertThat(splits1.get(0).getPath()).isEqualTo(firstFilePath.toString());

    // Second pass: a new file appears in the manifest that the cached listing
    // does not contain; the loader must detect the miss, invalidate the cache,
    // re-list, and return splits for both files
    Location secondFilePath = Location.of("memory:///db_name/table_name/file2");
    List<Location> locations2 = List.of(firstFilePath, secondFilePath);
    BackgroundHiveSplitLoader backgroundHiveSplitLoader2 = backgroundHiveSplitLoader(
            locations2,
            directoryLister);
    Iterator<InternalHiveSplit> splitIterator2 = backgroundHiveSplitLoader2.buildManifestFileIterator(
            splitFactory,
            Location.of(TABLE_PATH),
            locations2,
            true);
    List<InternalHiveSplit> splits2 = ImmutableList.copyOf(splitIterator2);
    assertThat(splits2.size()).isEqualTo(2);
    assertThat(splits2.get(0).getPath()).isEqualTo(firstFilePath.toString());
    assertThat(splits2.get(1).getPath()).isEqualTo(secondFilePath.toString());
}

@Test
public void testMaxPartitions()
throws Exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import io.trino.plugin.hive.metastore.Partition;
import io.trino.plugin.hive.metastore.Table;

import java.io.IOException;
Expand All @@ -29,14 +28,4 @@ public RemoteIterator<TrinoFileStatus> listFilesRecursively(TrinoFileSystem fs,
{
return new TrinoFileStatusRemoteIterator(fs.listFiles(location));
}

// No-op: this lister holds no cached state per partition
@Override
public void invalidate(Partition partition) {}

// No-op: this lister holds no cached state per table
@Override
public void invalidate(Table table) {}
}

0 comments on commit 0324da7

Please sign in to comment.