Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1802]Register iceberg table metadata update with destination side catalog #3663

Merged
merged 9 commits into from
Apr 3, 2023
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ protected IcebergDataset createIcebergDataset(String dbName, String tblName, Ice
IcebergTable srcIcebergTable = sourceIcebergCatalog.openTable(dbName, tblName);
Preconditions.checkArgument(sourceIcebergCatalog.tableAlreadyExists(srcIcebergTable), String.format("Missing Source Iceberg Table: {%s}.{%s}", dbName, tblName));
IcebergTable destIcebergTable = destinationIcebergCatalog.openTable(dbName, tblName);
// TODO: Rethink strategy to enforce dest iceberg table
Preconditions.checkArgument(destinationIcebergCatalog.tableAlreadyExists(destIcebergTable), String.format("Missing Destination Iceberg Table: {%s}.{%s}", dbName, tblName));
return new IcebergDataset(dbName, tblName, srcIcebergTable, destIcebergTable, properties, fs);
}
Expand All @@ -125,14 +126,14 @@ protected IcebergCatalog createIcebergCatalog(Properties properties, CatalogLoca
switch (location) {
case SOURCE:
catalogUri = properties.getProperty(ICEBERG_SRC_CATALOG_URI_KEY);
Preconditions.checkNotNull(catalogUri, "Source Catalog Table Service URI is required");
Preconditions.checkNotNull(catalogUri, "Provided: {%s} Source Catalog Table Service URI is required", catalogUri);
meethngala marked this conversation as resolved.
Show resolved Hide resolved
// introducing an optional property for catalogs requiring cluster specific properties
Optional.ofNullable(properties.getProperty(ICEBERG_SRC_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
icebergCatalogClassName = properties.getProperty(ICEBERG_SRC_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
break;
case DESTINATION:
catalogUri = properties.getProperty(ICEBERG_DEST_CATALOG_URI_KEY);
Preconditions.checkNotNull(catalogUri, "Destination Catalog Table Service URI is required");
Preconditions.checkNotNull(catalogUri, "Provided: {%s} Destination Catalog Table Service URI is required", catalogUri);
// introducing an optional property for catalogs requiring cluster specific properties
Optional.ofNullable(properties.getProperty(ICEBERG_DEST_CLUSTER_NAME)).ifPresent(value -> catalogProperties.put(ICEBERG_CLUSTER_KEY, value));
icebergCatalogClassName = properties.getProperty(ICEBERG_DEST_CATALOG_CLASS_KEY, DEFAULT_ICEBERG_CATALOG_CLASS);
Expand Down
2 changes: 1 addition & 1 deletion gradle/scripts/dependencyDefinitions.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ ext.externalDependency = [
"guiceMultibindings": "com.google.inject.extensions:guice-multibindings:4.0",
"guiceServlet": "com.google.inject.extensions:guice-servlet:4.0",
"derby": "org.apache.derby:derby:10.12.1.1",
"mockito": "org.mockito:mockito-inline:4.11.0",
"mockito": "org.mockito:mockito-inline:4.11.0", // upgraded to allow mocking for constructors, static and final methods; specifically for iceberg distcp
"salesforceWsc": "com.force.api:force-wsc:" + salesforceVersion,
"salesforcePartner": "com.force.api:force-partner-api:" + salesforceVersion,
"scala": "org.scala-lang:scala-library:2.11.8",
Expand Down