Skip to content

Commit

Permalink
feat: Add auto-legalization for Iceberg Schema column names. (#5746)
Browse files Browse the repository at this point in the history
Will close #5714 when merged.
  • Loading branch information
lbooker42 authored Jul 10, 2024
1 parent 04e19cc commit 353ad20
Show file tree
Hide file tree
Showing 6 changed files with 215 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,22 +130,23 @@ public void testListTables() {
final Namespace ns = Namespace.of("sales");

Collection<TableIdentifier> tables = adapter.listTables(ns);
Assert.eq(tables.size(), "tables.size()", 3, "3 tables in the namespace");
Assert.eq(tables.size(), "tables.size()", 4, "4 tables in the namespace");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_multi")), "tables.contains(sales_multi)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_partitioned")),
"tables.contains(sales_partitioned)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_single")), "tables.contains(sales_single)");
Assert.eqTrue(tables.contains(TableIdentifier.of(ns, "sales_renamed")), "tables.contains(sales_renamed)");

Table table = adapter.listTablesAsTable(ns);
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eq(table.size(), "table.size()", 4, "4 tables in the namespace");
Assert.eqTrue(table.getColumnSource("Namespace").getType().equals(String.class), "namespace column type");
Assert.eqTrue(table.getColumnSource("TableName").getType().equals(String.class), "table_name column type");
Assert.eqTrue(table.getColumnSource("TableIdentifierObject").getType().equals(TableIdentifier.class),
"table_identifier_object column type");

// Test the string versions of the methods
table = adapter.listTablesAsTable("sales");
Assert.eq(table.size(), "table.size()", 3, "3 tables in the namespace");
Assert.eq(table.size(), "table.size()", 4, "4 tables in the namespace");
}

@Test
Expand Down Expand Up @@ -509,8 +510,8 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx

final IcebergInstructions localInstructions = IcebergInstructions.builder()
.dataInstructions(instructions.dataInstructions().get())
.putColumnRenames("RegionName", "Region")
.putColumnRenames("ItemType", "Item_Type")
.putColumnRenames("Region", "RegionName")
.putColumnRenames("Item_Type", "ItemType")
.build();

final IcebergCatalogAdapter adapter =
Expand All @@ -524,6 +525,88 @@ public void testOpenTableColumnRename() throws ExecutionException, InterruptedEx
Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");
}

@Test
public void testOpenTableColumnLegalization() throws ExecutionException, InterruptedException, TimeoutException {
    // Stage the "sales_renamed" table, whose Iceberg schema deliberately uses column names that are
    // illegal in Deephaven ("Region Name", "Item&Type", "Units/Sold", "Unit Price", "Order Date").
    uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()),
            warehousePath);

    // No explicit column renames: the adapter must auto-legalize every illegal name on its own.
    final IcebergInstructions localInstructions = IcebergInstructions.builder()
            .dataInstructions(instructions.dataInstructions().get())
            .build();

    final IcebergCatalogAdapter adapter =
            IcebergTools.createAdapter(resourceCatalog, resourceFileIO);

    final Namespace ns = Namespace.of("sales");
    final TableIdentifier tableId = TableIdentifier.of(ns, "sales_renamed");
    final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions);

    // Verify we retrieved all the rows.
    Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

    // Each illegal schema name must have been legalized: spaces become underscores, other illegal
    // characters ('&', '/') are stripped.
    Assert.eqTrue(table.getDefinition().getColumn("Region_Name") != null, "'Region Name' renamed");
    Assert.eqTrue(table.getDefinition().getColumn("ItemType") != null, "'Item&Type' renamed");
    Assert.eqTrue(table.getDefinition().getColumn("UnitsSold") != null, "'Units/Sold' renamed");
    // Fixed message typo: the source column is "Unit Price" (was "Unit Pricee").
    Assert.eqTrue(table.getDefinition().getColumn("Unit_Price") != null, "'Unit Price' renamed");
    Assert.eqTrue(table.getDefinition().getColumn("Order_Date") != null, "'Order Date' renamed");
}

