Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix regression when reading manifest file by overriding newInputFile … #3

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import com.google.common.collect.Iterables;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystem;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.io.BulkDeletionFailureException;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;
Expand All @@ -28,6 +31,7 @@
import java.util.Map;
import java.util.stream.Stream;

import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;

Expand Down Expand Up @@ -88,6 +92,36 @@ public void deleteFiles(Iterable<String> pathsToDelete)
partitions.forEach(this::deleteBatch);
}

/**
 * Opens the given manifest for reading, forwarding the length recorded on the
 * manifest entry to {@code newInputFile(String, long)}.
 *
 * <p>NOTE(review): supplying the known length presumably lets readers avoid a
 * separate length lookup on the underlying file system; confirm against callers.
 *
 * @param manifest manifest entry describing the file to open
 * @return an input file for the manifest's path
 * @throws IllegalArgumentException if the manifest carries key metadata, which this
 *         implementation does not decrypt
 */
@Override
public InputFile newInputFile(ManifestFile manifest)
{
    // Read the getters in the same order the argument list would evaluate them.
    boolean unencrypted = manifest.keyMetadata() == null;
    String manifestPath = manifest.path();
    checkArgument(unencrypted, "Cannot decrypt manifest: %s (use EncryptingFileIO)", manifestPath);

    long knownLength = manifest.length();
    return newInputFile(manifestPath, knownLength);
}

/**
 * Opens the given data file for reading, forwarding the size recorded on the
 * file entry to {@code newInputFile(String, long)}.
 *
 * @param file data file entry describing the file to open
 * @return an input file for the data file's path
 * @throws IllegalArgumentException if the data file carries key metadata, which this
 *         implementation does not decrypt
 */
@Override
public InputFile newInputFile(DataFile file)
{
    // Read the getters in the same order the argument list would evaluate them.
    boolean unencrypted = file.keyMetadata() == null;
    CharSequence dataFilePath = file.path();
    checkArgument(unencrypted, "Cannot decrypt data file: %s (use EncryptingFileIO)", dataFilePath);

    long knownSize = file.fileSizeInBytes();
    return newInputFile(dataFilePath.toString(), knownSize);
}

/**
 * Opens the given delete file for reading, forwarding the size recorded on the
 * file entry to {@code newInputFile(String, long)}.
 *
 * @param file delete file entry describing the file to open
 * @return an input file for the delete file's path
 * @throws IllegalArgumentException if the delete file carries key metadata, which this
 *         implementation does not decrypt
 */
@Override
public InputFile newInputFile(DeleteFile file)
{
    // Read the getters in the same order the argument list would evaluate them.
    boolean unencrypted = file.keyMetadata() == null;
    CharSequence deleteFilePath = file.path();
    checkArgument(unencrypted, "Cannot decrypt delete file: %s (use EncryptingFileIO)", deleteFilePath);

    long knownSize = file.fileSizeInBytes();
    return newInputFile(deleteFilePath.toString(), knownSize);
}

private void deleteBatch(List<String> filesToDelete)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,6 @@ public void testCreateOrReplaceTableAsSelect()
.add(new FileOperation(SNAPSHOT, "OutputFile.create"))
.add(new FileOperation(MANIFEST, "OutputFile.create"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.add(new FileOperation(STATS, "OutputFile.create"))
.build());
}
Expand All @@ -198,7 +197,6 @@ public void testSelect()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
}

Expand All @@ -225,7 +223,6 @@ public void testSelectWithLimit(int numberOfFiles)
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), min(icebergManifestPrefetching, numberOfFiles))
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), min(icebergManifestPrefetching, numberOfFiles))
.build());

assertFileSystemAccesses("EXPLAIN SELECT * FROM test_select_with_limit LIMIT 3",
Expand All @@ -234,7 +231,6 @@ public void testSelectWithLimit(int numberOfFiles)
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), numberOfFiles)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), numberOfFiles)
.build());

