Initial commit of Iceberg integration. #5277

Merged · 31 commits · Jun 5, 2024
Commits (31)
The diff below shows changes from 3 of these commits.
54e0a80
Initial commit of Iceberg integration.
lbooker42 Mar 22, 2024
780f2b1
Ready for auto-spotless anytime!
lbooker42 Mar 22, 2024
585a26e
Refactored to use internal parquet objects instead of re-implementing…
lbooker42 Apr 2, 2024
816f6b3
Merge branch 'main' into lab-iceberg
lbooker42 Apr 2, 2024
c2323d2
Rebased to main.
lbooker42 Apr 2, 2024
62952f4
update to use faster URI creation
lbooker42 Apr 2, 2024
e070b9f
Address PR comments.
lbooker42 Apr 2, 2024
c7487fe
Fix gradle broken-ness.
lbooker42 Apr 2, 2024
d4a80b8
Gradle comments, a few changes to IcebergInstructions
lbooker42 Apr 2, 2024
4f2ba28
Final gradle fix (uncomment the correct lines)
lbooker42 Apr 3, 2024
3bd98f9
Addressed PR comments, more testing needed.
lbooker42 Apr 18, 2024
de402ed
PR comments, improved testing.
lbooker42 Apr 19, 2024
f45e7d8
Merged with main.
lbooker42 Apr 22, 2024
f2900aa
merged with main
lbooker42 May 1, 2024
9762093
WIP
lbooker42 May 3, 2024
d7c2604
WIP, but test code implemented.
lbooker42 May 14, 2024
905c8ac
merged with main
lbooker42 May 15, 2024
a507402
Tests simplified and passing.
lbooker42 May 20, 2024
8db7923
Merge branch 'main' into lab-iceberg
lbooker42 May 20, 2024
0390102
Gradle cleanup.
lbooker42 May 22, 2024
4539bee
Simplified Iceberg instructions.
lbooker42 May 28, 2024
c5d6be1
Addressed many PR comments.
lbooker42 May 30, 2024
23e4a18
Attempted to handle partitioning columns correctly.
lbooker42 May 31, 2024
fa2e79d
Getting close to final.
lbooker42 May 31, 2024
d6065e4
Another rev from comments.
lbooker42 May 31, 2024
ea5ca0e
WIP, some updates.
lbooker42 Jun 3, 2024
35861c1
Merge branch 'main' into lab-iceberg
lbooker42 Jun 3, 2024
e51cf7c
Hadoop gradle version harmonization.
lbooker42 Jun 3, 2024
b408f12
Iceberg project restructure.
lbooker42 Jun 3, 2024
de7d1f3
Exposing 'iceberg-aws' in gradle.
lbooker42 Jun 3, 2024
23d6e32
Addressing PR comments.
lbooker42 Jun 4, 2024
IcebergFlatLayout.java
@@ -4,9 +4,13 @@
package io.deephaven.iceberg.layout;

import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.parquet.table.ParquetInstructions;
import org.apache.iceberg.*;
import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;
@@ -22,59 +26,107 @@
* from a {@link org.apache.iceberg.Snapshot}
*/
public final class IcebergFlatLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
private final Snapshot tableSnapshot;
/**
* The Iceberg {@link Table} to discover locations for.
*/
private final Table table;

/**
* The {@link FileIO} to use for reading manifest files.
*/
private final FileIO fileIO;

/**
* A cache of {@link IcebergTableLocationKey}s keyed by the URI of the file they represent.
*/
private final Map<URI, IcebergTableLocationKey> cache;
private final Object readInstructions;

private static IcebergTableLocationKey locationKey(
/**
* The instructions for customizations while reading.
*/
private final IcebergInstructions instructions;

/**
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table.
*/
private ParquetInstructions parquetInstructions;

/**
* The current {@link Snapshot} to discover locations for.
*/
private Snapshot currentSnapshot;

private IcebergTableLocationKey locationKey(
final FileFormat format,
final URI fileUri,
@NotNull final Object readInstructions) {
return new IcebergTableLocationKey(format, fileUri, 0, null, readInstructions);
final URI fileUri) {
if (format == org.apache.iceberg.FileFormat.PARQUET) {
if (parquetInstructions == null) {
// Start with user-supplied instructions (if provided).
parquetInstructions = instructions.parquetInstructions().isPresent()
? instructions.parquetInstructions().get()
: ParquetInstructions.builder().build();

// Use the ParquetInstructions overrides to propagate the Iceberg instructions.
if (instructions.columnRenameMap() != null) {
parquetInstructions = parquetInstructions.withColumnRenameMap(instructions.columnRenameMap());
}
if (instructions.s3Instructions().isPresent()) {
parquetInstructions =
parquetInstructions.withSpecialInstructions(instructions.s3Instructions().get());
}
}
return new IcebergTableParquetLocationKey(fileUri, 0, null, parquetInstructions);
}
throw new UnsupportedOperationException("Unsupported file format: " + format);
}

/**
* @param table The {@link Table} to discover locations for.
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
* @param readInstructions the instructions for customizations while reading.
* @param fileIO The file IO to use for reading manifest data files.
* @param instructions The instructions for customizations while reading.
*/
public IcebergFlatLayout(
@NotNull final Table table,
@NotNull final Snapshot tableSnapshot,
@NotNull final FileIO fileIO,
@NotNull final Object readInstructions) {
this.tableSnapshot = tableSnapshot;
@NotNull final IcebergInstructions instructions) {
this.table = table;
this.currentSnapshot = tableSnapshot;
this.fileIO = fileIO;
this.readInstructions = readInstructions;
this.instructions = instructions;

this.cache = new HashMap<>();
}

public String toString() {
return IcebergFlatLayout.class.getSimpleName() + '[' + tableSnapshot + ']';
return IcebergFlatLayout.class.getSimpleName() + '[' + table.name() + ']';
}

@Override
public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
try {
// Retrieve the manifest files from the snapshot
final List<ManifestFile> manifestFiles = tableSnapshot.allManifests(fileIO);
final List<ManifestFile> manifestFiles = currentSnapshot.allManifests(fileIO);
for (final ManifestFile manifestFile : manifestFiles) {
// Currently, we can only process manifest files with DATA content type.
Assert.eq(manifestFile.content(), "manifestFile.content()",
ManifestContent.DATA, "ManifestContent.DATA");
final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, fileIO);
for (DataFile df : reader) {
final URI fileUri = FileUtils.convertToURI(df.path().toString(), false);
IcebergTableLocationKey locationKey = cache.get(fileUri);
if (locationKey == null) {
locationKey = locationKey(df.format(), fileUri, readInstructions);
if (!locationKey.verifyFileReader()) {
continue;
}
cache.put(fileUri, locationKey);
final IcebergTableLocationKey locationKey = cache.computeIfAbsent(fileUri, uri -> {
final IcebergTableLocationKey key = locationKey(df.format(), fileUri);
// Verify before caching.
return key.verifyFileReader() ? key : null;
});
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
locationKeyObserver.accept(locationKey);
}
}
} catch (final Exception e) {
throw new TableDataException("Error finding Iceberg locations under " + tableSnapshot, e);
throw new TableDataException("Error finding Iceberg locations under " + currentSnapshot, e);
}
}
}
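
A minimal usage sketch of the new flat layout may help orient reviewers. This is illustrative only and not part of the diff; the catalog handle, table identifier, and IcebergInstructions value are assumed to exist on the caller's side.

// Illustrative sketch: discover data-file location keys for an unpartitioned Iceberg table.
// `catalog` (org.apache.iceberg.catalog.Catalog), `tableId`, and `instructions`
// (IcebergInstructions) are assumed to be available; they are not defined in this excerpt.
final org.apache.iceberg.Table icebergTable = catalog.loadTable(tableId);
final org.apache.iceberg.Snapshot snapshot = icebergTable.currentSnapshot();
final IcebergFlatLayout layout =
        new IcebergFlatLayout(icebergTable, snapshot, icebergTable.io(), instructions);
// findKeys pushes one verified IcebergTableLocationKey per data file referenced by the snapshot.
layout.findKeys(locationKey -> System.out.println("Discovered: " + locationKey));
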
IcebergKeyValuePartitionedLayout.java
@@ -4,89 +4,167 @@
package io.deephaven.iceberg.layout;

import io.deephaven.base.FileUtils;
import io.deephaven.base.verify.Assert;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.util.IcebergInstructions;
import io.deephaven.parquet.table.ParquetInstructions;
import io.deephaven.util.type.TypeUtils;
import org.apache.iceberg.*;
import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Consumer;

/**
* Iceberg {@link TableLocationKeyFinder location finder} for tables without partitions that will discover data files
* from a {@link org.apache.iceberg.Snapshot}
* Iceberg {@link TableLocationKeyFinder location finder} for tables with partitions that will discover data files from
* a {@link org.apache.iceberg.Snapshot}
*/
public final class IcebergKeyValuePartitionedLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
private final Snapshot tableSnapshot;
/**
* The {@link TableDefinition} that will be used for the table.
*/
final TableDefinition tableDef;
/**
* The Iceberg {@link org.apache.iceberg.Table} to discover locations for.
*/
private final org.apache.iceberg.Table table;

/**
* The {@link FileIO} to use for reading manifest files.
*/
private final FileIO fileIO;

/**
* The columns to use for partitioning.
*/
private final String[] partitionColumns;
private final Object readInstructions;

/**
* The data types of the partitioning columns.
*/
private final Class<?>[] partitionColumnTypes;

/**
* A cache of {@link IcebergTableLocationKey}s keyed by the URI of the file they represent.
*/
private final Map<URI, IcebergTableLocationKey> cache;

private static IcebergTableLocationKey locationKey(
final FileFormat format,
/**
* The instructions for customizations while reading.
*/
private final IcebergInstructions instructions;

/**
* The {@link ParquetInstructions} object that will be used to read any Parquet data files in this table.
*/
private ParquetInstructions parquetInstructions;

/**
* The current {@link org.apache.iceberg.Snapshot} to discover locations for.
*/
private org.apache.iceberg.Snapshot currentSnapshot;

private IcebergTableLocationKey locationKey(
final org.apache.iceberg.FileFormat format,
final URI fileUri,
final Map<String, Comparable<?>> partitions,
@NotNull final Object readInstructions) {
return new IcebergTableLocationKey(format, fileUri, 0, partitions, readInstructions);
final Map<String, Comparable<?>> partitions) {

if (format == org.apache.iceberg.FileFormat.PARQUET) {
if (parquetInstructions == null) {
// Start with user-supplied instructions (if provided).
parquetInstructions = instructions.parquetInstructions().isPresent()
? instructions.parquetInstructions().get()
: ParquetInstructions.builder().build();

// Use the ParquetInstructions overrides to propagate the Iceberg instructions.
if (instructions.columnRenameMap() != null) {
parquetInstructions = parquetInstructions.withColumnRenameMap(instructions.columnRenameMap());
}
if (instructions.s3Instructions().isPresent()) {
parquetInstructions =
parquetInstructions.withSpecialInstructions(instructions.s3Instructions().get());
}
}
return new IcebergTableParquetLocationKey(fileUri, 0, partitions, parquetInstructions);
}
throw new UnsupportedOperationException("Unsupported file format: " + format);
}

/**
* @param tableSnapshot The {@link Snapshot} from which to discover data files.
* @param table The {@link org.apache.iceberg.Table} to discover locations for.
* @param tableSnapshot The {@link org.apache.iceberg.Snapshot} from which to discover data files.
* @param fileIO The file IO to use for reading manifest data files.
* @param partitionColumns The columns to use for partitioning.
* @param readInstructions The instructions for customizations while reading.
* @param instructions The instructions for customizations while reading.
*/
public IcebergKeyValuePartitionedLayout(
@NotNull final Snapshot tableSnapshot,
@NotNull final TableDefinition tableDef,
@NotNull final org.apache.iceberg.Table table,
@NotNull final org.apache.iceberg.Snapshot tableSnapshot,
@NotNull final FileIO fileIO,
@NotNull final String[] partitionColumns,
@NotNull final Object readInstructions) {
this.tableSnapshot = tableSnapshot;
@NotNull final IcebergInstructions instructions) {
this.tableDef = tableDef;
this.table = table;
this.currentSnapshot = tableSnapshot;
this.fileIO = fileIO;
this.partitionColumns = partitionColumns;
this.readInstructions = readInstructions;
this.instructions = instructions;

// Compute and store the data types of the partitioning columns.
partitionColumnTypes = Arrays.stream(partitionColumns)
.map(colName -> TypeUtils.getBoxedType(tableDef.getColumn(colName).getDataType()))
.toArray(Class<?>[]::new);

this.cache = new HashMap<>();
}

public String toString() {
return IcebergFlatLayout.class.getSimpleName() + '[' + tableSnapshot + ']';
return IcebergFlatLayout.class.getSimpleName() + '[' + table.name() + ']';
}

@Override
public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();
try {
// Retrieve the manifest files from the snapshot
final List<ManifestFile> manifestFiles = tableSnapshot.allManifests(fileIO);
final List<ManifestFile> manifestFiles = currentSnapshot.allManifests(fileIO);
for (final ManifestFile manifestFile : manifestFiles) {
// Currently, we can only process manifest files with DATA content type.
Assert.eq(manifestFile.content(), "manifestFile.content()",
ManifestContent.DATA, "ManifestContent.DATA");
final ManifestReader<DataFile> reader = ManifestFiles.read(manifestFile, fileIO);
for (DataFile df : reader) {
final URI fileUri = FileUtils.convertToURI(df.path().toString(), false);
IcebergTableLocationKey locationKey = cache.get(fileUri);
if (locationKey == null) {
final IcebergTableLocationKey locationKey = cache.computeIfAbsent(fileUri, uri -> {
final PartitionData partitionData = (PartitionData) df.partition();
for (int ii = 0; ii < partitionColumns.length; ++ii) {
partitions.put(partitionColumns[ii], (Comparable<?>) partitionData.get(ii));
}
locationKey = locationKey(df.format(), fileUri, partitions, readInstructions);
if (!locationKey.verifyFileReader()) {
continue;
final Object value = partitionData.get(ii);
if (value != null && !value.getClass().isAssignableFrom(partitionColumnTypes[ii])) {
throw new TableDataException("Partitioning column " + partitionColumns[ii]
+ " has type " + value.getClass().getName()
+ " but expected " + partitionColumnTypes[ii].getName());
}
partitions.put(partitionColumns[ii], (Comparable<?>) value);
}
cache.put(fileUri, locationKey);
final IcebergTableLocationKey key =
locationKey(df.format(), fileUri, partitions);
// Verify before caching.
return key.verifyFileReader() ? key : null;
});
if (locationKey != null) {
locationKeyObserver.accept(locationKey);
}
locationKeyObserver.accept(locationKey);
}
}
} catch (final Exception e) {
throw new TableDataException("Error finding Iceberg locations under " + tableSnapshot, e);
throw new TableDataException("Error finding Iceberg locations under " + currentSnapshot, e);
}
}
}
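
A parallel sketch for the partitioned layout follows. It is again illustrative; the TableDefinition columns and the partitioning column name "date" are assumptions, not taken from this PR.

// Illustrative sketch: discover location keys for an Iceberg table partitioned by a "date" column.
// `catalog`, `tableId`, and `instructions` are assumed as in the previous sketch; the
// TableDefinition must contain each partitioning column so its data type can be resolved.
final org.apache.iceberg.Table icebergTable = catalog.loadTable(tableId);
final TableDefinition definition = TableDefinition.of(
        ColumnDefinition.ofString("date"),
        ColumnDefinition.ofDouble("value"));
final IcebergKeyValuePartitionedLayout layout = new IcebergKeyValuePartitionedLayout(
        definition,
        icebergTable,
        icebergTable.currentSnapshot(),
        icebergTable.io(),
        new String[] {"date"},
        instructions);
layout.findKeys(locationKey -> System.out.println("Discovered: " + locationKey));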