[GOBBLIN-1802] Register iceberg table metadata update with destination side catalog #3663

Merged: 9 commits, Apr 3, 2023
BaseIcebergCatalog.java
@@ -17,7 +17,9 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;
@@ -39,7 +41,7 @@ protected BaseIcebergCatalog(String catalogName, Class<? extends Catalog> compan
}

@Override
public IcebergTable openTable(String dbName, String tableName) {
public IcebergTable openTable(String dbName, String tableName) throws IOException {
TableIdentifier tableId = TableIdentifier.of(dbName, tableName);
return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri());
}
@@ -48,5 +50,5 @@ protected Catalog createCompanionCatalog(Map<String, String> properties, Configu
return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), this.catalogName, properties, configuration);
}

protected abstract TableOperations createTableOperations(TableIdentifier tableId);
protected abstract TableOperations createTableOperations(TableIdentifier tableId) throws IOException;
}
IcebergCatalog.java
@@ -17,15 +17,17 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;


/**
* Any catalog from which to access {@link IcebergTable}s.
*/
public interface IcebergCatalog {
IcebergTable openTable(String dbName, String tableName);
IcebergTable openTable(String dbName, String tableName) throws IOException;
String getCatalogUri();
void initialize(Map<String, String> properties, Configuration configuration);
}
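
With openTable now declaring IOException, callers must confront the missing-table case explicitly. A minimal editor's sketch of driving the revised interface (the URI and db/table names are illustrative; imports and the factory call mirror IcebergDatasetFinder below):

Map<String, String> catalogProperties = new HashMap<>();
catalogProperties.put(CatalogProperties.URI, "thrift://src-metastore:9083"); // illustrative URI
IcebergCatalog catalog = IcebergCatalogFactory.create(
    IcebergDatasetFinder.DEFAULT_ICEBERG_CATALOG_CLASS, catalogProperties, new Configuration());
try {
  IcebergTable table = catalog.openTable("db1", "tbl1"); // may now throw
} catch (IOException ioe) {
  // e.g. IcebergTable.TableNotFoundException when the table is absent from the catalog
}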
IcebergDataset.java
@@ -48,6 +48,7 @@
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -64,21 +65,20 @@
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
private final IcebergTable icebergTable;
private final IcebergTable srcIcebergTable;
private final IcebergTable existingDestinationIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired

/** Target metastore URI */
public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
/** Target database name */
public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";

public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingDestinationIcebergTable, Properties properties, FileSystem sourceFs) {
this.dbName = db;
this.inputTableName = table;
this.icebergTable = icebergTbl;
this.srcIcebergTable = srcIcebergTable;
this.existingDestinationIcebergTable = existingDestinationIcebergTable;
this.properties = properties;
this.sourceFs = sourceFs;
}
@@ -153,7 +153,9 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
fileEntity.setSourceData(getSourceDataset(this.sourceFs));
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);

}
copyEntities.add(addPostPublishStep(this.srcIcebergTable, this.existingDestinationIcebergTable));
log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}
@@ -163,7 +165,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
* @return a map of path, file status for each file that needs to be copied
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
IcebergTable icebergTable = this.getIcebergTable();
IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
// omit considering timestamp (or other markers of freshness), as files should be immutable
@@ -307,10 +309,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
}

protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
return this.icebergTable.getDatasetDescriptor(sourceFs);
return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}

protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
return this.icebergTable.getDatasetDescriptor(targetFs);
return this.existingDestinationIcebergTable.getDatasetDescriptor(targetFs);
}

private PostPublishStep addPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
}
}
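
The registration step is appended as the last copy entity, so it runs only after the file set's data files have been published. An editor's sketch of what the generated collection ends up containing (call site and names are illustrative):

Collection<CopyEntity> entities = dataset.generateCopyEntities(targetFs, copyConfiguration); // illustrative call
// yields: one CopyableFile per data/metadata file missing on the target, followed by a
// single PostPublishStep wrapping IcebergRegisterStep (priority 0) that fires post-publish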
IcebergDatasetFinder.java
@@ -25,34 +25,44 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;


/**
 * Finds {@link IcebergDataset}s. Looks for tables in a database using an {@link IcebergCatalog},
 * and creates an {@link IcebergDataset} for each one.
*/
@Slf4j
@RequiredArgsConstructor
public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {

public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_CLUSTER_KEY = "cluster";
public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String ICEBERG_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
public static final String ICEBERG_DST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
public static final String ICEBERG_DST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".target.cluster.name";
public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";

public enum CatalogLocation {
SOURCE,
DESTINATION
}

protected final FileSystem sourceFs;
private final Properties properties;
@@ -74,18 +84,15 @@ public List<IcebergDataset> findDatasets() throws IOException {
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);

try {
IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
/* Each Iceberg dataset maps to an Iceberg table
 * TODO: The user-provided database and table names need to be pre-checked and verified against the existence of a valid Iceberg table
*/
matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
matchingDatasets, dbName, tblName);
return matchingDatasets;
} catch (ReflectiveOperationException exception) {
throw new IOException(exception);
}
IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
/* Each Iceberg dataset maps to an Iceberg table
 * TODO: The user-provided database and table names need to be pre-checked and verified against the existence of a valid Iceberg table
 */
matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
matchingDatasets, dbName, tblName);
return matchingDatasets;
}

@Override
@@ -98,20 +105,40 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
return findDatasets().iterator();
}

protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
/**
 * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}s.
 * Note: the destination-side {@link IcebergTable} must already exist before initiating replication.
 * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}s
 */
protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
IcebergTable existingDestinationIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
return new IcebergDataset(dbName, tblName, srcIcebergTable, existingDestinationIcebergTable, properties, fs);
}

protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
Map<String, String> catalogProperties = new HashMap<>();
String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
catalogProperties.put(CatalogProperties.URI, catalogUri);
// introducing an optional property for catalogs requiring cluster specific properties
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
Configuration configuration = HadoopUtils.getConfFromProperties(properties);
String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
String catalogUri;
String icebergCatalogClassName;
switch (location) {
case SOURCE:
catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required");
Reviewer comment (Contributor): cleaner and more encapsulated might be to add a method on the enum (class) that checks for the property. For this, you'd initialize each enum value w/ its associated URI key; hence you might say:

catalogProperties.put(I_C_K, location.getCatalogUri(properties))
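
An editor's sketch of that suggestion, with a hypothetical per-value key field and accessor (not part of this PR; constants and Preconditions as already imported in this file):

public enum CatalogLocation {
  SOURCE(ICEBERG_SRC_CATALOG_URI_KEY),
  DESTINATION(ICEBERG_DST_CATALOG_URI_KEY);

  private final String catalogUriKey; // hypothetical field

  CatalogLocation(String catalogUriKey) {
    this.catalogUriKey = catalogUriKey;
  }

  /** Looks up this side's catalog URI, failing fast when it is missing */
  public String getCatalogUri(Properties properties) {
    String catalogUri = properties.getProperty(this.catalogUriKey);
    Preconditions.checkNotNull(catalogUri, this + " Catalog Table Service URI is required");
    return catalogUri;
  }
}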

// introducing an optional property for catalogs requiring cluster-specific properties
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
break;
case DESTINATION:
catalogUri = properties.getProperty(ICEBERG_DST_CATALOG_URI_KEY);
Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required");
// introducing an optional property for catalogs requiring cluster-specific properties
Optional.ofNullable(properties.getProperty(ICEBERG_DST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
break;
default:
throw new UnsupportedOperationException(String.format("Incorrect desired location: %s provided for creating Iceberg Catalog", location));
}
icebergCatalogClassName = properties.getProperty(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
catalogProperties.put(CatalogProperties.URI, catalogUri);
return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
}
}
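
For reference, a hedged sketch of the job properties this finder now reads, assuming DatasetConstants.PLATFORM_ICEBERG resolves to "iceberg" (all values illustrative):

Properties properties = new Properties();
properties.setProperty("iceberg.dataset.database.name", "db1");                                    // ICEBERG_DB_NAME
properties.setProperty("iceberg.dataset.table.name", "tbl1");                                      // ICEBERG_TABLE_NAME
properties.setProperty("iceberg.dataset.source.catalog.uri", "thrift://src-metastore:9083");       // required
properties.setProperty("iceberg.dataset.copy.target.catalog.uri", "thrift://dst-metastore:9083");  // required
properties.setProperty("iceberg.dataset.source.cluster.name", "src-cluster");                      // optional
properties.setProperty("iceberg.dataset.target.cluster.name", "dst-cluster");                      // optional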
IcebergHiveCatalog.java
@@ -17,12 +17,15 @@

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

import lombok.extern.slf4j.Slf4j;


@@ -51,7 +54,10 @@ public String getCatalogUri() {
}

@Override
protected TableOperations createTableOperations(TableIdentifier tableId) {
protected TableOperations createTableOperations(TableIdentifier tableId) throws IOException {
if (!hc.tableExists(tableId)) {
throw new IcebergTable.TableNotFoundException(tableId);
}
return hc.newTableOps(tableId);
}
}
IcebergRegisterStep.java (new file)
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;

import org.apache.iceberg.TableMetadata;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;

/**
* {@link CommitStep} to perform Iceberg registration.
 */
@Slf4j
@AllArgsConstructor
public class IcebergRegisterStep implements CommitStep {

private final IcebergTable srcIcebergTable;
private final IcebergTable existingDestinationIcebergTable;

@Override
public boolean isCompleted() throws IOException {
return false;
}

@Override
public void execute() throws IOException {
TableMetadata destinationMetadata = null;
try {
destinationMetadata = this.existingDestinationIcebergTable.accessTableMetadata();
} catch (IcebergTable.TableNotFoundException tnfe) {
log.warn("Destination TableMetadata doesn't exist because : {}" , tnfe);
meethngala marked this conversation as resolved.
Show resolved Hide resolved
}
this.existingDestinationIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata);
}
}
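
A sketch of how a publisher would end up driving the step (handles are illustrative); since isCompleted() is hard-coded to false, the registration commit runs on every publish:

IcebergTable src = sourceCatalog.openTable("db1", "tbl1");       // illustrative handles
IcebergTable dest = destinationCatalog.openTable("db1", "tbl1");
CommitStep step = new IcebergRegisterStep(src, dest);
if (!step.isCompleted()) {
  step.execute(); // installs the source TableMetadata in the destination catalog
}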
IcebergTable.java
@@ -26,10 +26,6 @@
import java.util.Set;
import java.util.stream.Collectors;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
@@ -44,6 +40,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;

@@ -194,4 +194,9 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return descriptor;
}
/** Registers {@link IcebergTable} after publishing data.
 * @param dstMetadata is null if target {@link IcebergTable} is absent */
protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
  // commit(base, metadata): base is the destination's current metadata (null on create); srcMetadata is the new state to install
  this.tableOps.commit(dstMetadata, srcMetadata);
}
}
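
Iceberg's TableOperations.commit(base, metadata) is a compare-and-swap: the new metadata is installed only if the catalog's current state still matches base, with a null base denoting table creation. A short illustration of the call above (names are illustrative):

TableMetadata base = dstMetadata;     // destination's last-known metadata; null when the table is absent
TableMetadata updated = srcMetadata;  // the state replicated from the source catalog
tableOps.commit(base, updated);       // throws CommitFailedException if the destination advanced concurrently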