From 5ce0ad44aa0b6552fbe3a539288858714b27f791 Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Wed, 15 Mar 2023 16:59:06 -0700 Subject: [PATCH 1/9] initial commit for iceberg table registration --- .../copy/iceberg/IcebergDataset.java | 24 +++--- .../copy/iceberg/IcebergDatasetFinder.java | 72 ++++++++++++------ .../copy/iceberg/IcebergRegisterStep.java | 55 ++++++++++++++ .../management/copy/iceberg/IcebergTable.java | 13 +++- .../copy/iceberg/IcebergDatasetTest.java | 76 ++++++++++++++----- gradle/scripts/dependencyDefinitions.gradle | 2 +- 6 files changed, 182 insertions(+), 60 deletions(-) create mode 100644 gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 2119daef5ea..2626e2a6c13 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -48,6 +48,7 @@ import org.apache.gobblin.data.management.copy.CopyableDataset; import org.apache.gobblin.data.management.copy.CopyableFile; import org.apache.gobblin.data.management.copy.OwnerAndPermission; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import org.apache.gobblin.data.management.copy.prioritization.PrioritizedCopyableDataset; import org.apache.gobblin.data.management.partition.FileSet; import org.apache.gobblin.dataset.DatasetDescriptor; @@ -64,21 +65,20 @@ public class IcebergDataset implements PrioritizedCopyableDataset { private final String dbName; private final String inputTableName; - private final IcebergTable icebergTable; + private final IcebergTable srcIcebergTable; + private final IcebergTable existingTargetIcebergTable; protected final Properties properties; protected final FileSystem sourceFs; private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired - /** Target metastore URI */ - public static final String ICEBERG_TARGET_CATALOG_URI_KEY = - IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; /** Target database name */ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database"; - public IcebergDataset(String db, String table, IcebergTable icebergTbl, Properties properties, FileSystem sourceFs) { + public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingTargetIcebergTable, Properties properties, FileSystem sourceFs) { this.dbName = db; this.inputTableName = table; - this.icebergTable = icebergTbl; + this.srcIcebergTable = srcIcebergTable; + this.existingTargetIcebergTable = existingTargetIcebergTable; this.properties = properties; this.sourceFs = sourceFs; } @@ -154,6 +154,7 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati fileEntity.setDestinationData(getDestinationDataset(targetFs)); copyEntities.add(fileEntity); } + addPostPublishStep(copyEntities); log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size()); return copyEntities; } @@ -163,7 +164,7 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati * @return a map of path, file status for each file that needs to be copied */ protected Map 
getFilePathsToFileStatus(FileSystem targetFs, CopyConfiguration copyConfig) throws IOException { - IcebergTable icebergTable = this.getIcebergTable(); + IcebergTable icebergTable = this.getSrcIcebergTable(); /** @return whether `pathStr` is present on `targetFs`, caching results while tunneling checked exceptions outward */ Function isPresentOnTarget = CheckedExceptionFunction.wrapToTunneled(pathStr -> // omit considering timestamp (or other markers of freshness), as files should be immutable @@ -307,10 +308,15 @@ protected FileSystem getSourceFileSystemFromFileStatus(FileStatus fileStatus, Co } protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) { - return this.icebergTable.getDatasetDescriptor(sourceFs); + return this.srcIcebergTable.getDatasetDescriptor(sourceFs); } protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) { - return this.icebergTable.getDatasetDescriptor(targetFs); + return this.srcIcebergTable.getDatasetDescriptor(targetFs); + } + + private void addPostPublishStep(List copyEntities) { + IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.getSrcIcebergTable(), this.getExistingTargetIcebergTable()); + copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0)); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index bc111dc2566..9d34e7e668f 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -25,18 +25,22 @@ import java.util.Map; import java.util.Optional; import java.util.Properties; + import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.IterableDatasetFinder; import org.apache.gobblin.util.HadoopUtils; + /** * Finds {@link IcebergDataset}s. Will look for tables in a database using a {@link IcebergCatalog}, * and creates a {@link IcebergDataset} for each one. 
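
The IcebergDataset hunk above wires table registration into the copy flow by appending a CommitStep that is meant to run only after the copied files of the file set are published. A minimal sketch of that wiring, assuming the public IcebergRegisterStep constructor and the PostPublishStep arguments shown in the patch; the helper class and its fileSetId parameter are illustrative only, not part of the change:

    import java.util.List;

    import com.google.common.collect.Maps;

    import org.apache.gobblin.commit.CommitStep;
    import org.apache.gobblin.data.management.copy.CopyEntity;
    import org.apache.gobblin.data.management.copy.entities.PostPublishStep;

    class RegistrationWiringSketch {
      // Append registration so it executes after this file set's data/metadata files are published.
      static void appendRegistration(List<CopyEntity> copyEntities, String fileSetId,
          IcebergTable srcTable, IcebergTable destTable) {
        CommitStep register = new IcebergRegisterStep(srcTable, destTable);
        // Same arguments as the patch: file-set id, empty extra properties, the step, and priority 0.
        copyEntities.add(new PostPublishStep(fileSetId, Maps.newHashMap(), register, 0));
      }
    }
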
@@ -44,15 +48,21 @@ @Slf4j @RequiredArgsConstructor public class IcebergDatasetFinder implements IterableDatasetFinder { - public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset"; public static final String ICEBERG_CLUSTER_KEY = "cluster"; - public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name"; - public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name"; public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class"; public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri"; public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name"; + public static final String ICEBERG_TARGET_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; + public static final String ICEBERG_TARGET_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".target.cluster.name"; + public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name"; + public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name"; + + public enum CatalogLocation { + SOURCE, + TARGET + } protected final FileSystem sourceFs; private final Properties properties; @@ -74,18 +84,15 @@ public List findDatasets() throws IOException { String dbName = properties.getProperty(ICEBERG_DB_NAME); String tblName = properties.getProperty(ICEBERG_TABLE_NAME); - try { - IcebergCatalog icebergCatalog = createIcebergCatalog(this.properties); - /* Each Iceberg dataset maps to an Iceberg table - * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table - */ - matchingDatasets.add(createIcebergDataset(dbName, tblName, icebergCatalog, properties, sourceFs)); - log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), - matchingDatasets, dbName, tblName); - return matchingDatasets; - } catch (ReflectiveOperationException exception) { - throw new IOException(exception); - } + IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); + IcebergCatalog targetIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.TARGET); + /* Each Iceberg dataset maps to an Iceberg table + * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table + */ + matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, targetIcebergCatalog, properties, sourceFs)); + log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), + matchingDatasets, dbName, tblName); + return matchingDatasets; } @Override @@ -98,20 +105,35 @@ public Iterator getDatasetsIterator() throws IOException { return findDatasets().iterator(); } - protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog icebergCatalog, Properties properties, FileSystem fs) { - IcebergTable icebergTable = icebergCatalog.openTable(dbName, tblName); - return new IcebergDataset(dbName, tblName, icebergTable, properties, fs); + protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog 
targetIcebergCatalog, Properties properties, FileSystem fs) { + IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); + IcebergTable existingTargetIcebergTable = targetIcebergCatalog.openTable(dbName, tblName); + return new IcebergDataset(dbName, tblName, srcIcebergTable, existingTargetIcebergTable, properties, fs); } - protected IcebergCatalog createIcebergCatalog(Properties properties) throws IOException, ClassNotFoundException { + protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException { Map catalogProperties = new HashMap<>(); - String catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Catalog Table Service URI is required"); - catalogProperties.put(CatalogProperties.URI, catalogUri); - // introducing an optional property for catalogs requiring cluster specific properties - Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); Configuration configuration = HadoopUtils.getConfFromProperties(properties); - String icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); + String catalogUri; + String icebergCatalogClassName; + switch (location) { + case SOURCE: + catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY); + Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required"); + // introducing an optional property for catalogs requiring cluster specific properties + Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + break; + case TARGET: + catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY); + Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service URI is required"); + // introducing an optional property for catalogs requiring cluster specific properties + Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + break; + default: + throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location); + } + icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); + catalogProperties.put(CatalogProperties.URI, catalogUri); return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java new file mode 100644 index 00000000000..27afcff25a0 --- /dev/null +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.data.management.copy.iceberg; + +import java.io.IOException; + +import org.apache.iceberg.TableMetadata; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.commit.CommitStep; + +/** + * {@link CommitStep} to perform Iceberg registration. + */ + +@Slf4j +@AllArgsConstructor +public class IcebergRegisterStep implements CommitStep { + + private final IcebergTable srcIcebergTable; + private final IcebergTable existingTargetIcebergTable; + + @Override + public boolean isCompleted() throws IOException { + return false; + } + + @Override + public void execute() throws IOException { + TableMetadata targetMetadata = null; + try { + targetMetadata = this.existingTargetIcebergTable.accessTableMetadata(); + } catch (IcebergTable.TableNotFoundException tnfe) { + log.warn("Target TableMetadata doesn't exist because : {}" , tnfe); + } + this.srcIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), targetMetadata); + } +} diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index f65a81417f3..7561b1847aa 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -26,10 +26,6 @@ import java.util.Set; import java.util.stream.Collectors; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - import org.apache.hadoop.fs.FileSystem; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; @@ -44,6 +40,10 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.dataset.DatasetConstants; import org.apache.gobblin.dataset.DatasetDescriptor; @@ -194,4 +194,9 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) { descriptor.addMetadata(DatasetConstants.FS_URI, fs.getUri().toString()); return descriptor; } + /** Registers {@link IcebergTable} after publishing data. 
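+ * Delegates the metadata commit to this table's underlying {@link org.apache.iceberg.TableOperations}.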
+ * @param dstMetadata is null if target {@link IcebergTable} is absent */ + protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) { + this.tableOps.commit(srcMetadata, dstMetadata); + } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index 9478207f8d9..b2d8eac4d51 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.mockito.MockedConstruction; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -61,8 +62,10 @@ import org.apache.gobblin.data.management.copy.CopyContext; import org.apache.gobblin.data.management.copy.CopyEntity; import org.apache.gobblin.data.management.copy.PreserveAttributes; +import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mockConstruction; /** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergDataset} */ @@ -98,6 +101,10 @@ public class IcebergDatasetTest { new MockIcebergTable.SnapshotPaths(Optional.empty(), MANIFEST_LIST_PATH_1, Arrays.asList( new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, MANIFEST_DATA_PATH_1B)))); + private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_2 = + new MockIcebergTable.SnapshotPaths(Optional.empty(), Strings.EMPTY, Arrays.asList( + new IcebergSnapshotInfo.ManifestFileInfo(Strings.EMPTY, + Arrays.asList(Strings.EMPTY)))); private final String testDbName = "test_db_name"; private final String testTblName = "test_tbl_name"; @@ -188,12 +195,11 @@ public void testGetFilePathsDoesNotSwallowDestFileSystemException() throws IOExc MockFileSystemBuilder sourceFsBuilder = new MockFileSystemBuilder(SRC_FS_URI); FileSystem sourceFs = sourceFsBuilder.build(); - IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergDataset icebergDataset = new IcebergDataset(testDbName, testTblName, icebergTable, null, new Properties(), sourceFs); MockFileSystemBuilder destFsBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destFsBuilder.build(); Mockito.doThrow(new IOException("Ha - not so fast!")).when(destFs).getFileStatus(new Path(SNAPSHOT_PATHS_0.manifestListPath)); - CopyConfiguration copyConfiguration = createEmptyCopyConfiguration(destFs); icebergDataset.getFilePathsToFileStatus(destFs, copyConfiguration); } @@ -228,8 +234,9 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); MockFileSystemBuilder 
destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -237,6 +244,9 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); + try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { + PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); + } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } @@ -253,8 +263,9 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -262,6 +273,9 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); + try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { + PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); + } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } @@ -283,7 +297,8 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); + IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -293,7 +308,9 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc // preserving attributes for owner, group and permissions respectively .preserve(PreserveAttributes.fromMnemonicString("ugp")) .copyContext(new CopyContext()).build(); - + try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { + PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); + } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyFsOwnershipAndPermissionPreservation(copyEntities, 
sourceBuilder.getPathsAndFileStatuses()); } @@ -311,7 +328,8 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, new Properties(), sourceFs); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); + IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -321,7 +339,9 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw // without preserving attributes for owner, group and permissions .preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); - + try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { + PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); + } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); } @@ -348,7 +368,7 @@ protected IcebergTable validateGetFilePathsGivenDestState(List copyEntities, List List actual = new ArrayList<>(); for (CopyEntity copyEntity : copyEntities) { String json = copyEntity.toString(); - String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json); - actual.add(filepath); + if (isCopyableFile(json)) { + String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json); + actual.add(filepath); + } } Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); Assert.assertEqualsNoOrder(actual.toArray(), expected.toArray()); } + private static boolean isCopyableFile(String json) { + String objectType = new Gson().fromJson(json, JsonObject.class) + .getAsJsonPrimitive("object-type") + .getAsString(); + return objectType.equals("org.apache.gobblin.data.management.copy.CopyableFile"); + } + private static void verifyFsOwnershipAndPermissionPreservation(Collection copyEntities, Map expectedPathsAndFileStatuses) { for (CopyEntity copyEntity : copyEntities) { String copyEntityJson = copyEntity.toString(); - List ancestorFileOwnerAndPermissionsList = CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); - CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions = CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); - Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson)); - FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath); - verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus); - // providing path's parent to verify ancestor owner and permissions - verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(), expectedPathsAndFileStatuses); + if (isCopyableFile(copyEntityJson)) { + List ancestorFileOwnerAndPermissionsList = + CopyEntityDeserializer.getAncestorOwnerAndPermissions(copyEntityJson); + CopyEntityDeserializer.FileOwnerAndPermissions destinationFileOwnerAndPermissions = 
CopyEntityDeserializer.getDestinationOwnerAndPermissions(copyEntityJson); + Path filePath = new Path(CopyEntityDeserializer.getFilePathAsStringFromJson(copyEntityJson)); + FileStatus fileStatus = expectedPathsAndFileStatuses.get(filePath); + verifyFileStatus(destinationFileOwnerAndPermissions, fileStatus); + // providing path's parent to verify ancestor owner and permissions + verifyAncestorPermissions(ancestorFileOwnerAndPermissionsList, filePath.getParent(), + expectedPathsAndFileStatuses); + } } } @@ -424,9 +457,9 @@ private static void verifyAncestorPermissions(List Date: Tue, 21 Mar 2023 16:52:10 -0700 Subject: [PATCH 2/9] address PR comments --- .../copy/iceberg/BaseIcebergCatalog.java | 6 ++- .../copy/iceberg/IcebergCatalog.java | 4 +- .../copy/iceberg/IcebergDataset.java | 17 +++++---- .../copy/iceberg/IcebergDatasetFinder.java | 35 +++++++++-------- .../copy/iceberg/IcebergHiveCatalog.java | 8 +++- .../copy/iceberg/IcebergRegisterStep.java | 10 ++--- .../copy/iceberg/IcebergDatasetTest.java | 38 ++++++++----------- 7 files changed, 63 insertions(+), 55 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java index f3ef3309a21..565d1292aaf 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java @@ -17,7 +17,9 @@ package org.apache.gobblin.data.management.copy.iceberg; +import java.io.IOException; import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.TableOperations; @@ -39,7 +41,7 @@ protected BaseIcebergCatalog(String catalogName, Class compan } @Override - public IcebergTable openTable(String dbName, String tableName) { + public IcebergTable openTable(String dbName, String tableName) throws IOException { TableIdentifier tableId = TableIdentifier.of(dbName, tableName); return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri()); } @@ -48,5 +50,5 @@ protected Catalog createCompanionCatalog(Map properties, Configu return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), this.catalogName, properties, configuration); } - protected abstract TableOperations createTableOperations(TableIdentifier tableId); + protected abstract TableOperations createTableOperations(TableIdentifier tableId) throws IOException; } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index 15211c8b540..a412964f4ae 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -17,7 +17,9 @@ package org.apache.gobblin.data.management.copy.iceberg; +import java.io.IOException; import java.util.Map; + import org.apache.hadoop.conf.Configuration; @@ -25,7 +27,7 @@ * Any catalog from which to access {@link IcebergTable}s. 
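 * Implementations open an {@link IcebergTable} by database and table name, report their catalog URI, and are initialized with catalog properties and a Hadoop {@link Configuration}.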
*/ public interface IcebergCatalog { - IcebergTable openTable(String dbName, String tableName); + IcebergTable openTable(String dbName, String tableName) throws IOException; String getCatalogUri(); void initialize(Map properties, Configuration configuration); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 2626e2a6c13..828457af112 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -66,7 +66,7 @@ public class IcebergDataset implements PrioritizedCopyableDataset { private final String dbName; private final String inputTableName; private final IcebergTable srcIcebergTable; - private final IcebergTable existingTargetIcebergTable; + private final IcebergTable existingDestinationIcebergTable; protected final Properties properties; protected final FileSystem sourceFs; private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired @@ -74,11 +74,11 @@ public class IcebergDataset implements PrioritizedCopyableDataset { /** Target database name */ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database"; - public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingTargetIcebergTable, Properties properties, FileSystem sourceFs) { + public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingDestinationIcebergTable, Properties properties, FileSystem sourceFs) { this.dbName = db; this.inputTableName = table; this.srcIcebergTable = srcIcebergTable; - this.existingTargetIcebergTable = existingTargetIcebergTable; + this.existingDestinationIcebergTable = existingDestinationIcebergTable; this.properties = properties; this.sourceFs = sourceFs; } @@ -153,8 +153,9 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati fileEntity.setSourceData(getSourceDataset(this.sourceFs)); fileEntity.setDestinationData(getDestinationDataset(targetFs)); copyEntities.add(fileEntity); + } - addPostPublishStep(copyEntities); + copyEntities.add(addPostPublishStep(this.srcIcebergTable, this.existingDestinationIcebergTable)); log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size()); return copyEntities; } @@ -312,11 +313,11 @@ protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) { } protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) { - return this.srcIcebergTable.getDatasetDescriptor(targetFs); + return this.existingDestinationIcebergTable.getDatasetDescriptor(targetFs); } - private void addPostPublishStep(List copyEntities) { - IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(this.getSrcIcebergTable(), this.getExistingTargetIcebergTable()); - copyEntities.add(new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0)); + private PostPublishStep addPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) { + IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable); + return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0); } } diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index 9d34e7e668f..b39563a83fc 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -50,18 +50,18 @@ public class IcebergDatasetFinder implements IterableDatasetFinder { public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset"; public static final String ICEBERG_CLUSTER_KEY = "cluster"; - public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class"; - public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri"; + public static final String ICEBERG_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class"; public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; + public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri"; public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name"; - public static final String ICEBERG_TARGET_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; - public static final String ICEBERG_TARGET_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".target.cluster.name"; + public static final String ICEBERG_DST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; + public static final String ICEBERG_DST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".target.cluster.name"; public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name"; public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name"; public enum CatalogLocation { SOURCE, - TARGET + DESTINATION } protected final FileSystem sourceFs; @@ -85,11 +85,11 @@ public List findDatasets() throws IOException { String tblName = properties.getProperty(ICEBERG_TABLE_NAME); IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); - IcebergCatalog targetIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.TARGET); + IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION); /* Each Iceberg dataset maps to an Iceberg table * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table */ - matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, targetIcebergCatalog, properties, sourceFs)); + matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs)); log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName); return matchingDatasets; @@ -105,10 +105,15 @@ public Iterator getDatasetsIterator() throws IOException { return findDatasets().iterator(); } - protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog targetIcebergCatalog, Properties properties, FileSystem fs) { + /** + * Requires both source and 
destination catalogs to connect to their respective {@link IcebergTable} + * Note: the destination side {@link IcebergTable} should be present before initiating replication + * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable} + */ + protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); - IcebergTable existingTargetIcebergTable = targetIcebergCatalog.openTable(dbName, tblName); - return new IcebergDataset(dbName, tblName, srcIcebergTable, existingTargetIcebergTable, properties, fs); + IcebergTable existingDestinationIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); + return new IcebergDataset(dbName, tblName, srcIcebergTable, existingDestinationIcebergTable, properties, fs); } protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException { @@ -123,16 +128,16 @@ protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLoca // introducing an optional property for catalogs requiring cluster specific properties Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); break; - case TARGET: - catalogUri = properties.getProperty(ICEBERG_TARGET_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Target Catalog Table Service URI is required"); + case DESTINATION: + catalogUri = properties.getProperty(ICEBERG_DST_CATALOG_URI_KEY); + Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required"); // introducing an optional property for catalogs requiring cluster specific properties - Optional.ofNullable(properties.getProperty(ICEBERG_TARGET_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + Optional.ofNullable(properties.getProperty(ICEBERG_DST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); break; default: throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location); } - icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); + icebergCatalogClassName = properties.getProperty(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); catalogProperties.put(CatalogProperties.URI, catalogUri); return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java index 5525750e64c..8c8bcb0a437 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java @@ -17,12 +17,15 @@ package org.apache.gobblin.data.management.copy.iceberg; +import java.io.IOException; import java.util.Map; + import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; import 
org.apache.iceberg.hive.HiveCatalog; + import lombok.extern.slf4j.Slf4j; @@ -51,7 +54,10 @@ public String getCatalogUri() { } @Override - protected TableOperations createTableOperations(TableIdentifier tableId) { + protected TableOperations createTableOperations(TableIdentifier tableId) throws IOException { + if (!hc.tableExists(tableId)) { + throw new IcebergTable.TableNotFoundException(tableId); + } return hc.newTableOps(tableId); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java index 27afcff25a0..b494cfda4a2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java @@ -35,7 +35,7 @@ public class IcebergRegisterStep implements CommitStep { private final IcebergTable srcIcebergTable; - private final IcebergTable existingTargetIcebergTable; + private final IcebergTable existingDestinationIcebergTable; @Override public boolean isCompleted() throws IOException { @@ -44,12 +44,12 @@ public boolean isCompleted() throws IOException { @Override public void execute() throws IOException { - TableMetadata targetMetadata = null; + TableMetadata destinationMetadata = null; try { - targetMetadata = this.existingTargetIcebergTable.accessTableMetadata(); + destinationMetadata = this.existingDestinationIcebergTable.accessTableMetadata(); } catch (IcebergTable.TableNotFoundException tnfe) { - log.warn("Target TableMetadata doesn't exist because : {}" , tnfe); + log.warn("Destination TableMetadata doesn't exist because : {}" , tnfe); } - this.srcIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), targetMetadata); + this.existingDestinationIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index b2d8eac4d51..7485af19b6c 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -41,7 +41,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.mockito.MockedConstruction; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.BeforeClass; @@ -62,10 +61,8 @@ import org.apache.gobblin.data.management.copy.CopyContext; import org.apache.gobblin.data.management.copy.CopyEntity; import org.apache.gobblin.data.management.copy.PreserveAttributes; -import org.apache.gobblin.data.management.copy.entities.PostPublishStep; import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mockConstruction; /** Tests for {@link org.apache.gobblin.data.management.copy.iceberg.IcebergDataset} */ @@ -101,10 +98,6 @@ public class IcebergDatasetTest { new MockIcebergTable.SnapshotPaths(Optional.empty(), MANIFEST_LIST_PATH_1, Arrays.asList( new IcebergSnapshotInfo.ManifestFileInfo(MANIFEST_PATH_1, Arrays.asList(MANIFEST_DATA_PATH_1A, 
MANIFEST_DATA_PATH_1B)))); - private static final MockIcebergTable.SnapshotPaths SNAPSHOT_PATHS_2 = - new MockIcebergTable.SnapshotPaths(Optional.empty(), Strings.EMPTY, Arrays.asList( - new IcebergSnapshotInfo.ManifestFileInfo(Strings.EMPTY, - Arrays.asList(Strings.EMPTY)))); private final String testDbName = "test_db_name"; private final String testTblName = "test_tbl_name"; @@ -234,7 +227,7 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); @@ -244,9 +237,6 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); - try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { - PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); - } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } @@ -263,7 +253,7 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); @@ -273,9 +263,6 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept CopyConfiguration copyConfiguration = CopyConfiguration.builder(destFs, copyConfigProperties).preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); - try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { - PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); - } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyCopyEntities(copyEntities, expectedPaths); } @@ -297,7 +284,7 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); @@ -308,9 +295,6 @@ public void 
testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc // preserving attributes for owner, group and permissions respectively .preserve(PreserveAttributes.fromMnemonicString("ugp")) .copyContext(new CopyContext()).build(); - try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { - PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); - } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyFsOwnershipAndPermissionPreservation(copyEntities, sourceBuilder.getPathsAndFileStatuses()); } @@ -328,7 +312,7 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw FileSystem sourceFs = sourceBuilder.build(); IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_2)); + IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); @@ -339,9 +323,6 @@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw // without preserving attributes for owner, group and permissions .preserve(PreserveAttributes.fromMnemonicString("")) .copyContext(new CopyContext()).build(); - try (MockedConstruction mockedPostPublishStep = mockConstruction(PostPublishStep.class)) { - PostPublishStep step = new PostPublishStep(icebergDataset.getFileSetId(), Maps.newHashMap(), new IcebergRegisterStep(icebergTable, targetTable), 0); - } Collection copyEntities = icebergDataset.generateCopyEntities(destFs, copyConfiguration); verifyFsOwnershipAndPermissionPreservation(copyEntities, expectedPathsAndFileStatuses); } @@ -405,6 +386,8 @@ private static void verifyCopyEntities(Collection copyEntities, List if (isCopyableFile(json)) { String filepath = CopyEntityDeserializer.getFilePathAsStringFromJson(json); actual.add(filepath); + } else{ + verifyPostPublishStep(json); } } Assert.assertEquals(actual.size(), expected.size(), "Set" + actual.toString() + " vs Set" + expected.toString()); @@ -431,6 +414,8 @@ private static void verifyFsOwnershipAndPermissionPreservation(Collection Date: Thu, 23 Mar 2023 11:15:48 -0700 Subject: [PATCH 3/9] replace target with dest for consistency and address PR comments --- .../copy/iceberg/IcebergCatalog.java | 2 ++ .../copy/iceberg/IcebergDataset.java | 13 +++++---- .../copy/iceberg/IcebergDatasetFinder.java | 18 ++++++------ .../copy/iceberg/IcebergHiveCatalog.java | 11 ++++---- .../copy/iceberg/IcebergRegisterStep.java | 8 +++--- .../copy/iceberg/IcebergDatasetTest.java | 28 +++++++++---------- 6 files changed, 43 insertions(+), 37 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index a412964f4ae..6a034cda2ee 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -21,6 +21,7 @@ import java.util.Map; import 
org.apache.hadoop.conf.Configuration; +import org.apache.iceberg.catalog.TableIdentifier; /** @@ -30,4 +31,5 @@ public interface IcebergCatalog { IcebergTable openTable(String dbName, String tableName) throws IOException; String getCatalogUri(); void initialize(Map properties, Configuration configuration); + boolean tableAlreadyExists(TableIdentifier tableIdentifier); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 828457af112..6d8ff805003 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -66,7 +66,8 @@ public class IcebergDataset implements PrioritizedCopyableDataset { private final String dbName; private final String inputTableName; private final IcebergTable srcIcebergTable; - private final IcebergTable existingDestinationIcebergTable; + /** Presumed destination {@link IcebergTable} exists */ + private final IcebergTable destIcebergTable; protected final Properties properties; protected final FileSystem sourceFs; private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired @@ -74,11 +75,11 @@ public class IcebergDataset implements PrioritizedCopyableDataset { /** Target database name */ public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database"; - public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable existingDestinationIcebergTable, Properties properties, FileSystem sourceFs) { + public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) { this.dbName = db; this.inputTableName = table; this.srcIcebergTable = srcIcebergTable; - this.existingDestinationIcebergTable = existingDestinationIcebergTable; + this.destIcebergTable = destIcebergTable; this.properties = properties; this.sourceFs = sourceFs; } @@ -155,7 +156,7 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati copyEntities.add(fileEntity); } - copyEntities.add(addPostPublishStep(this.srcIcebergTable, this.existingDestinationIcebergTable)); + copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable)); log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size()); return copyEntities; } @@ -313,10 +314,10 @@ protected DatasetDescriptor getSourceDataset(FileSystem sourceFs) { } protected DatasetDescriptor getDestinationDataset(FileSystem targetFs) { - return this.existingDestinationIcebergTable.getDatasetDescriptor(targetFs); + return this.destIcebergTable.getDatasetDescriptor(targetFs); } - private PostPublishStep addPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) { + private PostPublishStep createPostPublishStep(IcebergTable srcIcebergTable, IcebergTable dstIcebergTable) { IcebergRegisterStep icebergRegisterStep = new IcebergRegisterStep(srcIcebergTable, dstIcebergTable); return new PostPublishStep(getFileSetId(), Maps.newHashMap(), icebergRegisterStep, 0); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index b39563a83fc..511141c6da2 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -50,12 +50,13 @@ public class IcebergDatasetFinder implements IterableDatasetFinder { public static final String ICEBERG_DATASET_PREFIX = DatasetConstants.PLATFORM_ICEBERG + ".dataset"; public static final String ICEBERG_CLUSTER_KEY = "cluster"; - public static final String ICEBERG_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class"; + public static final String ICEBERG_SRC_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.class"; public static final String DEFAULT_ICEBERG_CATALOG_CLASS = "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog"; public static final String ICEBERG_SRC_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".source.catalog.uri"; public static final String ICEBERG_SRC_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".source.cluster.name"; - public static final String ICEBERG_DST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; - public static final String ICEBERG_DST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".target.cluster.name"; + public static final String ICEBERG_DEST_CATALOG_CLASS_KEY = ICEBERG_DATASET_PREFIX + ".target.catalog.class"; + public static final String ICEBERG_DEST_CATALOG_URI_KEY = ICEBERG_DATASET_PREFIX + ".copy.target.catalog.uri"; + public static final String ICEBERG_DEST_CLUSTER_NAME = ICEBERG_DATASET_PREFIX + ".target.cluster.name"; public static final String ICEBERG_DB_NAME = ICEBERG_DATASET_PREFIX + ".database.name"; public static final String ICEBERG_TABLE_NAME = ICEBERG_DATASET_PREFIX + ".table.name"; @@ -112,8 +113,8 @@ public Iterator getDatasetsIterator() throws IOException { */ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); - IcebergTable existingDestinationIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); - return new IcebergDataset(dbName, tblName, srcIcebergTable, existingDestinationIcebergTable, properties, fs); + IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); + return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); } protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLocation location) throws IOException { @@ -127,17 +128,18 @@ protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLoca Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required"); // introducing an optional property for catalogs requiring cluster specific properties Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); break; case DESTINATION: - catalogUri = properties.getProperty(ICEBERG_DST_CATALOG_URI_KEY); + catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY); Preconditions.checkNotNull(catalogUri, 
"Destination Catalog Table Service URI is required"); // introducing an optional property for catalogs requiring cluster specific properties - Optional.ofNullable(properties.getProperty(ICEBERG_DST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); + icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); break; default: throw new UnsupportedOperationException("Incorrect desired location: %s provided for creating Iceberg Catalog" + location); } - icebergCatalogClassName = properties.getProperty(ICEBERG_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); catalogProperties.put(CatalogProperties.URI, catalogUri); return IcebergCatalogFactory.create(icebergCatalogClassName, catalogProperties, configuration); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java index 8c8bcb0a437..7d97e0b2a01 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java @@ -17,7 +17,6 @@ package org.apache.gobblin.data.management.copy.iceberg; -import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -54,10 +53,12 @@ public String getCatalogUri() { } @Override - protected TableOperations createTableOperations(TableIdentifier tableId) throws IOException { - if (!hc.tableExists(tableId)) { - throw new IcebergTable.TableNotFoundException(tableId); - } + protected TableOperations createTableOperations(TableIdentifier tableId) { return hc.newTableOps(tableId); } + + @Override + public boolean tableAlreadyExists(TableIdentifier tableId) { + return hc.tableExists(tableId); + } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java index b494cfda4a2..b5c7ae1654a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java @@ -35,7 +35,7 @@ public class IcebergRegisterStep implements CommitStep { private final IcebergTable srcIcebergTable; - private final IcebergTable existingDestinationIcebergTable; + private final IcebergTable destIcebergTable; @Override public boolean isCompleted() throws IOException { @@ -46,10 +46,10 @@ public boolean isCompleted() throws IOException { public void execute() throws IOException { TableMetadata destinationMetadata = null; try { - destinationMetadata = this.existingDestinationIcebergTable.accessTableMetadata(); + destinationMetadata = this.destIcebergTable.accessTableMetadata(); } catch (IcebergTable.TableNotFoundException tnfe) { - log.warn("Destination TableMetadata doesn't exist because : {}" , tnfe); + log.warn("Destination TableMetadata doesn't exist because: " , tnfe); } - this.existingDestinationIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata); + 
this.destIcebergTable.registerIcebergTable(this.srcIcebergTable.accessTableMetadata(), destinationMetadata); } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java index 7485af19b6c..c1872cb4ad8 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetTest.java @@ -226,10 +226,10 @@ public void testGenerateCopyEntitiesWhenDestEmpty() throws IOException { sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); + new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -252,10 +252,10 @@ public void testGenerateCopyEntitiesMultiSnapshotWhenDestEmpty() throws IOExcept sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1, SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); IcebergDataset icebergDataset = - new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); + new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -283,9 +283,9 @@ public void testFsOwnershipAndPermissionPreservationWhenDestEmpty() throws IOExc sourceBuilder.addPathsAndFileStatuses(expectedPathsAndFileStatuses); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -311,9 +311,9 
@@ public void testFsOwnershipAndPermissionWithoutPreservationWhenDestEmpty() throw sourceBuilder.addPaths(expectedPaths); FileSystem sourceFs = sourceBuilder.build(); - IcebergTable icebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); - IcebergTable targetTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); - IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, icebergTable, targetTable, new Properties(), sourceFs); + IcebergTable srcIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_0)); + IcebergTable destIcebergTable = MockIcebergTable.withSnapshots(Arrays.asList(SNAPSHOT_PATHS_1)); + IcebergDataset icebergDataset = new TrickIcebergDataset(testDbName, testTblName, srcIcebergTable, destIcebergTable, new Properties(), sourceFs); MockFileSystemBuilder destBuilder = new MockFileSystemBuilder(DEST_FS_URI); FileSystem destFs = destBuilder.build(); @@ -449,9 +449,9 @@ private static void verifyPostPublishStep(String json) { * Without this, so to lose the mock, we'd be unable to set up any source paths as existing. */ protected static class TrickIcebergDataset extends IcebergDataset { - public TrickIcebergDataset(String db, String table, IcebergTable icebergTbl, IcebergTable targetIcebergTbl, Properties properties, + public TrickIcebergDataset(String db, String table, IcebergTable srcIcebergTbl, IcebergTable destIcebergTbl, Properties properties, FileSystem sourceFs) { - super(db, table, icebergTbl, targetIcebergTbl, properties, sourceFs); + super(db, table, srcIcebergTbl, destIcebergTbl, properties, sourceFs); } @Override // as the `static` is not mock-able From 9e82bdbc052380114e0cbc9f8083d7181023be9e Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Tue, 28 Mar 2023 21:52:02 -0700 Subject: [PATCH 4/9] adding pre-check for dest iceberg table --- .../management/copy/iceberg/BaseIcebergCatalog.java | 4 ++-- .../data/management/copy/iceberg/IcebergCatalog.java | 2 +- .../data/management/copy/iceberg/IcebergDataset.java | 4 ++-- .../management/copy/iceberg/IcebergDatasetFinder.java | 10 ++++++---- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java index 565d1292aaf..07da9cbf4c9 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java @@ -41,7 +41,7 @@ protected BaseIcebergCatalog(String catalogName, Class compan } @Override - public IcebergTable openTable(String dbName, String tableName) throws IOException { + public IcebergTable openTable(String dbName, String tableName) { TableIdentifier tableId = TableIdentifier.of(dbName, tableName); return new IcebergTable(tableId, createTableOperations(tableId), this.getCatalogUri()); } @@ -50,5 +50,5 @@ protected Catalog createCompanionCatalog(Map properties, Configu return CatalogUtil.loadCatalog(this.companionCatalogClass.getName(), this.catalogName, properties, configuration); } - protected abstract TableOperations createTableOperations(TableIdentifier tableId) throws IOException; + protected abstract TableOperations createTableOperations(TableIdentifier tableId); } diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index 6a034cda2ee..1db78674ef5 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -28,7 +28,7 @@ * Any catalog from which to access {@link IcebergTable}s. */ public interface IcebergCatalog { - IcebergTable openTable(String dbName, String tableName) throws IOException; + IcebergTable openTable(String dbName, String tableName); String getCatalogUri(); void initialize(Map properties, Configuration configuration); boolean tableAlreadyExists(TableIdentifier tableIdentifier); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 6d8ff805003..3c3d32fd419 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -72,8 +72,8 @@ public class IcebergDataset implements PrioritizedCopyableDataset { protected final FileSystem sourceFs; private final boolean shouldTolerateMissingSourceFiles = true; // TODO: make parameterizable, if desired - /** Target database name */ - public static final String TARGET_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.target.database"; + /** Destination database name */ + public static final String DESTINATION_DATABASE_KEY = IcebergDatasetFinder.ICEBERG_DATASET_PREFIX + ".copy.destination.database"; public IcebergDataset(String db, String table, IcebergTable srcIcebergTable, IcebergTable destIcebergTable, Properties properties, FileSystem sourceFs) { this.dbName = db; diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index 511141c6da2..6954d57de34 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.iceberg.CatalogProperties; +import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import lombok.RequiredArgsConstructor; @@ -54,9 +55,9 @@ public class IcebergDatasetFinder implements IterableDatasetFinder findDatasets() throws IOException { */ matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs)); log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), - matchingDatasets, dbName, tblName); + matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one return matchingDatasets; } @@ -114,6 +115,7 @@ public Iterator getDatasetsIterator() throws IOException { protected 
IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); + Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(TableIdentifier.of(dbName, tblName)), "Missing Destination Iceberg Table!"); return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); } From 61e1829ac345b8549aa87af2fd4179b72e5d0642 Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Wed, 29 Mar 2023 11:22:27 -0700 Subject: [PATCH 5/9] change params for tableAlreadyExists and address checkstyle --- .../data/management/copy/iceberg/BaseIcebergCatalog.java | 1 - .../data/management/copy/iceberg/IcebergCatalog.java | 4 +--- .../management/copy/iceberg/IcebergDatasetFinder.java | 3 +-- .../data/management/copy/iceberg/IcebergHiveCatalog.java | 8 ++++---- .../data/management/copy/iceberg/IcebergTable.java | 2 +- 5 files changed, 7 insertions(+), 11 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java index 07da9cbf4c9..0ac4dcc0b8a 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/BaseIcebergCatalog.java @@ -17,7 +17,6 @@ package org.apache.gobblin.data.management.copy.iceberg; -import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java index 1db78674ef5..ac342e2e3a4 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergCatalog.java @@ -17,11 +17,9 @@ package org.apache.gobblin.data.management.copy.iceberg; -import java.io.IOException; import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.catalog.TableIdentifier; /** @@ -31,5 +29,5 @@ public interface IcebergCatalog { IcebergTable openTable(String dbName, String tableName); String getCatalogUri(); void initialize(Map properties, Configuration configuration); - boolean tableAlreadyExists(TableIdentifier tableIdentifier); + boolean tableAlreadyExists(IcebergTable icebergTable); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index 6954d57de34..d9773f1571e 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -31,7 +31,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import 
org.apache.iceberg.CatalogProperties; -import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import lombok.RequiredArgsConstructor; @@ -115,7 +114,7 @@ public Iterator getDatasetsIterator() throws IOException { protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); - Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(TableIdentifier.of(dbName, tblName)), "Missing Destination Iceberg Table!"); + Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName)); return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java index 7d97e0b2a01..af541a79a56 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergHiveCatalog.java @@ -20,7 +20,7 @@ import java.util.Map; import org.apache.hadoop.conf.Configuration; -import org.apache.iceberg.CatalogProperties; +import org.apache.hadoop.hive.conf.HiveConf; import org.apache.iceberg.TableOperations; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveCatalog; @@ -49,7 +49,7 @@ public void initialize(Map properties, Configuration configurati @Override public String getCatalogUri() { - return hc.getConf().get(CatalogProperties.URI, "<>"); + return hc.getConf().get(HiveConf.ConfVars.METASTOREURIS.varname, "<>"); } @Override @@ -58,7 +58,7 @@ protected TableOperations createTableOperations(TableIdentifier tableId) { } @Override - public boolean tableAlreadyExists(TableIdentifier tableId) { - return hc.tableExists(tableId); + public boolean tableAlreadyExists(IcebergTable icebergTable) { + return hc.tableExists(icebergTable.getTableId()); } } diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index 7561b1847aa..64923677fdf 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -67,7 +67,7 @@ public TableNotFoundException(TableIdentifier tableId) { this.tableId = tableId; } } - + @Getter private final TableIdentifier tableId; private final TableOperations tableOps; private final String catalogUri; From a487cc7464b80f8706fce0ef2f23db9157b9a10d Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Wed, 29 Mar 2023 13:10:16 -0700 Subject: [PATCH 6/9] add pre-check for iceberg table on source and guard against missing dest metadata while committing --- .../data/management/copy/iceberg/IcebergDataset.java | 1 - 
.../data/management/copy/iceberg/IcebergDatasetFinder.java | 5 ++--- .../data/management/copy/iceberg/IcebergRegisterStep.java | 1 - .../gobblin/data/management/copy/iceberg/IcebergTable.java | 6 ++++-- .../data/management/copy/iceberg/IcebergTableTest.java | 1 - 5 files changed, 6 insertions(+), 8 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java index 3c3d32fd419..f4db7d4ff78 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDataset.java @@ -154,7 +154,6 @@ Collection generateCopyEntities(FileSystem targetFs, CopyConfigurati fileEntity.setSourceData(getSourceDataset(this.sourceFs)); fileEntity.setDestinationData(getDestinationDataset(targetFs)); copyEntities.add(fileEntity); - } copyEntities.add(createPostPublishStep(this.srcIcebergTable, this.destIcebergTable)); log.info("~{}.{}~ generated {} copy entities", dbName, inputTableName, copyEntities.size()); diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index d9773f1571e..a02b527f7f8 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -87,9 +87,7 @@ public List findDatasets() throws IOException { IcebergCatalog sourceIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.SOURCE); IcebergCatalog destinationIcebergCatalog = createIcebergCatalog(this.properties, CatalogLocation.DESTINATION); - /* Each Iceberg dataset maps to an Iceberg table - * TODO: The user provided database and table names needs to be pre-checked and verified against the existence of a valid Iceberg table - */ + /* Each Iceberg dataset maps to an Iceberg table */ matchingDatasets.add(createIcebergDataset(dbName, tblName, sourceIcebergCatalog, destinationIcebergCatalog, properties, sourceFs)); log.info("Found {} matching datasets: {} for the database name: {} and table name: {}", matchingDatasets.size(), matchingDatasets, dbName, tblName); // until future support added to specify multiple icebergs, count expected always to be one @@ -113,6 +111,7 @@ public Iterator getDatasetsIterator() throws IOException { */ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); + Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName)); IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName)); return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); diff --git 
a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java index b5c7ae1654a..75f26787b09 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergRegisterStep.java @@ -29,7 +29,6 @@ /** * {@link CommitStep} to perform Iceberg registration. */ - @Slf4j @AllArgsConstructor public class IcebergRegisterStep implements CommitStep { diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java index 64923677fdf..e8d0ee0ac28 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTable.java @@ -195,8 +195,10 @@ protected DatasetDescriptor getDatasetDescriptor(FileSystem fs) { return descriptor; } /** Registers {@link IcebergTable} after publishing data. - * @param dstMetadata is null if target {@link IcebergTable} is absent */ + * @param dstMetadata is null if destination {@link IcebergTable} is absent, in which case registration is skipped */ protected void registerIcebergTable(TableMetadata srcMetadata, TableMetadata dstMetadata) { - this.tableOps.commit(srcMetadata, dstMetadata); + if (dstMetadata != null) { + this.tableOps.commit(srcMetadata, dstMetadata); + } } } diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java index 3353c3365d7..20ea30610e7 100644 --- a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/copy/iceberg/IcebergTableTest.java @@ -40,7 +40,6 @@ import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.hive.HiveMetastoreTest; import org.apache.iceberg.shaded.org.apache.avro.SchemaBuilder; - import org.testng.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; From b57ca3312f7f84d026fa2ae324949f631e8ff975 Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Fri, 31 Mar 2023 11:05:19 -0700 Subject: [PATCH 7/9] address feedback on PR --- .../data/management/copy/iceberg/IcebergDatasetFinder.java | 5 +++-- gradle/scripts/dependencyDefinitions.gradle | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index a02b527f7f8..6d1f99a0cfd 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -107,6 +107,7 @@ public Iterator getDatasetsIterator() throws IOException { /** * Requires both source and destination catalogs to connect to their respective {@link 
IcebergTable} * Note: the destination side {@link IcebergTable} should be present before initiating replication + * TODO: Rethink strategy to enforce dest iceberg table * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable} */ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { @@ -125,14 +126,14 @@ protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLoca switch (location) { case SOURCE: catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required"); + Preconditions.checkNotNull(catalogUri, "Provided: {%s} Source Catalog Table Service URI is required", catalogUri); // introducing an optional property for catalogs requiring cluster specific properties Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); break; case DESTINATION: catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required"); + Preconditions.checkNotNull(catalogUri, "Provided: {%s} Destination Catalog Table Service URI is required", catalogUri); // introducing an optional property for catalogs requiring cluster specific properties Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); diff --git a/gradle/scripts/dependencyDefinitions.gradle b/gradle/scripts/dependencyDefinitions.gradle index 8c9179b0325..56511ff0516 100644 --- a/gradle/scripts/dependencyDefinitions.gradle +++ b/gradle/scripts/dependencyDefinitions.gradle @@ -123,7 +123,7 @@ ext.externalDependency = [ "guiceMultibindings": "com.google.inject.extensions:guice-multibindings:4.0", "guiceServlet": "com.google.inject.extensions:guice-servlet:4.0", "derby": "org.apache.derby:derby:10.12.1.1", - "mockito": "org.mockito:mockito-inline:4.11.0", + "mockito": "org.mockito:mockito-inline:4.11.0", // upgraded to allow mocking for constructors, static and final methods; specifically for iceberg distcp "salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion, "salesforcePartner": "com.force.api:force-partner-api:" + salesforceVersion, "scala": "org.scala-lang:scala-library:2.11.8", From 09cf43c60e5be5295d0a9fb87453e939393e3e11 Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Fri, 31 Mar 2023 11:09:34 -0700 Subject: [PATCH 8/9] updated javadoc --- .../data/management/copy/iceberg/IcebergDatasetFinder.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index 6d1f99a0cfd..78d867e85a0 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ 
-107,13 +107,13 @@ public Iterator getDatasetsIterator() throws IOException { /** * Requires both source and destination catalogs to connect to their respective {@link IcebergTable} * Note: the destination side {@link IcebergTable} should be present before initiating replication - * TODO: Rethink strategy to enforce dest iceberg table * @return {@link IcebergDataset} with its corresponding source and destination {@link IcebergTable} */ protected IcebergDataset createIcebergDataset(String dbName, String tblName, IcebergCatalog sourceIcebergCatalog, IcebergCatalog destinationIcebergCatalog, Properties properties, FileSystem fs) throws IOException { IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName); Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName)); IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName); + // TODO: Rethink strategy to enforce dest iceberg table Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName)); return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs); } From 7fbf7f1b3296ce042e850abd6453c5ffce9dbfe6 Mon Sep 17 00:00:00 2001 From: Meeth Gala Date: Fri, 31 Mar 2023 16:54:22 -0700 Subject: [PATCH 9/9] update error message for catalog uri missing --- .../data/management/copy/iceberg/IcebergDatasetFinder.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java index 78d867e85a0..b20a1bc292c 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/iceberg/IcebergDatasetFinder.java @@ -126,14 +126,14 @@ protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLoca switch (location) { case SOURCE: catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Provided: {%s} Source Catalog Table Service URI is required", catalogUri); + Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Source Catalog Table Service URI is required", ICEBERG_SRC_CATALOG_URI_KEY); // introducing an optional property for catalogs requiring cluster specific properties Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS); break; case DESTINATION: catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY); - Preconditions.checkNotNull(catalogUri, "Provided: {%s} Destination Catalog Table Service URI is required", catalogUri); + Preconditions.checkNotNull(catalogUri, "Provide: {%s} as Destination Catalog Table Service URI is required", ICEBERG_DEST_CATALOG_URI_KEY); // introducing an optional property for catalogs requiring cluster specific properties Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value)); icebergCatalogClassName = 
properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
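
Editor's note — taken together, patches 4 through 6 converge on two guards around destination registration: a pre-flight existence check for both the source and destination tables when the dataset is created, and a null check on the destination TableMetadata inside the post-publish commit step. The sketch below restates that control flow in plain Java so the two guards can be read side by side; the Catalog and Table interfaces here are simplified hypothetical stand-ins, not the actual Gobblin/Iceberg types, and the method names merely mirror the patch (tableAlreadyExists, accessTableMetadata).

// Minimal, self-contained sketch of the two guards added by this series (an assumed simplification,
// NOT the Gobblin classes): (1) fail fast when either table is missing, mirroring the
// Preconditions.checkArgument calls in createIcebergDataset; (2) skip the metadata commit when the
// destination TableMetadata was never resolved, mirroring registerIcebergTable.
public final class RegistrationGuardSketch {

  interface Catalog {
    Table openTable(String db, String table);
    boolean tableAlreadyExists(Table table);   // stand-in for IcebergCatalog#tableAlreadyExists
  }

  interface Table {
    Object accessTableMetadata();              // stand-in for IcebergTable#accessTableMetadata
    void commit(Object srcMetadata, Object dstMetadata);
  }

  /** Pre-flight check before any files are copied: both ends of the replication must already exist. */
  static void preCheck(Catalog srcCatalog, Catalog destCatalog, String db, String tbl) {
    Table src = srcCatalog.openTable(db, tbl);
    if (!srcCatalog.tableAlreadyExists(src)) {
      throw new IllegalArgumentException(String.format("Missing Source Iceberg Table: {%s}.{%s}", db, tbl));
    }
    Table dest = destCatalog.openTable(db, tbl);
    if (!destCatalog.tableAlreadyExists(dest)) {
      throw new IllegalArgumentException(String.format("Missing Destination Iceberg Table: {%s}.{%s}", db, tbl));
    }
  }

  /** Post-publish registration: only commit when destination metadata could actually be read. */
  static void register(Table src, Table dest, Object destMetadataOrNull) {
    if (destMetadataOrNull == null) {
      // destination metadata was absent (the TableNotFoundException path) -- nothing to commit against
      return;
    }
    dest.commit(src.accessTableMetadata(), destMetadataOrNull);
  }

  private RegistrationGuardSketch() { }
}

Worth noting the asymmetry the series leaves in place: a missing destination table fails the pre-check outright, while destination metadata that cannot be read at commit time is only logged and the registration silently skipped; the TODO added next to the destination pre-check acknowledges that this enforcement strategy is still being rethought.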
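
The series also splits the catalog configuration surface so that source and destination are wired independently: each side gets its own catalog class key, catalog URI key, and optional cluster name. Below is a hypothetical job-properties sketch; it assumes ICEBERG_DATASET_PREFIX resolves to "iceberg.dataset" (i.e. that DatasetConstants.PLATFORM_ICEBERG is "iceberg"), and all host names and values are placeholders rather than defaults taken from the code.

// Hypothetical wiring of the source/destination properties introduced above.
// Key prefixes assume DatasetConstants.PLATFORM_ICEBERG == "iceberg"; every value is a placeholder.
import java.util.Properties;

public class IcebergDistcpConfigSketch {
  public static Properties exampleProperties() {
    Properties props = new Properties();
    // which table to replicate
    props.setProperty("iceberg.dataset.database.name", "demo_db");
    props.setProperty("iceberg.dataset.table.name", "demo_table");
    // source-side catalog; the catalog class key falls back to IcebergHiveCatalog when omitted
    props.setProperty("iceberg.dataset.source.catalog.uri", "thrift://source-metastore.example.com:9083");
    props.setProperty("iceberg.dataset.source.cluster.name", "source-cluster");
    // destination-side catalog, resolved independently of the source
    props.setProperty("iceberg.dataset.copy.target.catalog.uri", "thrift://dest-metastore.example.com:9083");
    props.setProperty("iceberg.dataset.target.cluster.name", "dest-cluster");
    props.setProperty("iceberg.dataset.target.catalog.class",
        "org.apache.gobblin.data.management.copy.iceberg.IcebergHiveCatalog");
    return props;
  }
}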
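
Finally, the dependency note on mockito-inline is doing real work for the tests in this series: unlike the default mock maker, the inline mock maker can stub constructors, static methods, and final classes. A small illustration of the kind of stubbing it unlocks follows; HiveMetastoreUtils here is a made-up class used only to keep the snippet self-contained, while MockedStatic and MockedConstruction are the actual Mockito 4 APIs.

// Illustrative only: the mocked class is hypothetical; the Mockito APIs shown require the
// inline mock maker (provided by the org.mockito:mockito-inline artifact referenced above).
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

public class InlineMockingSketch {
  static class HiveMetastoreUtils {                 // hypothetical collaborator with a static method
    static String defaultUri() { return "thrift://real-metastore:9083"; }
  }

  public static void main(String[] args) {
    try (MockedStatic<HiveMetastoreUtils> mocked = Mockito.mockStatic(HiveMetastoreUtils.class)) {
      // the static call is stubbed for the duration of the try-with-resources scope
      mocked.when(HiveMetastoreUtils::defaultUri).thenReturn("thrift://stubbed:9083");
      System.out.println(HiveMetastoreUtils.defaultUri());        // prints the stubbed value
    }
    try (MockedConstruction<HiveMetastoreUtils> mocked = Mockito.mockConstruction(HiveMetastoreUtils.class)) {
      HiveMetastoreUtils instance = new HiveMetastoreUtils();      // constructor yields a Mockito mock
      System.out.println(Mockito.mockingDetails(instance).isMock()); // true
    }
  }
}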