[GOBBLIN-1802] Register iceberg table metadata update with destination side catalog (#3663)

* initial commit for iceberg table registration

* address PR comments

* replace target with dest for consistency and address PR comments

* adding pre-check for dest iceberg table

* change params for tableAlreadyExists and address checkstyle

* add pre-check for iceberg table on source and guard against missing dest metadata while committing

* address feedback on PR

* updated javadoc

* update error message for catalog uri missing

---------

Co-authored-by: Meeth Gala <mgala@linkedin.com>
meethngala and Meeth Gala authored Apr 3, 2023
1 parent 5338bdc commit b428a66
Showing 10 changed files with 204 additions and 71 deletions.
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.TableOperations;
IcebergCatalog.java
@@ -18,6 +18,7 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;


@@ -28,4 +29,5 @@ public interface IcebergCatalog {
IcebergTable openTable(String dbName, String tableName);
String getCatalogUri();
void initialize(Map<String, String> properties, Configuration configuration);
+ boolean tableAlreadyExists(IcebergTable icebergTable);
}
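For orientation, a minimal usage sketch of the extended interface. Catalog class, database/table names, and metastore URI are placeholder values; the IcebergCatalogFactory.create call mirrors the one the dataset finder below makes.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;

public class IcebergCatalogUsageSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> catalogProperties = new HashMap<>();
    catalogProperties.put(CatalogProperties.URI, "thrift://src-metastore.example.com:9083"); // placeholder URI

    IcebergCatalog catalog = IcebergCatalogFactory.create(
        "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog",
        catalogProperties, new Configuration());

    // the new pre-check: open the table handle, then verify it actually exists in the catalog
    IcebergTable table = catalog.openTable("db_name", "table_name");
    if (!catalog.tableAlreadyExists(table)) {
      throw new IllegalStateException("Missing Iceberg table at " + catalog.getCatalogUri());
    }
  }
}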
IcebergDataset.java
@@ -48,6 +48,7 @@
import org.apache.gobblin.data.management.copy.CopyableDataset;
import org.apache.gobblin.data.management.copy.CopyableFile;
import org.apache.gobblin.data.management.copy.OwnerAndPermission;
+ import org.apache.gobblin.data.management.copy.entities.PostPublishStep;
import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset;
import org.apache.gobblin.data.management.partition.FileSet;
import org.apache.gobblin.dataset.DatasetDescriptor;
@@ -64,21 +65,21 @@
public class IcebergDataset implements PrioritizedCopyableDataset {
private final String dbName;
private final String inputTableName;
- private final IcebergTable icebergTable;
+ private final IcebergTable srcIcebergTable;
+ /** Destination {@link IcebergTable}, presumed to exist */
+ private final IcebergTable destIcebergTable;
protected final Properties properties;
protected final FileSystem sourceFs;
private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired

- /** Target metastore URI */
- public static final String ICEBERG_TARGET_CATALOG_URI_KEY =
-     IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri";
- /** Target database name */
- public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database";
+ /** Destination database name */
+ public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database";

- public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) {
+ public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) {
this.dbName = db;
this.inputTableName = table;
- this.icebergTable = icebergTbl;
+ this.srcIcebergTable = srcIcebergTable;
+ this.destIcebergTable = destIcebergTable;
this.properties = properties;
this.sourceFs = sourceFs;
}
@@ -154,6 +155,7 @@ Collection<CopyEntity> generateCopyEntities(FileSystem targetFs, CopyConfigurati
fileEntity.setDestinationData(getDestinationDataset(targetFs));
copyEntities.add(fileEntity);
}
+ copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable));
log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size());
return copyEntities;
}
@@ -163,7 +165,7 @@
* @return a map of path, file status for each file that needs to be copied
*/
protected Map<Path, FileStatus> getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException {
- IcebergTable icebergTable = this.getIcebergTable();
+ IcebergTable icebergTable = this.getSrcIcebergTable();
/** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */
Function<String, Boolean> isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr ->
// omit considering timestamp (or other markers of freshness), as files should be immutable
@@ -307,10 +309,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co
}

protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) {
- return this.icebergTable.getDatasetDescriptor(sourceFs);
+ return this.srcIcebergTable.getDatasetDescriptor(sourceFs);
}

protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) {
- return this.icebergTable.getDatasetDescriptor(targetFs);
+ return this.destIcebergTable.getDatasetDescriptor(targetFs);
}

+ private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) {
+   IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable);
+   return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0);
+ }
}
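Net effect of the changes above: every generated file set now carries a post-publish step that registers the source table's metadata on the destination. A sketch of the attachment, mirroring createPostPublishStep; the file-set id literal is hypothetical, and the final argument is the step's priority (0 in this diff).

import com.google.common.collect.Maps;

import org.apache.gobblin.commit.CommitStep;
import org.apache.gobblin.data.management.copy.entities.PostPublishStep;

public class PostPublishSketch {
  static PostPublishStep attachRegisterStep(IcebergTable src, IcebergTable dest) {
    CommitStep registerStep = new IcebergRegisterStep(src, dest);
    return new PostPublishStep("db_name.table_name", Maps.newHashMap(), registerStep, 0);
  }
}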
IcebergDatasetFinder.java
@@ -25,34 +25,45 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.IterableDatasetFinder;
import org.apache.gobblin.util.HadoopUtils;


/**
 * Finds {@link IcebergDataset}s. Looks for tables in a database using an {@link IcebergCatalog},
 * and creates an {@link IcebergDataset} for each one.
*/
@Slf4j
@RequiredArgsConstructor
public class IcebergDatasetFinder implements IterableDatasetFinder<IcebergDataset> {

public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset";
public static final String ICEBERG_CLUSTER_KEY = "cluster";
- public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
- public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";
public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class";
- public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog";
+ public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri";
+ public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name";
+ public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class";
+ public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri";
+ public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name";
+ public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name";
+ public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name";

+ public enum CatalogLocation {
+   SOURCE,
+   DESTINATION
+ }

protected final FileSystem sourceFs;
private final Properties properties;
@@ -74,18 +85,13 @@ public List<IcebergDataset> findDatasets() throws IOException {
String dbName = properties.getProperty(ICEBERG_DB_NAME);
String tblName = properties.getProperty(ICEBERG_TABLE_NAME);

- try {
-   IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties);
-   /* Each Iceberg dataset maps to an Iceberg table
-    * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table
-    */
-   matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs));
-   log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
-       matchingDatasets, dbName, tblName);
-   return matchingDatasets;
- } catch (ReflectiveOperationException exception) {
-   throw new IOException(exception);
- }
+ IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE);
+ IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION);
+ /* Each Iceberg dataset maps to an Iceberg table */
+ matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs));
+ log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(),
+     matchingDatasets, dbName, tblName); // until future support is added for multiple icebergs, the expected count is always one
+ return matchingDatasets;
}

@Override
@@ -98,20 +104,44 @@ public Iterator<IcebergDataset> getDatasetsIterator() throws IOException {
return findDatasets().iterator();
}

- protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) {
-   IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName);
-   return new IcebergDataset(dbName, tblName, icebergTable, properties, fs);
+ /**
+  * Requires both source and destination catalogs to connect to their respective {@link IcebergTable}.
+  * Note: the destination-side {@link IcebergTable} must exist before initiating replication.
+  * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable}
+  */
+ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException {
+   IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
+   Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing source Iceberg table: {%s}.{%s}", dbName, tblName));
+   IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
+   // TODO: Rethink strategy to enforce dest iceberg table
+   Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing destination Iceberg table: {%s}.{%s}", dbName, tblName));
+   return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
}

- protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException {
+ protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException {
Map<String, String> catalogProperties = new HashMap<>();
- String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
- Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required");
- catalogProperties.put(CatalogProperties.URI, catalogUri);
- // introducing an optional property for catalogs requiring cluster specific properties
- Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
Configuration configuration = HadoopUtils.getConfFromProperties(properties);
- String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+ String catalogUri;
+ String icebergCatalogClassName;
+ switch (location) {
+   case SOURCE:
+     catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
+     Preconditions.checkNotNull(catalogUri, "Source catalog table service URI is required; please set %s", ICEBERG_SRC_CATALOG_URI_KEY);
+     // optional property for catalogs that require cluster-specific properties
+     Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+     icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+     break;
+   case DESTINATION:
+     catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
+     Preconditions.checkNotNull(catalogUri, "Destination catalog table service URI is required; please set %s", ICEBERG_DEST_CATALOG_URI_KEY);
+     // optional property for catalogs that require cluster-specific properties
+     Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
+     icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
+     break;
+   default:
+     throw new UnsupportedOperationException(String.format("Incorrect desired location: %s provided for creating Iceberg catalog", location));
+ }
+ catalogProperties.put(CatalogProperties.URI, catalogUri);
return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration);
}
}
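Pulled together, a copy job would now supply keys along these lines. This is a sketch with placeholder values; it assumes DatasetConstants.PLATFORM_ICEBERG resolves to "iceberg", so ICEBERG_DATASET_PREFIX becomes "iceberg.dataset". Note the destination URI key keeps a ".copy." segment that the source key does not.

import java.util.Properties;

public class FinderConfigSketch {
  static Properties exampleJobProps() {
    Properties props = new Properties();
    props.setProperty("iceberg.dataset.database.name", "db_name");
    props.setProperty("iceberg.dataset.table.name", "table_name");
    props.setProperty("iceberg.dataset.source.catalog.uri", "thrift://src-metastore.example.com:9083");
    props.setProperty("iceberg.dataset.copy.destination.catalog.uri", "thrift://dest-metastore.example.com:9083");
    // optional overrides: catalog implementation class and cluster-specific names
    props.setProperty("iceberg.dataset.source.cluster.name", "src-cluster");
    props.setProperty("iceberg.dataset.destination.cluster.name", "dest-cluster");
    return props;
  }
}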
IcebergHiveCatalog.java
@@ -18,11 +18,13 @@
package org.apache.gobblin.data.management.copy.iceberg;

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
- import org.apache.iceberg.CatalogProperties;
+ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.hive.HiveCatalog;

import lombok.extern.slf4j.Slf4j;


@@ -47,11 +49,16 @@ public void initialize(Map<String, String> properties, Configurati

@Override
public String getCatalogUri() {
- return hc.getConf().get(CatalogProperties.URI, "<<not set>>");
+ return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<<not set>>");
}

@Override
protected TableOperations createTableOperations(TableIdentifier tableId) {
return hc.newTableOps(tableId);
}

+ @Override
+ public boolean tableAlreadyExists(IcebergTable icebergTable) {
+   return hc.tableExists(icebergTable.getTableId());
+ }
}
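Direct wiring of the Hive-backed catalog, for illustration only: the factory normally instantiates it reflectively, and the no-arg constructor and all values here are assumptions.

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogProperties;

public class HiveCatalogWiringSketch {
  static IcebergHiveCatalog wire() {
    Map<String, String> properties = new HashMap<>();
    properties.put(CatalogProperties.URI, "thrift://dest-metastore.example.com:9083"); // placeholder URI
    properties.put("cluster", "dest-cluster"); // optional ICEBERG_CLUSTER_KEY entry, placeholder value
    IcebergHiveCatalog catalog = new IcebergHiveCatalog();
    catalog.initialize(properties, new Configuration());
    return catalog;
  }
}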
IcebergRegisterStep.java (new file)
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.data.management.copy.iceberg;

import java.io.IOException;

import org.apache.iceberg.TableMetadata;

import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.commit.CommitStep;

/**
* {@link CommitStep} to perform Iceberg registration.
*/
@Slf4j
@AllArgsConstructor
public class IcebergRegisterStep implements CommitStep {

private final IcebergTable srcIcebergTable;
private final IcebergTable destIcebergTable;

@Override
public boolean isCompleted() throws IOException {
return false;
}

@Override
public void execute() throws IOException {
TableMetadata destinationMetadata = null;
try {
destinationMetadata = this.destIcebergTable.accessTableMetadata();
} catch (IcebergTable.TableNotFoundException tnfe) {
log.warn("Destination TableMetadata doesn't exist because: " , tnfe);
}
this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata);
}
}
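For context, a commit-step driver boils down to the two-method contract above; since isCompleted() always returns false, the registration runs on every publish. A minimal driver sketch — hypothetical, not Gobblin's actual publisher code.

import java.io.IOException;

import org.apache.gobblin.commit.CommitStep;

public class CommitStepDriverSketch {
  static void runStep(CommitStep step) throws IOException {
    if (!step.isCompleted()) { // IcebergRegisterStep always reports false, so it always executes
      step.execute();
    }
  }
}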
IcebergTable.java
@@ -26,10 +26,6 @@
import java.util.Set;
import java.util.stream.Collectors;

- import lombok.AllArgsConstructor;
- import lombok.Getter;
- import lombok.extern.slf4j.Slf4j;

import org.apache.hadoop.fs.FileSystem;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
@@ -44,6 +40,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

+ import lombok.AllArgsConstructor;
+ import lombok.Getter;
+ import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.dataset.DatasetConstants;
import org.apache.gobblin.dataset.DatasetDescriptor;

@@ -67,7 +67,7 @@ public TableNotFoundException(TableIdentifier tableId) {
this.tableId = tableId;
}
}

+ @Getter
private final TableIdentifier tableId;
private final TableOperations tableOps;
private final String catalogUri;
@@ -194,4 +194,11 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) {
descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString());
return descriptor;
}
+ /** Registers {@link IcebergTable} after publishing data.
+  * @param dstMetadata is null when the destination {@link IcebergTable} is absent, in which case registration is skipped */
+ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) {
+   if (dstMetadata != null) {
+     this.tableOps.commit(dstMetadata, srcMetadata);
+   }
+ }
}
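registerIcebergTable relies on Iceberg's optimistic compare-and-swap commit: the destination's current metadata is passed as the expected base, and the source metadata becomes the new state, so a concurrent writer on the destination fails the commit rather than being silently overwritten. A sketch of that assumption, using only TableOperations methods.

import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;

public class MetadataSwapSketch {
  /** Minimal sketch, assuming TableOperations#commit(base, metadata) CAS semantics. */
  static void swapMetadata(TableOperations destOps, TableMetadata srcMetadata) {
    TableMetadata base = destOps.refresh(); // current destination state; a stale base fails the commit
    destOps.commit(base, srcMetadata);      // atomically re-points the destination at the source metadata
  }
}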
