Iceberg dataset copy: build catalogs from prefix-scoped properties and register tables lazily.

- IcebergDataset: DESTINATION_DATABASE_KEY suffix changes from ".copy.destination.database" to ".destination.database";
  the post-publish step is now created from the db name, table name and job properties.
- IcebergDatasetFinder: the fixed source/destination catalog keys are replaced by an open-ended pattern,
  ICEBERG_DATASET_PREFIX + ".<source|destination>.catalog.<key>", collected by buildMapFromPrefixChildren().
- IcebergRegisterStep: stores db name, table name and Properties instead of IcebergTable instances and opens
  both tables inside execute().
- IcebergTable.registerIcebergTable: commits with the destination metadata as base and the source metadata
  (carrying the destination's properties) as the update.

diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
index f4db7d4ff78..05f1e265d2a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java
@@ -73,7 +73,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
 
   /** Destination database name */
-  public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";
+  public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";
 
   public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
     this.dbName = db;
@@ -155,7 +155,8 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
     }
-    copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable));
+    // TODO: Filter properties specific to iceberg registration and avoid serializing every global property
+    copyEntities.add(createPostPublishStep(this.dbName, this.inputTableName, this.properties));
     log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
     return copyEntities;
   }
@@ -316,8 +317,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(dbName, inputTableName, properties);
     return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
index b20a1bc292c..beded5a723a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java
@@ -23,7 +23,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
@@ -33,9 +32,13 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
@@ -49,20 +52,26 @@
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
-  public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
-  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
-  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
-  public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
-  public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_CATALOG_KEY = "catalog";
+  /**
+   * This is used with a prefix: "{@link IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "(source or destination)" + "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
+   * It is an open-ended pattern used to pass arbitrary catalog specific properties
+   */
+  public static final String ICEBERG_CATALOG_CLASS_KEY = "class";
   public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
   public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
 
   public enum CatalogLocation {
     SOURCE,
-    DESTINATION
+    DESTINATION;
+
+    /**
+     * Provides prefix (ending with ".") for configs based on the catalog location to filter catalog specific properties; lower-cased with {@link java.util.Locale#ROOT} so the key does not vary with the JVM default locale
+     */
+    public String getConfigPrefix() {
+      return ICEBERG_DATASET_PREFIX + "." + this.name().toLowerCase(java.util.Locale.ROOT) + "." + ICEBERG_CATALOG_KEY + ".";
+    }
   }
 
   protected final FileSystem sourceFs;
@@ -88,7 +97,7 @@ public List<IcebergDataset> findDatasets() throws IOException {
     IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
     IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
     /* Each Iceberg dataset maps to an Iceberg table */
-    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, this.properties, this.sourceFs));
     log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
         matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one
     return matchingDatasets;
@@ -118,30 +127,26 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = location.getConfigPrefix();
+    Map<String, String> catalogProperties = buildMapFromPrefixChildren(properties, prefix);
+    // TODO: Filter properties specific to Hadoop
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
-    }
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
     return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
   }
+
+  /**
+   * Filters the properties based on a prefix (e.g. {@link CatalogLocation#getConfigPrefix()}, which ends with ".") using {@link ConfigBuilder#loadProps(Properties, String)} and creates a {@link Map}; fails if the filtered properties lack {@link CatalogProperties#URI}
+   */
+  protected static Map<String, String> buildMapFromPrefixChildren(Properties properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), entry.getValue().unwrapped().toString());
+    }
+    String catalogUri = catalogProperties.get(CatalogProperties.URI); // map lookup yields null when absent, so the check below can report the missing key
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + CatalogProperties.URI);
+    return catalogProperties;
+  }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
index 75f26787b09..8f32f8cc039 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.iceberg.TableMetadata;
 
@@ -33,8 +34,9 @@
 @AllArgsConstructor
 public class IcebergRegisterStep implements CommitStep {
 
-  private final IcebergTable srcIcebergTable;
-  private final IcebergTable destIcebergTable;
+  private final String dbName;
+  private final String tblName;
+  private final Properties properties;
 
   @Override
   public boolean isCompleted() throws IOException {
@@ -43,12 +45,20 @@ public boolean isCompleted() throws IOException {
 
   @Override
   public void execute() throws IOException {
+    IcebergTable srcIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.SOURCE)
+        .openTable(this.dbName, this.tblName);
+    IcebergTable destIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION)
+        .openTable(this.dbName, this.tblName);
     TableMetadata destinationMetadata = null;
     try {
-      destinationMetadata = this.destIcebergTable.accessTableMetadata();
+      destinationMetadata = destIcebergTable.accessTableMetadata();
     } catch (IcebergTable.TableNotFoundException tnfe) {
       log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
     }
-    this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata);
+    destIcebergTable.registerIcebergTable(srcIcebergTable.accessTableMetadata(), destinationMetadata);
+  }
+  @Override
+  public String toString() {
+    return String.format("Registering Iceberg Table: {%s}.{%s} ", this.dbName, this.tblName);
   }
 }
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
index e8d0ee0ac28..6671ebdeb64 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // use current destination metadata as 'base metadata' and source as 'updated metadata' while committing
+      this.tableOps.commit(dstMetadata, srcMetadata.replaceProperties(dstMetadata.properties()));
     }
   }
 }