@Test
public void testOpenTableColumnLegalizationRename()
        throws ExecutionException, InterruptedException, TimeoutException {
    // Stage the "sales_renamed" table, whose Iceberg schema uses column names that are illegal in
    // Deephaven; two of them are explicitly renamed below, the rest must be auto-legalized.
    uploadParquetFiles(new File(IcebergToolsTest.class.getResource("/warehouse/sales/sales_renamed").getPath()),
            warehousePath);

    // User-supplied renames take precedence over auto-legalization for these two columns.
    final IcebergInstructions localInstructions = IcebergInstructions.builder()
            .dataInstructions(instructions.dataInstructions().get())
            .putColumnRenames("Item&Type", "Item_Type")
            .putColumnRenames("Units/Sold", "Units_Sold")
            .build();

    final IcebergCatalogAdapter adapter =
            IcebergTools.createAdapter(resourceCatalog, resourceFileIO);

    final Namespace ns = Namespace.of("sales");
    final TableIdentifier tableId = TableIdentifier.of(ns, "sales_renamed");
    final io.deephaven.engine.table.Table table = adapter.readTable(tableId, localInstructions);

    // Verify we retrieved all the rows.
    Assert.eq(table.size(), "table.size()", 100_000, "100_000 rows in the table");

    // Explicit renames applied for "Item&Type" / "Units/Sold"; the remaining illegal names
    // ("Region Name", "Unit Price", "Order Date") were auto-legalized (spaces -> underscores).
    Assert.eqTrue(table.getDefinition().getColumn("Region_Name") != null, "'Region Name' renamed");
    Assert.eqTrue(table.getDefinition().getColumn("Item_Type") != null, "'Item&Type' renamed");
    Assert.eqTrue(table.getDefinition().getColumn("Units_Sold") != null, "'Units/Sold' renamed");
    // Fixed message typo: the source column is "Unit Price" (was "Unit Pricee").
    Assert.eqTrue(table.getDefinition().getColumn("Unit_Price") != null, "'Unit Price' renamed");
    Assert.eqTrue(table.getDefinition().getColumn("Order_Date") != null, "'Order Date' renamed");
}

@Test
public void testOpenTableColumnLegalizationPartitionException()
        throws ExecutionException, InterruptedException, TimeoutException {
    // Partitioning columns in the user-supplied definition.
    final TableDefinition tableDef = TableDefinition.of(
            ColumnDefinition.ofInt("Year").withPartitioning(),
            ColumnDefinition.ofInt("Month").withPartitioning());

    // Deliberately rename the partitioning columns to names containing spaces, which are invalid
    // Deephaven column names; readTable is expected to reject these user-supplied destinations.
    final IcebergInstructions localInstructions = IcebergInstructions.builder()
            .tableDefinition(tableDef)
            .putColumnRenames("Year", "Current Year")
            .putColumnRenames("Month", "Current Month")
            .dataInstructions(instructions.dataInstructions().get())
            .build();

    final IcebergCatalogAdapter adapter =
            IcebergTools.createAdapter(resourceCatalog, resourceFileIO);

    final Namespace ns = Namespace.of("sales");
    final TableIdentifier tableId = TableIdentifier.of(ns, "sales_partitioned");
    try {
        // Call only for its exception side effect; the result (if any) is irrelevant, so the
        // previously-unused local variable has been removed.
        adapter.readTable(tableId, localInstructions);
        Assert.statementNeverExecuted("Expected an exception for missing columns");
    } catch (final TableDataException e) {
        Assert.eqTrue(e.getMessage().contains("invalid column name provided"), "Exception message");
    }
}

