[GOBBLIN-1811] Fix Iceberg Registration Serialization (#3673)
* Fix Iceberg Register Step serialization

* remove unused string properties

* address PR comments

* added comments

* fix checkstyle and replace table metadata props while committing

---------

Co-authored-by: Meeth Gala <mgala@linkedin.com>
meethngala and Meeth Gala authored Apr 14, 2023
1 parent ebdb03f commit 27dea4a
Showing 4 changed files with 59 additions and 42 deletions.
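At a glance: IcebergRegisterStep previously held two IcebergTable fields, which drag along live catalog and filesystem handles and therefore could not be serialized as part of the copy flow's post-publish step. The step now carries only a database name, a table name, and the job Properties, and re-opens both tables via IcebergDatasetFinder.createIcebergCatalog(...) when execute() runs. Catalog configuration moves to an open-ended prefix scheme (iceberg.dataset.<source|destination>.catalog.*), and registration now commits against the destination's current metadata while preserving the destination's own table properties.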
IcebergDataset.java
@@ -73,7 +73,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset {
   private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired
 
   /** Destination database name */
-  public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";
+  public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".destination.database";
 
   public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
     this.dbName = db;
@@ -155,7 +155,8 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfiguration
       fileEntity.setDestinationData(getDestinationDataset(targetFs));
       copyEntities.add(fileEntity);
     }
-    copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable));
+    // TODO: Filter properties specific to iceberg registration and avoid serializing every global property
+    copyEntities.add(createPostPublishStep(this.dbName, this.inputTableName, this.properties));
     log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
     return copyEntities;
   }
@@ -316,8 +317,8 @@ protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
     return this.destIcebergTable.getDatasetDescriptor(targetFs);
   }
 
-  private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
-    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+  private PostPublishStep createPostPublishStep(String dbName, String inputTableName, Properties properties) {
+    IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(dbName, inputTableName, properties);
     return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
   }
 }
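The crux of the fix is visible in the new createPostPublishStep signature: the step no longer captures IcebergTable instances (which hold catalog and Hadoop filesystem handles) but only plain values. As a minimal, self-contained illustration (class names hypothetical; JDK serialization is used purely for demonstration, since Gobblin's actual copy-entity encoding may differ), the state the step now carries round-trips cleanly:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Properties;

public class RegisterStepStateDemo {
  // Stand-in for the step's new state: two strings plus java.util.Properties,
  // all of which are Serializable. The old IcebergTable fields were not.
  static class StepState implements Serializable {
    final String dbName;
    final String tblName;
    final Properties properties;

    StepState(String dbName, String tblName, Properties properties) {
      this.dbName = dbName;
      this.tblName = tblName;
      this.properties = properties;
    }
  }

  public static void main(String[] args) throws IOException {
    Properties props = new Properties();
    props.setProperty("iceberg.dataset.destination.catalog.uri", "thrift://example:9083");
    StepState state = new StepState("db", "tbl", props);
    try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
      out.writeObject(state); // succeeds; handle-laden fields would throw NotSerializableException
    }
    System.out.println("step state serialized OK");
  }
}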
IcebergDatasetFinder.java
@@ -23,7 +23,6 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 
 import org.apache.commons.lang.StringUtils;
@@ -33,9 +32,13 @@
 import org.apache.iceberg.CatalogProperties;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
+import org.apache.gobblin.config.ConfigBuilder;
 import org.apache.gobblin.dataset.DatasetConstants;
 import org.apache.gobblin.dataset.IterableDatasetFinder;
 import org.apache.gobblin.util.HadoopUtils;
@@ -49,20 +52,26 @@
 @RequiredArgsConstructor
 public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {
   public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
-  public static final String ICEBERG_CLUSTER_KEY = "cluster";
-  public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
   public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
-  public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
-  public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
-  public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
-  public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
-  public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+  public static final String ICEBERG_CATALOG_KEY = "catalog";
+  /**
+   * This is used with a prefix: "{@link IcebergDatasetFinder#ICEBERG_DATASET_PREFIX}" + "." + "(source or destination)" + "." + "{@link IcebergDatasetFinder#ICEBERG_CATALOG_KEY}" + "..."
+   * It is an open-ended pattern used to pass arbitrary catalog specific properties
+   */
+  public static final String ICEBERG_CATALOG_CLASS_KEY = "class";
   public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
   public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
 
   public enum CatalogLocation {
     SOURCE,
-    DESTINATION
+    DESTINATION;
+
+    /**
+     * Provides prefix for configs based on the catalog location to filter catalog specific properties
+     */
+    public String getConfigPrefix() {
+      return ICEBERG_DATASET_PREFIX + "." + this.toString().toLowerCase() + "." + ICEBERG_CATALOG_KEY + ".";
+    }
   }
 
   protected final FileSystem sourceFs;
@@ -88,7 +97,7 @@ public List<IcebergDataset> findDatasets() throws IOException {
     IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
     IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
     /* Each Iceberg dataset maps to an Iceberg table */
-    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+    matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, this.properties, this.sourceFs));
     log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
         matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one
     return matchingDatasets;
@@ -118,30 +127,26 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog
     return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
   }
 
-  protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
-    Map<String, String> catalogProperties = new HashMap<>();
+  protected static IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
+    String prefix = location.getConfigPrefix();
+    Map<String, String> catalogProperties = buildMapFromPrefixChildren(properties, prefix);
+    // TODO: Filter properties specific to Hadoop
     Configuration configuration = HadoopUtils.getConfFromProperties(properties);
-    String catalogUri;
-    String icebergCatalogClassName;
-    switch (location) {
-      case SOURCE:
-        catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      case DESTINATION:
-        catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
-        Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY);
-        // introducing an optional property for catalogs requiring cluster specific properties
-        Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
-        icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
-        break;
-      default:
-        throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location);
-    }
-    catalogProperties.put(CatalogProperties.URI, catalogUri);
+    String icebergCatalogClassName = catalogProperties.getOrDefault(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
     return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
   }
 
+  /**
+   * Filters the properties based on a prefix using {@link ConfigBuilder#loadProps(Properties, String)} and creates a {@link Map}
+   */
+  protected static Map<String, String> buildMapFromPrefixChildren(Properties properties, String configPrefix) {
+    Map<String, String> catalogProperties = new HashMap<>();
+    Config config = ConfigBuilder.create().loadProps(properties, configPrefix).build();
+    for (Map.Entry<String, ConfigValue> entry : config.entrySet()) {
+      catalogProperties.put(entry.getKey(), entry.getValue().unwrapped().toString());
+    }
+    String catalogUri = config.getString(CatalogProperties.URI);
+    Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Catalog Table Service URI is required", configPrefix + "." + CatalogProperties.URI);
+    return catalogProperties;
+  }
 }
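The new buildMapFromPrefixChildren delegates to Gobblin's ConfigBuilder#loadProps, but its effect on keys can be shown in isolation: keep only the entries under a location's prefix and strip that prefix, so "uri" and "class" end up as bare keys in catalogProperties (which is why the code above looks them up via CatalogProperties.URI and ICEBERG_CATALOG_CLASS_KEY directly). A plain-java.util stand-in with hypothetical names, assuming DatasetConstants.PLATFORM_ICEBERG resolves to "iceberg":

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class PrefixFilterDemo {
  // Stand-in for buildMapFromPrefixChildren: filter by prefix, strip prefix.
  static Map<String, String> childrenOf(Properties props, String prefix) {
    Map<String, String> out = new HashMap<>();
    for (String key : props.stringPropertyNames()) {
      if (key.startsWith(prefix)) {
        out.put(key.substring(prefix.length()), props.getProperty(key));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("iceberg.dataset.destination.catalog.uri", "thrift://dest-metastore:9083");
    props.setProperty("iceberg.dataset.destination.catalog.class",
        "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog");
    props.setProperty("iceberg.dataset.source.catalog.uri", "thrift://src-metastore:9083");

    // CatalogLocation.DESTINATION.getConfigPrefix() would produce this prefix
    String prefix = "iceberg.dataset.destination.catalog.";
    // prints the uri and class entries with the prefix stripped (map order may vary)
    System.out.println(childrenOf(props, prefix));
  }
}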
IcebergRegisterStep.java
@@ -18,6 +18,7 @@
 package org.apache.gobblin.data.management.copy.iceberg;
 
 import java.io.IOException;
+import java.util.Properties;
 
 import org.apache.iceberg.TableMetadata;
 
@@ -33,8 +34,9 @@
 @AllArgsConstructor
 public class IcebergRegisterStep implements CommitStep {
 
-  private final IcebergTable srcIcebergTable;
-  private final IcebergTable destIcebergTable;
+  private final String dbName;
+  private final String tblName;
+  private final Properties properties;
 
   @Override
   public boolean isCompleted() throws IOException {
@@ -43,12 +45,20 @@ public boolean isCompleted() throws IOException {
 
   @Override
   public void execute() throws IOException {
+    IcebergTable srcIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.SOURCE)
+        .openTable(this.dbName, this.tblName);
+    IcebergTable destIcebergTable = IcebergDatasetFinder.createIcebergCatalog(this.properties, IcebergDatasetFinder.CatalogLocation.DESTINATION)
+        .openTable(this.dbName, this.tblName);
     TableMetadata destinationMetadata = null;
     try {
-      destinationMetadata = this.destIcebergTable.accessTableMetadata();
+      destinationMetadata = destIcebergTable.accessTableMetadata();
     } catch (IcebergTable.TableNotFoundException tnfe) {
       log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
     }
-    this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata);
+    destIcebergTable.registerIcebergTable(srcIcebergTable.accessTableMetadata(), destinationMetadata);
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Registering Iceberg Table: {%s}.{%s} ", this.dbName, this.tblName);
   }
 }
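Two things worth noting in this rewrite: every heavyweight, non-serializable resource (catalog, filesystem, table handle) is now constructed inside execute() from the lightweight dbName/tblName/properties payload, so the step can be serialized at planning time and rebuilt wherever the commit step actually runs; and the new toString() override gives commit-step logs a readable identity for the table being registered. The TODO in IcebergDataset above flags the follow-up of trimming that payload down to registration-specific properties.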
IcebergTable.java
@@ -198,7 +198,8 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
    * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */
   protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
     if (dstMetadata != null) {
-      this.tableOps.commit(srcMetadata, dstMetadata);
+      // use current destination metadata as 'base metadata' and source as 'updated metadata' while committing
+      this.tableOps.commit(dstMetadata, srcMetadata.replaceProperties(dstMetadata.properties()));
     }
   }
 }
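For context on the swapped arguments: Iceberg's TableOperations.commit(base, metadata) atomically installs new table metadata only if base still matches what the catalog currently holds, so the destination's live metadata must be passed as the base and the source metadata as the update. A minimal sketch of that ordering, assuming an already-obtained TableOperations handle (names illustrative, not part of this commit):

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

public class RegisterCommitSketch {
  // Mirrors registerIcebergTable above: the destination's live metadata is the
  // expected 'base'; the source metadata becomes the update, but with the
  // destination's own table properties re-applied so they are not clobbered.
  static void register(TableOperations destOps, TableMetadata srcMetadata) {
    TableMetadata dstMetadata = destOps.current();
    if (dstMetadata != null) {
      destOps.commit(dstMetadata, srcMetadata.replaceProperties(dstMetadata.properties()));
    }
  }
}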
