diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java index d363e9bb440d..dbecbca4f33c 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java @@ -11,6 +11,7 @@ import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PORT_KEY; import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PURGE_STAGING_DATA_KEY; import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SCHEMA_KEY; +import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY; import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SERVER_HOSTNAME_KEY; import com.fasterxml.jackson.databind.JsonNode; @@ -23,12 +24,14 @@ public record DatabricksDestinationConfig(String serverHostname, String catalog, String schema, boolean isPurgeStagingData, + boolean enableSchemaEvolution, DatabricksStorageConfigProvider storageConfig) { static final String DEFAULT_DATABRICKS_PORT = "443"; static final String DEFAULT_DATABASE_SCHEMA = "default"; static final String DEFAULT_CATALOG = "hive_metastore"; static final boolean DEFAULT_PURGE_STAGING_DATA = true; + static final boolean DEFAULT_ENABLE_SCHEMA_EVOLUTION = false; public static DatabricksDestinationConfig get(final JsonNode config) { Preconditions.checkArgument( @@ -42,6 +45,7 @@ public static DatabricksDestinationConfig get(final 
JsonNode config) { config.get(DATABRICKS_PERSONAL_ACCESS_TOKEN_KEY).asText(), config.has(DATABRICKS_CATALOG_KEY) ? config.get(DATABRICKS_CATALOG_KEY).asText() : DEFAULT_CATALOG, config.has(DATABRICKS_SCHEMA_KEY) ? config.get(DATABRICKS_SCHEMA_KEY).asText() : DEFAULT_DATABASE_SCHEMA, config.has(DATABRICKS_PURGE_STAGING_DATA_KEY) ? config.get(DATABRICKS_PURGE_STAGING_DATA_KEY).asBoolean() : DEFAULT_PURGE_STAGING_DATA, + config.has(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY) ? config.get(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY).asBoolean() : DEFAULT_ENABLE_SCHEMA_EVOLUTION, DatabricksStorageConfigProvider.getDatabricksStorageConfig(config.get(DATABRICKS_DATA_SOURCE_KEY))); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java index fade966d0940..3d4d4773d30c 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java @@ -115,12 +115,14 @@ public String generateMergeStatement(final String destTableName) { "COPY INTO %s.%s.%s " + "FROM '%s' " + "FILEFORMAT = PARQUET " + - "PATTERN = '%s'", + "PATTERN = '%s' " + + "COPY_OPTIONS ('mergeSchema' = '%s')", catalogName, schemaName, destTableName, getTmpTableLocation(), - parquetWriter.getOutputFilename()); + parquetWriter.getOutputFilename(), + databricksConfig.enableSchemaEvolution()); LOGGER.info(copyData); return copyData; } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java
b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java index f92eea022e12..04226ddf801f 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java @@ -16,6 +16,7 @@ public class DatabricksConstants { public static final String DATABRICKS_PORT_KEY = "databricks_port"; public static final String DATABRICKS_CATALOG_KEY = "database"; public static final String DATABRICKS_SCHEMA_KEY = "schema"; + public static final String DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY = "enable_schema_evolution"; public static final String DATABRICKS_CATALOG_JDBC_KEY = "ConnCatalog"; public static final String DATABRICKS_SCHEMA_JDBC_KEY = "ConnSchema"; public static final String DATABRICKS_PURGE_STAGING_DATA_KEY = "purge_staging_data"; diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json index c5dca06931d3..763be9d75815 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json @@ -67,12 +67,19 @@ "default": "default", "order": 7 }, + "enable_schema_evolution": { + "title": "Support schema evolution for all streams.", + "type": "boolean", + "description": "Support schema evolution for all streams. 
If \"false\", the connector might fail when a stream's schema changes.", + "default": false, + "order": 8 + }, "data_source": { "title": "Data Source", "type": "object", "description": "Storage on which the delta lake is built.", "default": "MANAGED_TABLES_STORAGE", - "order": 8, + "order": 9, "oneOf": [ { "title": "[Recommended] Managed tables", @@ -236,7 +243,7 @@ "type": "boolean", "description": "Default to 'true'. Switch it to 'false' for debugging purpose.", "default": true, - "order": 9 + "order": 10 } } } diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java index 77cc43081f90..d70b832054cd 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java +++ b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java @@ -44,10 +44,11 @@ public void testConfigCreationFromJsonS3() { assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port()); assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema()); - databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema"); + databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true); final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig); assertEquals("1000", config2.port()); assertEquals("testing_schema", config2.schema()); + assertEquals(true, config2.enableSchemaEvolution()); assertEquals(DatabricksS3StorageConfigProvider.class, config2.storageConfig().getClass()); } @@ -76,11 +77,12 @@ public void 
testConfigCreationFromJsonAzure() { assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port()); assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema()); - databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema"); + databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true); final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig); assertEquals("1000", config2.port()); assertEquals("testing_schema", config2.schema()); - + assertEquals(true, config2.enableSchemaEvolution()); + assertEquals(DatabricksAzureBlobStorageConfigProvider.class, config2.storageConfig().getClass()); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json b/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json index 61b5234260ea..e72c71f477d7 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json +++ b/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json @@ -6,6 +6,7 @@ "databricks_personal_access_token": "test_token", "database" : "test", "schema": "test", + "enable_schema_evolution": true, "data_source": { "data_source_type": "S3_STORAGE", "s3_bucket_name": "required",