
Initial commit of Iceberg integration. #5277

Merged Jun 5, 2024 (31 commits). Changes shown below are from 1 commit.

Commits:
54e0a80 Initial commit of Iceberg integration. (lbooker42, Mar 22, 2024)
780f2b1 Ready for auto-spotless anytime! (lbooker42, Mar 22, 2024)
585a26e Refactored to use internal parquet objects instead of re-implementing… (lbooker42, Apr 2, 2024)
816f6b3 Merge branch 'main' into lab-iceberg (lbooker42, Apr 2, 2024)
c2323d2 Rebased to main. (lbooker42, Apr 2, 2024)
62952f4 update to use faster URI creation (lbooker42, Apr 2, 2024)
e070b9f Address PR comments. (lbooker42, Apr 2, 2024)
c7487fe Fix gradle broken-ness. (lbooker42, Apr 2, 2024)
d4a80b8 Gradle comments, a few changes to IcebergInstructions (lbooker42, Apr 2, 2024)
4f2ba28 Final gradle fix (uncomment the correct lines) (lbooker42, Apr 3, 2024)
3bd98f9 Addressed PR comments, more testing needed. (lbooker42, Apr 18, 2024)
de402ed PR comments, improved testing. (lbooker42, Apr 19, 2024)
f45e7d8 Merged with main. (lbooker42, Apr 22, 2024)
f2900aa merged with main (lbooker42, May 1, 2024)
9762093 WIP (lbooker42, May 3, 2024)
d7c2604 WIP, but test code implemented. (lbooker42, May 14, 2024)
905c8ac merged with main (lbooker42, May 15, 2024)
a507402 Tests simplified and passing. (lbooker42, May 20, 2024)
8db7923 Merge branch 'main' into lab-iceberg (lbooker42, May 20, 2024)
0390102 Gradle cleanup. (lbooker42, May 22, 2024)
4539bee Simplified Iceberg instructions. (lbooker42, May 28, 2024)
c5d6be1 Addressed many PR comments. (lbooker42, May 30, 2024)
23e4a18 Attempted to handle partitioning columns correctly. (lbooker42, May 31, 2024)
fa2e79d Getting close to final. (lbooker42, May 31, 2024)
d6065e4 Another rev from comments. (lbooker42, May 31, 2024)
ea5ca0e WIP, some updates. (lbooker42, Jun 3, 2024)
35861c1 Merge branch 'main' into lab-iceberg (lbooker42, Jun 3, 2024)
e51cf7c Hadoop gradle version harmonization. (lbooker42, Jun 3, 2024)
b408f12 Iceberg project restructure. (lbooker42, Jun 3, 2024)
de7d1f3 Exposing 'iceberg-aws' in gradle. (lbooker42, Jun 3, 2024)
23d6e32 Addressing PR comments. (lbooker42, Jun 4, 2024)
```diff
@@ -7,7 +7,6 @@
 import io.deephaven.engine.table.impl.locations.TableDataException;
 import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
 import io.deephaven.iceberg.location.IcebergTableLocationKey;
-import io.deephaven.util.channel.SeekableChannelsProvider;
 import org.apache.iceberg.*;
 import org.apache.iceberg.io.FileIO;
 import org.jetbrains.annotations.NotNull;
@@ -23,7 +22,7 @@
  * Iceberg {@link TableLocationKeyFinder location finder} for tables without partitions that will discover data files
  * from a {@link org.apache.iceberg.Snapshot}
  */
-public final class IcebergPartitionedLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
+public final class IcebergKeyValuePartitionedLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
     private final Snapshot tableSnapshot;
     private final FileIO fileIO;
     private final String[] partitionColumns;
@@ -44,7 +43,7 @@ private static IcebergTableLocationKey locationKey(
      * @param partitionColumns The columns to use for partitioning.
      * @param readInstructions The instructions for customizations while reading.
      */
-    public IcebergPartitionedLayout(
+    public IcebergKeyValuePartitionedLayout(
             @NotNull final Snapshot tableSnapshot,
             @NotNull final FileIO fileIO,
             @NotNull final String[] partitionColumns,
```
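The rename to IcebergKeyValuePartitionedLayout matches Iceberg's Hive-style "key=value" directory convention, in which each partition column appears as a `name=value` segment of the data file's path. A minimal stdlib-only sketch of that convention (the class and method below are hypothetical illustrations, not part of this PR):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper illustrating key=value partition paths; the PR's
// IcebergKeyValuePartitionedLayout derives partition values differently.
public final class PartitionPathParser {
    /**
     * Extracts partition column values from a path such as
     * "table/data/year=2024/month=03/part-0.parquet".
     */
    public static Map<String, String> parse(final String path) {
        final Map<String, String> values = new LinkedHashMap<>();
        for (final String segment : path.split("/")) {
            final int eq = segment.indexOf('=');
            if (eq > 0) {
                // Everything before '=' is the column name, everything after is the value
                values.put(segment.substring(0, eq), segment.substring(eq + 1));
            }
        }
        return values;
    }
}
```

For a path like `table/data/year=2024/month=03/part-0.parquet`, this yields the pairs `year=2024` and `month=03`, which is the shape of information a key-value partitioned layout must recover per data file.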
```diff
@@ -25,7 +25,7 @@ public class IcebergTableLocation implements TableLocation {
     private final ImmutableTableKey tableKey;
     private final ImmutableTableLocationKey tableLocationKey;

-    AbstractTableLocation internalTableLocation;
+    private final AbstractTableLocation internalTableLocation;

     public IcebergTableLocation(@NotNull final TableKey tableKey,
             @NotNull final IcebergTableLocationKey tableLocationKey,
@@ -54,7 +54,8 @@ public ImmutableTableKey getTableKey() {
     }

     @Override
-    public @NotNull ImmutableTableLocationKey getKey() {
+    @NotNull
+    public ImmutableTableLocationKey getKey() {
         return tableLocationKey;
     }

@@ -79,12 +80,14 @@ public void refresh() {
     }

     @Override
-    public @NotNull List<SortColumn> getSortedColumns() {
+    @NotNull
+    public List<SortColumn> getSortedColumns() {
         return internalTableLocation.getSortedColumns();
     }

     @Override
-    public @NotNull List<String[]> getDataIndexColumns() {
+    @NotNull
+    public List<String[]> getDataIndexColumns() {
         return internalTableLocation.getDataIndexColumns();
     }

@@ -94,17 +97,20 @@ public boolean hasDataIndex(@NotNull String... columns) {
     }

     @Override
-    public @Nullable BasicDataIndex getDataIndex(@NotNull String... columns) {
+    @Nullable
+    public BasicDataIndex getDataIndex(@NotNull String... columns) {
         return internalTableLocation.getDataIndex(columns);
     }

     @Override
-    public @NotNull ColumnLocation getColumnLocation(@NotNull CharSequence name) {
+    @NotNull
+    public ColumnLocation getColumnLocation(@NotNull CharSequence name) {
         return internalTableLocation.getColumnLocation(name);
     }

     @Override
-    public @NotNull Object getStateLock() {
+    @NotNull
+    public Object getStateLock() {
         return internalTableLocation.getStateLock();
     }
```
```diff
@@ -31,8 +31,8 @@ public IcebergTableLocationFactory(@NotNull final Object readInstructions) {
     public TableLocation makeLocation(@NotNull final TableKey tableKey,
             @NotNull final IcebergTableLocationKey locationKey,
             @Nullable final TableDataRefreshService refreshService) {
-        final URI parquetFileURI = locationKey.getURI();
-        if (!FILE_URI_SCHEME.equals(parquetFileURI.getScheme()) || new File(parquetFileURI).exists()) {
+        final URI fileURI = locationKey.getURI();
+        if (!FILE_URI_SCHEME.equals(fileURI.getScheme()) || new File(fileURI).exists()) {
             return new IcebergTableLocation(tableKey, locationKey, readInstructions);
         } else {
             return new NonexistentTableLocation(tableKey, locationKey);
```
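The `parquetFileURI` to `fileURI` rename reflects that the key may point at non-parquet formats too, and the condition itself is worth spelling out: a URI is accepted either because it is remote (non-`file` scheme, existence checked lazily later) or because it is a local file that actually exists. A stdlib-only sketch of that decision (the class and method names are hypothetical; only the boolean expression mirrors the diff):

```java
import java.io.File;
import java.net.URI;

// Hypothetical illustration of the scheme/existence check in makeLocation;
// "file" stands in for the FILE_URI_SCHEME constant referenced by the diff.
public final class LocationCheck {
    private static final String FILE_URI_SCHEME = "file";

    /** True when the URI is remote, or is a local file present on disk. */
    public static boolean isUsable(final URI fileURI) {
        // Remote schemes (e.g. s3) short-circuit to true; local file URIs
        // must resolve to an existing file, otherwise the caller falls back
        // to a NonexistentTableLocation.
        return !FILE_URI_SCHEME.equals(fileURI.getScheme()) || new File(fileURI).exists();
    }
}
```

Note that `new File(URI)` requires an absolute `file:` URI, which is why the scheme is tested first.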
```diff
@@ -22,7 +22,6 @@ public class IcebergTableLocationKey extends URITableLocationKey {
     private static final String IMPLEMENTATION_NAME = IcebergTableLocationKey.class.getSimpleName();

     final FileFormat format;
-
     final URITableLocationKey internalTableLocationKey;

     /**
@@ -59,19 +58,7 @@ public String getImplementationName() {
     }

     /**
-     * Returns {@code true} if a previous file reader has been created, or if one was successfully created on-demand.
-     *
-     * <p>
-     * When {@code false}, this may mean that the file:
-     * <ol>
-     * <li>does not exist, or is otherwise inaccessible</li>
-     * <li>is in the process of being written, and is not yet a valid parquet file</li>
-     * <li>is _not_ a parquet file</li>
-     * <li>is a corrupt parquet file</li>
-     * </ol>
-     *
-     *
-     * @return true if the file reader exists or was successfully created
+     * See {@link ParquetTableLocationKey#verifyFileReader()}.
      */
     public synchronized boolean verifyFileReader() {
         if (format == FileFormat.PARQUET) {
```
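The removed javadoc described the contract now delegated to ParquetTableLocationKey#verifyFileReader(): attempt to create the file reader once, cache it on success, and return false (rather than throw) for files that are missing, still being written, or corrupt. A generic stdlib sketch of that verify-once-and-cache shape, under the assumption that the real code follows the documented contract (class and names below are hypothetical):

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the verify-and-cache pattern behind verifyFileReader();
// the real implementation opens a parquet file reader rather than a generic Callable.
public final class CachedVerifier {
    private final Callable<?> readerFactory;
    private Object reader; // cached after the first successful creation

    public CachedVerifier(final Callable<?> readerFactory) {
        this.readerFactory = readerFactory;
    }

    /** True if a reader already exists or could be created on demand; never throws. */
    public synchronized boolean verify() {
        if (reader != null) {
            return true; // a previous call already created the reader
        }
        try {
            reader = readerFactory.call();
            return reader != null;
        } catch (Exception e) {
            // Missing, partially written, or corrupt file: report unusable
            // instead of propagating the failure to the caller.
            return false;
        }
    }
}
```

The `synchronized` keyword matches the signature in the diff, which guards the lazily created reader against concurrent verification calls.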
```diff
@@ -17,7 +17,7 @@
 import io.deephaven.extensions.s3.Credentials;
 import io.deephaven.extensions.s3.S3Instructions;
 import io.deephaven.iceberg.layout.IcebergFlatLayout;
-import io.deephaven.iceberg.layout.IcebergPartitionedLayout;
+import io.deephaven.iceberg.layout.IcebergKeyValuePartitionedLayout;
 import io.deephaven.iceberg.location.IcebergTableLocationFactory;
 import io.deephaven.iceberg.location.IcebergTableLocationKey;
 import io.deephaven.parquet.table.ParquetInstructions;
@@ -44,13 +44,13 @@ public class IcebergCatalog {
     private final S3Instructions s3Instructions;

     /**
-     * Construct an IcebergCatalog given a set of configurable instructions..
+     * Construct an IcebergCatalog given a set of configurable instructions.
      *
      * @param name The optional service name
      */
     IcebergCatalog(final @Nullable String name, final IcebergInstructions instructions) {
         // Set up the properties map for the Iceberg catalog
-        Map<String, String> properties = new HashMap<>();
+        final Map<String, String> properties = new HashMap<>();

         final Configuration conf = new Configuration();

@@ -181,7 +181,7 @@ private Table readTableInternal(
                 partitionSpec.fields().stream().map(PartitionField::name).toArray(String[]::new);

         // Create the partitioning column location key finder
-        keyFinder = new IcebergPartitionedLayout(
+        keyFinder = new IcebergKeyValuePartitionedLayout(
                 snapshot,
                 fileIO,
                 partitionColumns,
```
```diff
@@ -5,6 +5,7 @@

 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.table.TableDefinition;
+import io.deephaven.engine.table.impl.locations.TableDataException;
 import org.apache.iceberg.PartitionField;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
@@ -35,20 +36,14 @@ static TableDefinition fromSchema(final Schema schema, PartitionSpec partitionSp
         for (final Types.NestedField field : schema.columns()) {
             final String name = field.name();
             final Type type = field.type();
-            try {
-                final io.deephaven.qst.type.Type<?> qstType = convertPrimitiveType(type);
-                final ColumnDefinition<?> column;
-                if (partitionNames.contains(name)) {
-                    column = ColumnDefinition.of(name, qstType).withPartitioning();
-                } else {
-                    column = ColumnDefinition.of(name, qstType);
-                }
-                columns.add(column);
-            } catch (UnsupportedOperationException e) {
-                // TODO: Currently will silently skip the column. Would it be better to skip and warn the user or
-                // break and declare failure? We don't have a mechanism for skipping columns, do we need an overload
-                // with a supplied table definition?
-            }
+            final io.deephaven.qst.type.Type<?> qstType = convertPrimitiveType(type);
+            final ColumnDefinition<?> column;
+            if (partitionNames.contains(name)) {
+                column = ColumnDefinition.of(name, qstType).withPartitioning();
+            } else {
+                column = ColumnDefinition.of(name, qstType);
+            }
+            columns.add(column);
         }

         return TableDefinition.of(columns);
@@ -84,7 +79,9 @@ static io.deephaven.qst.type.Type<?> convertPrimitiveType(final Type icebergType
                 return io.deephaven.qst.type.Type.find(byte[].class);
             }
         }
-        throw new UnsupportedOperationException("Unsupported type: " + typeId);
+        throw new TableDataException(
+                "Unsupported iceberg column type " + typeId.name() +
+                        " with logical type " + typeId.javaClass());
     }

     private IcebergTools() {}
```

Review thread on the TableDataException change:

Member: We could probably easily add support for UUID, and maybe MAP and LIST as well.

Contributor Author: Created #5573
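The change above replaces the silent skip-on-unsupported-type behavior with a loud TableDataException naming both the Iceberg type and its Java class, and the review thread tracks UUID/MAP/LIST support in #5573. The mapping shape can be sketched with stdlib types only; the enum below is a hypothetical stand-in for Iceberg's `Type.TypeID`, IllegalStateException stands in for TableDataException, and the returned `Class<?>` stands in for `io.deephaven.qst.type.Type`:

```java
// Hypothetical stand-in sketch for convertPrimitiveType: map a type tag to a
// Java class, failing with a descriptive message for unmapped types.
public final class TypeMapping {
    public enum TypeId { BOOLEAN, INTEGER, LONG, DOUBLE, STRING, UUID }

    public static Class<?> toJavaClass(final TypeId typeId) {
        switch (typeId) {
            case BOOLEAN:
                return Boolean.class;
            case INTEGER:
                return Integer.class;
            case LONG:
                return Long.class;
            case DOUBLE:
                return Double.class;
            case STRING:
                return String.class;
            default:
                // Mirrors the PR's descriptive failure: name the offending type
                // so the user sees which column blocked the table definition.
                throw new IllegalStateException(
                        "Unsupported iceberg column type " + typeId.name());
        }
    }
}
```

Because the catch-and-skip path was removed from `fromSchema`, a single unsupported column now fails the whole definition with this message rather than silently producing a narrower table.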
```diff
@@ -32,6 +32,11 @@ public void setUp() {
                 .build();
     }

+    @Test
+    public void testNothing() {
+        // Dummy to prevent JUnit from complaining about no tests
+    }
+
     // TODO: discuss how to perform tests since they require a full MiniIO + Iceberg setup

     // @Test
```