diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java index f3ef3309a21..0ac4dcc0b8a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java @@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy.iceberg; import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableOperations; diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index 15211c8b540..ac342e2e3a4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -18,6 +18,7 @@ package org.apache.gobblin.data.management.copy.iceberg; import java.util.Map; + import org.apache.hadoop.conf.Configuration; @@ -28,4 +29,5 @@ public interface IcebergCatalog { IcebergTable openTable(String dbName, String tableName); String getCatalogUri(); void initialize(Map properties, Configuration configuration); + boolean tableAlreadyExists(IcebergTable icebergTable); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 2119daef5ea..f4db7d4ff78 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -48,6 +48,7 @@ import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.CopyableFile; import org.apache.gobblin.data.management.copy.OwnerAndPermission; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset; import org.apache.gobblin.data.management.partition.FileSet; import org.apache.gobblin.dataset.DatasetDescriptor; @@ -64,21 +65,21 @@ public class IcebergDataset implements PrioritizedCopyableDataset { private final String dbName; private final String inputTableName; - private final IcebergTable icebergTable; + private final IcebergTable srcIcebergTable; + /** Presumed destination {@link IcebergTable} exists */ + private final IcebergTable destIcebergTable; protected final Properties properties; protected final FileSystem sourceFs; private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired - /** Target metastore URI */ - public static final String ICEBERG_TARGET_CATALOG_URI_KEY = - IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; - /** Target database name */ - public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database"; + /** Destination database name */ + public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database"; - public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) { + public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) { this.dbName = db; this.inputTableName = table; - this.icebergTable = icebergTbl; + this.srcIcebergTable = 
srcIcebergTable; + this.destIcebergTable = destIcebergTable; this.properties = properties; this.sourceFs = sourceFs; } @@ -154,6 +155,7 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati fileEntity.setDestinationData(getDestinationDataset(targetFs)); copyEntities.add(fileEntity); } + copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable)); log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size()); return copyEntities; } @@ -163,7 +165,7 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati * @return a map of path, file status for each file that needs to be copied */ protected Map getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException { - IcebergTable icebergTable = this.getIcebergTable(); + IcebergTable icebergTable = this.getSrcIcebergTable(); /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */ Function isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr -> // omit considering timestamp (or other markers of freshness), as files should be immutable @@ -307,10 +309,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co } protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) { - return this.icebergTable.getDatasetDescriptor(sourceFs); + return this.srcIcebergTable.getDatasetDescriptor(sourceFs); } protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) { - return this.icebergTable.getDatasetDescriptor(targetFs); + return this.destIcebergTable.getDatasetDescriptor(targetFs); + } + + private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) { + IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable); + return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0); } } diff 
--git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index bc111dc2566..b20a1bc292c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -25,18 +25,22 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.IterableDatasetFinder; import org.apache.gobblin.util.HadoopUtils; + /** * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog}, * and creates a {@link IcebergDataset} for each one. 
@@ -44,15 +48,22 @@ @Slf4j @RequiredArgsConstructor public class IcebergDatasetFinder implements IterableDatasetFinder { - public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset"; public static final String ICEBERG_CLUSTER_KEY = "cluster"; - public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name"; - public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name"; public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class"; - public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri"; public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; + public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri"; public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name"; + public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".destination.catalog.class"; + public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.destination.catalog.uri"; + public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".destination.cluster.name"; + public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name"; + public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name"; + + public enum CatalogLocation { + SOURCE, + DESTINATION + } protected final FileSystem sourceFs; private final Properties properties; @@ -74,18 +85,13 @@ public List findDatasets() throws IOException { String dbName = properties.getProperty(ICEBERG_DB_NAME); String tblName = properties.getProperty(ICEBERG_TABLE_NAME); - try { - IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties); - /* Each Iceberg dataset maps to an Iceberg table - 
* TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table - */ - matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs)); - log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), - matchingDatasets, dbName, tblName); - return matchingDatasets; - } catch (ReflectiveOperationException exception) { - throw new IOException(exception); - } + IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); + IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION); + /* Each Iceberg dataset maps to an Iceberg table */ + matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs)); + log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), + matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one + return matchingDatasets; } @Override @@ -98,20 +104,44 @@ public Iterator getDatasetsIterator() throws IOException { return findDatasets().iterator(); } - protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) { - IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName); - return new IcebergDataset(dbName, tblName, icebergTable, properties, fs); + /** + * Requires both source and destination catalogs to connect to their respective {@link IcebergTable} + * Note: the destination side {@link IcebergTable} should be present before initiating replication + * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable} + */ + protected IcebergDataset createIcebergDataset(String dbName, 
String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { + IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); + Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName)); + IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); + // TODO: Rethink strategy to enforce dest iceberg table + Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName)); + return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); } - protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException { + protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException { Map catalogProperties = new HashMap<>(); - String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required"); - catalogProperties.put(CatalogProperties.URI, catalogUri); - // introducing an optional property for catalogs requiring cluster specific properties - Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); Configuration configuration = HadoopUtils.getConfFromProperties(properties); - String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); + String catalogUri; + String icebergCatalogClassName; + switch (location) { + case SOURCE: + catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY); + Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is 
required", ICEBERG_SRC_CATALOG_URI_KEY); + // introducing an optional property for catalogs requiring cluster specific properties + Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); + break; + case DESTINATION: + catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY); + Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY); + // introducing an optional property for catalogs requiring cluster specific properties + Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); + break; + default: + throw new UnsupportedOperationException("Incorrect desired location: " + location + " provided for creating Iceberg Catalog"); + } + catalogProperties.put(CatalogProperties.URI, catalogUri); return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java index 5525750e64c..af541a79a56 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java @@ -18,11 +18,13 @@ package org.apache.gobblin.data.management.copy.iceberg; import java.util.Map; + import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; +import 
org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; + import lombok.extern.slf4j.Slf4j; @@ -47,11 +49,16 @@ public void initialize(Map properties, Configuration configurati @Override public String getCatalogUri() { - return hc.getConf().get(CatalogProperties.URI, "<>"); + return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<>"); } @Override protected TableOperations createTableOperations(TableIdentifier tableId) { return hc.newTableOps(tableId); } + + @Override + public boolean tableAlreadyExists(IcebergTable icebergTable) { + return hc.tableExists(icebergTable.getTableId()); + } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java new file mode 100644 index 00000000000..75f26787b09 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; + +import org.apache.iceberg.TableMetadata; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.commit.CommitStep; + +/** + * {@link CommitStep} to perform Iceberg registration. + */ +@Slf4j +@AllArgsConstructor +public class IcebergRegisterStep implements CommitStep { + + private final IcebergTable srcIcebergTable; + private final IcebergTable destIcebergTable; + + @Override + public boolean isCompleted() throws IOException { + return false; + } + + @Override + public void execute() throws IOException { + TableMetadata destinationMetadata = null; + try { + destinationMetadata = this.destIcebergTable.accessTableMetadata(); + } catch (IcebergTable.TableNotFoundException tnfe) { + log.warn("Destination TableMetadata doesn't exist because: ", tnfe); + } + this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata); + } +} diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index f65a81417f3..e8d0ee0ac28 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -26,10 +26,6 @@ import java.util.Set; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - import org.apache.hadoop.fs.FileSystem; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; @@ -44,6 +40,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + +import 
org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; @@ -67,7 +67,7 @@ public TableNotFoundException(TableIdentifier tableId) { this.tableId = tableId; } } - + @Getter private final TableIdentifier tableId; private final TableOperations tableOps; private final String catalogUri; @@ -194,4 +194,11 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) { descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); return descriptor; } + /** Registers {@link IcebergTable} after publishing data. + * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */ + protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) { + if (dstMetadata != null) { + this.tableOps.commit(srcMetadata, dstMetadata); + } + } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index 9478207f8d9..c1872cb4ad8 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -188,12 +188,11 @@ public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOExc MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, null, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destFsBuilder.build(); Mockito.doThrow(new 
IOException("Ha - not so fast!")).when(destFs).getFileStatus(new Path(SNAPSHOT_PATHS_0.manifestListPath)); - CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); } @@ -227,9 +226,10 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -252,9 +252,10 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -282,8 +283,9 @@ public void 
testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -293,7 +295,6 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc // preserving attributes for owner, group and permissions respectively .preserve(PreserveAttributes.fromMnemonicString("ugp")) .copyContext(new CopyContext()).build(); - Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyFsOwnershipAndPermissionPreservation(copyEntities, sourceBuilder.getPathsAndFileStatuses()); } @@ -310,8 +311,9 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, 
testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -321,7 +323,6 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw // without preserving attributes for owner, group and permissions .preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); - Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); } @@ -348,7 +349,7 @@ protected IcebergTable validateGetFilePathsGivenDestState(List copyEntities, List List actual = new ArrayList<>(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json); - actual.add(filepath); + if (isCopyableFile(json)) { + String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json); + actual.add(filepath); + } else{ + verifyPostPublishStep(json); + } } Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private static boolean isCopyableFile(String json) { + String objectType = new Gson().fromJson(json, JsonObject.class) + .getAsJsonPrimitive("object-type") + .getAsString(); + return objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile"); + } + private static void verifyFsOwnershipAndPermissionPreservation(Collection copyEntities, Map expectedPathsAndFileStatuses) { for (CopyEntity copyEntity : copyEntities) { String copyEntityJson = copyEntity.toString(); - List ancestorFileOwnerAndPermissionsList = CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); - CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions 
= CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); - Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson)); - FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath); - verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus); - // providing path's parent to verify ancestor owner and permissions - verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(), expectedPathsAndFileStatuses); + if (isCopyableFile(copyEntityJson)) { + List ancestorFileOwnerAndPermissionsList = + CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions = CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson)); + FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath); + verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus); + // providing path's parent to verify ancestor owner and permissions + verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(), + expectedPathsAndFileStatuses); + } else { + verifyPostPublishStep(copyEntityJson); + } } } @@ -419,14 +437,21 @@ private static void verifyAncestorPermissions(List