Skip to content

Commit

Permalink
Allow multiple invocations of KeyValuePartitionLayout.findKeys(...), …
Browse files Browse the repository at this point in the history
…by allocating LocationTableBuilders per call (#4313)
  • Loading branch information
rcaudy authored and stanbrub committed Aug 11, 2023
1 parent 02906e4 commit cb57bf1
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

/**
* {@link TableLocationKeyFinder Location finder} that will traverse a directory hierarchy and infer partitions from
Expand Down Expand Up @@ -74,26 +75,29 @@ public interface LocationTableBuilder {

private final File tableRootDirectory;
private final Predicate<Path> pathFilter;
private final LocationTableBuilder locationTableBuilder;
private final Supplier<LocationTableBuilder> locationTableBuilderFactory;
private final BiFunction<Path, Map<String, Comparable<?>>, TLK> keyFactory;
private final int maxPartitioningLevels;

/**
* Create a layout that retains the supplied directory, filter, builder factory, and key factory for later use by
* {@link #findKeys(Consumer)}.
*
* @param tableRootDirectory The directory to traverse from
* @param pathFilter Filter to determine whether a regular file should be used to create a key
* @param locationTableBuilderFactory Factory for {@link LocationTableBuilder builders} used to organize partition
* information; as builders are typically stateful, a new builder is created each time this
* KeyValuePartitionLayout is used to {@link #findKeys(Consumer) find keys}
* @param keyFactory Key factory function
* @param maxPartitioningLevels Maximum partitioning levels to traverse. Must be {@code >= 0}. {@code 0} means only
* look at files in {@code tableRootDirectory} and find no partitions.
*/
public KeyValuePartitionLayout(
@NotNull final File tableRootDirectory,
@NotNull final Predicate<Path> pathFilter,
// NOTE(review): this listing looks like a rendered diff with its +/- markers stripped, so the former
// single-builder parameter and its Supplier-based replacement both appear below — confirm which is current.
@NotNull final LocationTableBuilder locationTableBuilder,
@NotNull final Supplier<LocationTableBuilder> locationTableBuilderFactory,
@NotNull final BiFunction<Path, Map<String, Comparable<?>>, TLK> keyFactory,
final int maxPartitioningLevels) {
this.tableRootDirectory = tableRootDirectory;
this.pathFilter = pathFilter;
// NOTE(review): same diff artifact as in the parameter list — the builder and factory assignments are
// presumably the before/after versions of a single line; verify against the real file.
this.locationTableBuilder = locationTableBuilder;
this.locationTableBuilderFactory = locationTableBuilderFactory;
this.keyFactory = keyFactory;
// The depth is passed through Require.geqZero before being stored; presumably this rejects negative
// values, matching the documented ">= 0" requirement — confirm against Require.
this.maxPartitioningLevels = Require.geqZero(maxPartitioningLevels, "maxPartitioningLevels");
}
Expand All @@ -105,8 +109,7 @@ public String toString() {
@Override
public void findKeys(@NotNull final Consumer<TLK> locationKeyObserver) {
final Deque<Path> targetFiles = new ArrayDeque<>();


final LocationTableBuilder locationTableBuilder = locationTableBuilderFactory.get();
try {
Files.walkFileTree(tableRootDirectory.toPath(), EnumSet.of(FileVisitOption.FOLLOW_LINKS),
maxPartitioningLevels + 1, new SimpleFileVisitor<>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -56,7 +57,7 @@ public void testFlat() throws IOException {
Files.write(file2.toPath(), "Goodbye cruel world!".getBytes());

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, new LocationTableBuilderCsv(dataDirectory),
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 0).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());
Expand All @@ -80,7 +81,7 @@ public void testOneLevel() throws IOException {
Files.write(file2.toPath(), "Goodbye cruel world!".getBytes());

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, new LocationTableBuilderCsv(dataDirectory),
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 1).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());
Expand Down Expand Up @@ -113,7 +114,7 @@ public void testThreeLevels() throws IOException {
Files.write(file3.toPath(), "Oui!".getBytes());

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, new LocationTableBuilderCsv(dataDirectory),
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());
Expand Down Expand Up @@ -156,42 +157,48 @@ public void testTypesAndNameLegalization() throws IOException {
Files.write(file2.toPath(), "Goodbye cruel world!".getBytes());
Files.write(file3.toPath(), "Oui!".getBytes());

final LocationTableBuilder[] locationTableBuilders = new LocationTableBuilder[] {
new LocationTableBuilderCsv(dataDirectory),
new LocationTableBuilderDefinition(TableDefinition.of(
final List<Supplier<LocationTableBuilder>> locationTableBuilderSuppliers = List.of(
() -> new LocationTableBuilderCsv(dataDirectory),
() -> new LocationTableBuilderDefinition(TableDefinition.of(
ColumnDefinition.ofInt("A").withPartitioning(),
ColumnDefinition.ofBoolean("C").withPartitioning(),
ColumnDefinition.ofDouble("B1").withPartitioning()))
};
for (final LocationTableBuilder locationTableBuilder : locationTableBuilders) {

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, locationTableBuilder,
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());

TestCase.assertEquals(3, results.size());

TestCase.assertEquals(file2.getAbsoluteFile(), results.get(0).getFile());
TestCase.assertEquals(file3.getAbsoluteFile(), results.get(1).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results.get(2).getFile());

TestCase.assertEquals(3, results.get(0).getPartitionKeys().size());
TestCase.assertEquals(3, results.get(1).getPartitionKeys().size());
TestCase.assertEquals(3, results.get(2).getPartitionKeys().size());

TestCase.assertEquals(Integer.valueOf(1), results.get(0).getPartitionValue("A"));
TestCase.assertEquals(Integer.valueOf(1), results.get(1).getPartitionValue("A"));
TestCase.assertEquals(Integer.valueOf(2), results.get(2).getPartitionValue("A"));

TestCase.assertEquals(7.0, results.get(0).getPartitionValue("B1"));
TestCase.assertEquals(100.0, results.get(1).getPartitionValue("B1"));
TestCase.assertEquals(3.14, results.get(2).getPartitionValue("B1"));

TestCase.assertEquals(Boolean.FALSE, results.get(0).getPartitionValue("C"));
TestCase.assertEquals(Boolean.FALSE, results.get(1).getPartitionValue("C"));
TestCase.assertEquals(Boolean.TRUE, results.get(2).getPartitionValue("C"));
ColumnDefinition.ofDouble("B1").withPartitioning())));
for (final Supplier<LocationTableBuilder> locationTableBuilderSupplier : locationTableBuilderSuppliers) {
final TableLocationKeyFinder<FileTableLocationKey> finder = new KeyValuePartitionLayout<>(
dataDirectory, path -> true, locationTableBuilderSupplier,
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3);

final RecordingLocationKeyFinder<FileTableLocationKey> recorder1 = new RecordingLocationKeyFinder<>();
finder.findKeys(recorder1);
final RecordingLocationKeyFinder<FileTableLocationKey> recorder2 = new RecordingLocationKeyFinder<>();
finder.findKeys(recorder2);
final List<FileTableLocationKey> results1 =
recorder1.getRecordedKeys().stream().sorted().collect(Collectors.toList());
final List<FileTableLocationKey> results2 =
recorder2.getRecordedKeys().stream().sorted().collect(Collectors.toList());
TestCase.assertEquals(results1, results2);

TestCase.assertEquals(3, results1.size());

TestCase.assertEquals(file2.getAbsoluteFile(), results1.get(0).getFile());
TestCase.assertEquals(file3.getAbsoluteFile(), results1.get(1).getFile());
TestCase.assertEquals(file1.getAbsoluteFile(), results1.get(2).getFile());

TestCase.assertEquals(3, results1.get(0).getPartitionKeys().size());
TestCase.assertEquals(3, results1.get(1).getPartitionKeys().size());
TestCase.assertEquals(3, results1.get(2).getPartitionKeys().size());

TestCase.assertEquals(Integer.valueOf(1), results1.get(0).getPartitionValue("A"));
TestCase.assertEquals(Integer.valueOf(1), results1.get(1).getPartitionValue("A"));
TestCase.assertEquals(Integer.valueOf(2), results1.get(2).getPartitionValue("A"));

TestCase.assertEquals(7.0, results1.get(0).getPartitionValue("B1"));
TestCase.assertEquals(100.0, results1.get(1).getPartitionValue("B1"));
TestCase.assertEquals(3.14, results1.get(2).getPartitionValue("B1"));

TestCase.assertEquals(Boolean.FALSE, results1.get(0).getPartitionValue("C"));
TestCase.assertEquals(Boolean.FALSE, results1.get(1).getPartitionValue("C"));
TestCase.assertEquals(Boolean.TRUE, results1.get(2).getPartitionValue("C"));
}
}

Expand All @@ -211,7 +218,7 @@ public void testMaxDepthEmpty() throws IOException {
Files.write(file3.toPath(), "Oui!".getBytes());

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, new LocationTableBuilderCsv(dataDirectory),
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());
Expand Down Expand Up @@ -239,7 +246,7 @@ public void testMaxDepth() throws IOException {
Files.write(file4.toPath(), "Non!".getBytes());

final RecordingLocationKeyFinder<FileTableLocationKey> recorder = new RecordingLocationKeyFinder<>();
new KeyValuePartitionLayout<>(dataDirectory, path -> true, new LocationTableBuilderCsv(dataDirectory),
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(recorder);
final List<FileTableLocationKey> results =
recorder.getRecordedKeys().stream().sorted().collect(Collectors.toList());
Expand Down Expand Up @@ -267,7 +274,7 @@ public void testMismatch() throws IOException {
Files.write(file3.toPath(), "Oui!".getBytes());

try {
new KeyValuePartitionLayout<>(dataDirectory, path -> true, new LocationTableBuilderCsv(dataDirectory),
new KeyValuePartitionLayout<>(dataDirectory, path -> true, () -> new LocationTableBuilderCsv(dataDirectory),
(path, partitions) -> new FileTableLocationKey(path.toFile(), 0, partitions), 3).findKeys(ftlk -> {
});
TestCase.fail("Expected exception");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public ParquetKeyValuePartitionedLayout(
@NotNull final TableDefinition tableDefinition) {
super(tableRootDirectory,
ParquetFileHelper::fileNameMatches,
new LocationTableBuilderDefinition(tableDefinition),
() -> new LocationTableBuilderDefinition(tableDefinition),
(path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions),
Math.toIntExact(tableDefinition.getColumnStream().filter(ColumnDefinition::isPartitioning).count()));
}
Expand All @@ -36,7 +36,7 @@ public ParquetKeyValuePartitionedLayout(
final int maxPartitioningLevels) {
super(tableRootDirectory,
ParquetFileHelper::fileNameMatches,
new LocationTableBuilderCsv(tableRootDirectory),
() -> new LocationTableBuilderCsv(tableRootDirectory),
(path, partitions) -> new ParquetTableLocationKey(path.toFile(), 0, partitions),
maxPartitioningLevels);
}
Expand Down

0 comments on commit cb57bf1

Please sign in to comment.