From ae3ce96f9c937a6e138113a7999deb78b133e155 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Mon, 4 Dec 2023 12:08:37 -0800 Subject: [PATCH 1/2] Allow `IcebergCatalog` to specify the `DatasetDescriptor` name for the `IcebergTable`s it creates --- .../management/copy/iceberg/BaseIcebergCatalog.java | 10 +++++++++- .../data/management/copy/iceberg/IcebergTable.java | 10 +++++++++- .../management/copy/iceberg/IcebergDatasetTest.java | 5 +++-- 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java index 0ac4dcc0b8a..85c99584443 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java @@ -42,12 +42,20 @@ protected BaseIcebergCatalog(String catalogName, Class compan @Override public IcebergTable openTable(String dbName, String tableName) { TableIdentifier tableId = TableIdentifier.of(dbName, tableName); - return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri()); + return new IcebergTable(tableId, calcDatasetDescriptorName(tableId), createTableOperations(tableId), this.getCatalogUri()); } protected Catalog createCompanionCatalog(Map properties, Configuration configuration) { return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), this.catalogName, properties, configuration); } + /** + * Enable catalog-specific qualification for charting lineage, etc. This default impl is an identity pass-through that adds no qualification. + * @return the name to use for the table identified by {@link TableIdentifier} + */ + protected String calcDatasetDescriptorName(TableIdentifier tableId) { + return tableId.toString(); // default to FQ ID with both table namespace and name + } + protected abstract TableOperations createTableOperations(TableIdentifier tableId); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index a07f6ac89cf..24febe897fc 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -36,6 +36,7 @@ import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.io.FileIO; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Iterators; import com.google.common.collect.Lists; import com.google.common.collect.Sets; @@ -70,9 +71,16 @@ public TableNotFoundException(TableIdentifier tableId) { @Getter private final TableIdentifier tableId; + /** allow the {@link IcebergCatalog} creating this table to qualify its name when used for lineage, etc. */ + private final String datasetDescriptorName; private final TableOperations tableOps; private final String catalogUri; + @VisibleForTesting + IcebergTable(TableIdentifier tableId, TableOperations tableOps, String catalogUri) { + this(tableId, tableId.toString(), tableOps, catalogUri); + } + /** @return metadata info limited to the most recent (current) snapshot */ public IcebergSnapshotInfo getCurrentSnapshotInfo() throws IOException { TableMetadata current = accessTableMetadata(); @@ -188,7 +196,7 @@ public DatasetDescriptor getDatasetDescriptor(FileSystem fs) { DatasetDescriptor descriptor = new DatasetDescriptor( DatasetConstants.PLATFORM_ICEBERG, URI.create(this.catalogUri), - this.tableId.toString() // use FQ ID, including table namespace + this.datasetDescriptorName ); descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); return descriptor; diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index 0b485e1df2d..4b457923e1a 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -116,10 +116,11 @@ public void setUp() throws Exception { @Test public void testGetDatasetDescriptor() throws URISyntaxException { TableIdentifier tableId = TableIdentifier.of(testDbName, testTblName); - IcebergTable table = new IcebergTable(tableId, Mockito.mock(TableOperations.class), SRC_CATALOG_URI); + String qualifiedTableName = "foo_prefix." + tableId.toString(); + IcebergTable table = new IcebergTable(tableId, qualifiedTableName, Mockito.mock(TableOperations.class), SRC_CATALOG_URI); FileSystem mockFs = Mockito.mock(FileSystem.class); Mockito.when(mockFs.getUri()).thenReturn(SRC_FS_URI); - DatasetDescriptor expected = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, URI.create(SRC_CATALOG_URI), tableId.toString()); + DatasetDescriptor expected = new DatasetDescriptor(DatasetConstants.PLATFORM_ICEBERG, URI.create(SRC_CATALOG_URI), qualifiedTableName); expected.addMetadata(DatasetConstants.FS_URI, SRC_FS_URI.toString()); Assert.assertEquals(table.getDatasetDescriptor(mockFs), expected); } From 8d9687299f05668da647ab430e5be093f2cdf201 Mon Sep 17 00:00:00 2001 From: Kip Kohn Date: Mon, 4 Dec 2023 13:34:24 -0800 Subject: [PATCH 2/2] small method javadoc --- .../gobblin/data/management/copy/iceberg/IcebergCatalog.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index 5794a4c03c4..68e9bb31c65 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -28,8 +28,10 @@ */ public interface IcebergCatalog { + /** @return table identified by `dbName` and `tableName` */ IcebergTable openTable(String dbName, String tableName); + /** @return table identified by `tableId` */ default IcebergTable openTable(TableIdentifier tableId) { // CHALLENGE: clearly better to implement in the reverse direction - `openTable(String, String)` in terms of `openTable(TableIdentifier)` - // but challenging to do at this point, with multiple derived classes already "in the wild" that implement `openTable(String, String)`