@Test
public void testOpenTableColumnRenamePartitioningColumns()
throws ExecutionException, InterruptedException, TimeoutException {
Expand Down
Git LFS file not shown
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
{
"format-version" : 2,
"table-uuid" : "49f61dcb-0c7a-414d-8e67-9bf190a24032",
"location" : "s3://warehouse/sales/sales_renamed",
"last-sequence-number" : 1,
"last-updated-ms" : 1720562597002,
"last-column-id" : 5,
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "Region Name",
"required" : false,
"type" : "string"
}, {
"id" : 2,
"name" : "Item&Type",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "Units/Sold",
"required" : false,
"type" : "int"
}, {
"id" : 4,
"name" : "Unit Price",
"required" : false,
"type" : "double"
}, {
"id" : 5,
"name" : "Order Date",
"required" : false,
"type" : "timestamptz"
} ]
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ ]
} ],
"last-partition-id" : 999,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"owner" : "root",
"created-at" : "2024-07-09T22:03:15.386555127Z",
"write.format.default" : "parquet",
"write.parquet.compression-codec" : "zstd"
},
"current-snapshot-id" : 1014870679100567484,
"refs" : {
"main" : {
"snapshot-id" : 1014870679100567484,
"type" : "branch"
}
},
"snapshots" : [ {
"sequence-number" : 1,
"snapshot-id" : 1014870679100567484,
"timestamp-ms" : 1720562597002,
"summary" : {
"operation" : "append",
"spark.app.id" : "local-1720562294285",
"added-data-files" : "1",
"added-records" : "100000",
"added-files-size" : "729387",
"changed-partition-count" : "1",
"total-records" : "100000",
"total-files-size" : "729387",
"total-data-files" : "1",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "s3://warehouse/sales/sales_renamed/metadata/snap-1014870679100567484-1-6154af6b-49cf-41eb-a340-ff60964e750d.avro",
"schema-id" : 0
} ],
"statistics" : [ ],
"partition-statistics" : [ ],
"snapshot-log" : [ {
"timestamp-ms" : 1720562597002,
"snapshot-id" : 1014870679100567484
} ],
"metadata-log" : [ ]
}
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//
package io.deephaven.iceberg.util;

import io.deephaven.api.util.NameValidator;
import io.deephaven.engine.rowset.RowSetFactory;
import io.deephaven.engine.table.*;
import io.deephaven.engine.table.impl.PartitionAwareSourceTable;
Expand Down Expand Up @@ -453,10 +454,40 @@ private Table readTableInternal(
// Get the user supplied table definition.
final TableDefinition userTableDef = userInstructions.tableDefinition().orElse(null);

final Set<String> takenNames = new HashSet<>();

// Map all the column names in the schema to their legalized names.
final Map<String, String> legalizedColumnRenames = new HashMap<>();

// Validate user-supplied names meet legalization requirements
for (final Map.Entry<String, String> entry : userInstructions.columnRenames().entrySet()) {
final String destinationName = entry.getValue();
if (!NameValidator.isValidColumnName(destinationName)) {
throw new TableDataException(
String.format("%s:%d - invalid column name provided (%s)", table, snapshot.snapshotId(),
destinationName));
}
// Add these renames to the legalized list.
legalizedColumnRenames.put(entry.getKey(), destinationName);
takenNames.add(destinationName);
}

for (final Types.NestedField field : schema.columns()) {
final String name = field.name();
// Do we already have a valid rename for this column from the user or a partitioned column?
if (!legalizedColumnRenames.containsKey(name)) {
final String legalizedName =
NameValidator.legalizeColumnName(name, s -> s.replace(" ", "_"), takenNames);
if (!legalizedName.equals(name)) {
legalizedColumnRenames.put(name, legalizedName);
takenNames.add(legalizedName);
}
}
}

// Get the table definition from the schema (potentially limited by the user supplied table definition and
// applying column renames).
final TableDefinition icebergTableDef =
fromSchema(schema, partitionSpec, userTableDef, userInstructions.columnRenames());
final TableDefinition icebergTableDef = fromSchema(schema, partitionSpec, userTableDef, legalizedColumnRenames);

// If the user supplied a table definition, make sure it's fully compatible.
final TableDefinition tableDef;
Expand Down

0 comments on commit 353ad20

Please sign in to comment.