assertFileSystemAccesses("EXPLAIN ANALYZE SELECT * FROM test_select_with_limit LIMIT 3",
Expand All @@ -243,7 +239,6 @@ public void testSelectWithLimit(int numberOfFiles)
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), numberOfFiles + min(icebergManifestPrefetching, numberOfFiles))
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), numberOfFiles + min(icebergManifestPrefetching, numberOfFiles))
.build());

assertUpdate("DROP TABLE test_select_with_limit");
Expand Down Expand Up @@ -276,7 +271,6 @@ public void testReadWholePartition()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand All @@ -289,7 +283,6 @@ public void testReadWholePartition()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand All @@ -301,7 +294,6 @@ public void testReadWholePartition()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand All @@ -313,7 +305,6 @@ public void testReadWholePartition()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand All @@ -328,7 +319,6 @@ public void testReadWholePartition()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand Down Expand Up @@ -365,7 +355,6 @@ public void testReadWholePartitionSplittableFile()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand All @@ -378,7 +367,6 @@ public void testReadWholePartitionSplittableFile()
ALL_FILES,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
Expand Down Expand Up @@ -409,23 +397,20 @@ public void testSelectFromVersionedTable()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.build());
}

Expand All @@ -452,23 +437,20 @@ public void testSelectFromVersionedTableWithSchemaEvolution()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName + " FOR VERSION AS OF " + v3SnapshotId,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.build());
assertFileSystemAccesses("SELECT * FROM " + tableName,
ImmutableMultiset.<FileOperation>builder()
.add(new FileOperation(METADATA_JSON, "InputFile.newStream"))
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.build());
}

Expand All @@ -482,7 +464,6 @@ public void testSelectWithFilter()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
}

Expand All @@ -498,7 +479,6 @@ public void testJoin()
.addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 2)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 4)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 4)
.build());
}

Expand All @@ -516,7 +496,6 @@ public void testJoinWithPartitionedTable()
.addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 2)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 4)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 4)
.build());
}

Expand All @@ -531,7 +510,6 @@ public void testExplainSelect()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
}

Expand All @@ -546,7 +524,6 @@ public void testShowStatsForTable()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
}

Expand All @@ -563,7 +540,6 @@ public void testShowStatsForPartitionedTable()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
}

Expand All @@ -578,7 +554,6 @@ public void testShowStatsForTableWithFilter()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());
}

Expand All @@ -595,7 +570,6 @@ public void testPredicateWithVarcharCastToDate()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 2)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 2)
.build());

// CAST to date and comparison
Expand All @@ -605,7 +579,6 @@ public void testPredicateWithVarcharCastToDate()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream")) // fewer than without filter
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());

// CAST to date and BETWEEN
Expand All @@ -615,7 +588,6 @@ public void testPredicateWithVarcharCastToDate()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream")) // fewer than without filter
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());

// conversion to date as a date function
Expand All @@ -625,7 +597,6 @@ public void testPredicateWithVarcharCastToDate()
.add(new FileOperation(SNAPSHOT, "InputFile.length"))
.add(new FileOperation(SNAPSHOT, "InputFile.newStream"))
.add(new FileOperation(MANIFEST, "InputFile.newStream")) // fewer than without filter
.add(new FileOperation(MANIFEST, "InputFile.length"))
.build());

assertUpdate("DROP TABLE test_varchar_as_date_predicate");
Expand All @@ -651,7 +622,6 @@ public void testRemoveOrphanFiles()
.addCopies(new FileOperation(SNAPSHOT, "InputFile.length"), 4)
.addCopies(new FileOperation(SNAPSHOT, "InputFile.newStream"), 4)
.addCopies(new FileOperation(MANIFEST, "InputFile.newStream"), 5)
.addCopies(new FileOperation(MANIFEST, "InputFile.length"), 5)
.build());

assertUpdate("DROP TABLE " + tableName);
Expand Down