diff --git a/airbyte-integrations/connectors/destination-databricks/Dockerfile b/airbyte-integrations/connectors/destination-databricks/Dockerfile index ce0effa21c53..450c77f83318 100644 --- a/airbyte-integrations/connectors/destination-databricks/Dockerfile +++ b/airbyte-integrations/connectors/destination-databricks/Dockerfile @@ -24,5 +24,5 @@ ENV APPLICATION destination-databricks COPY --from=build /airbyte /airbyte -LABEL io.airbyte.version=1.0.2 +LABEL io.airbyte.version=1.1.0 LABEL io.airbyte.name=airbyte/destination-databricks diff --git a/airbyte-integrations/connectors/destination-databricks/metadata.yaml b/airbyte-integrations/connectors/destination-databricks/metadata.yaml index 6726b37d9455..ff4720017197 100644 --- a/airbyte-integrations/connectors/destination-databricks/metadata.yaml +++ b/airbyte-integrations/connectors/destination-databricks/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: database connectorType: destination definitionId: 072d5540-f236-4294-ba7c-ade8fd918496 - dockerImageTag: 1.0.2 + dockerImageTag: 1.1.0 dockerRepository: airbyte/destination-databricks githubIssueLabel: destination-databricks icon: databricks.svg diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java index d363e9bb440d..dbecbca4f33c 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java @@ -11,6 +11,7 @@ import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PORT_KEY; import static 
io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PURGE_STAGING_DATA_KEY; import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SCHEMA_KEY; +import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY; import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SERVER_HOSTNAME_KEY; import com.fasterxml.jackson.databind.JsonNode; @@ -23,12 +24,14 @@ public record DatabricksDestinationConfig(String serverHostname, String catalog, String schema, + boolean enableSchemaEvolution, boolean isPurgeStagingData, DatabricksStorageConfigProvider storageConfig) { static final String DEFAULT_DATABRICKS_PORT = "443"; static final String DEFAULT_DATABASE_SCHEMA = "default"; static final String DEFAULT_CATALOG = "hive_metastore"; static final boolean DEFAULT_PURGE_STAGING_DATA = true; + static final boolean DEFAULT_ENABLE_SCHEMA_EVOLUTION = false; public static DatabricksDestinationConfig get(final JsonNode config) { Preconditions.checkArgument( @@ -42,6 +45,7 @@ public static DatabricksDestinationConfig get(final JsonNode config) { config.get(DATABRICKS_PERSONAL_ACCESS_TOKEN_KEY).asText(), config.has(DATABRICKS_CATALOG_KEY) ? config.get(DATABRICKS_CATALOG_KEY).asText() : DEFAULT_CATALOG, config.has(DATABRICKS_SCHEMA_KEY) ? config.get(DATABRICKS_SCHEMA_KEY).asText() : DEFAULT_DATABASE_SCHEMA, + config.has(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY) ? config.get(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY).asBoolean() : DEFAULT_ENABLE_SCHEMA_EVOLUTION, config.has(DATABRICKS_PURGE_STAGING_DATA_KEY) ?
config.get(DATABRICKS_PURGE_STAGING_DATA_KEY).asBoolean() : DEFAULT_PURGE_STAGING_DATA, DatabricksStorageConfigProvider.getDatabricksStorageConfig(config.get(DATABRICKS_DATA_SOURCE_KEY))); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java index fade966d0940..3d4d4773d30c 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/s3/DatabricksS3StreamCopier.java @@ -115,12 +115,14 @@ public String generateMergeStatement(final String destTableName) { "COPY INTO %s.%s.%s " + "FROM '%s' " + "FILEFORMAT = PARQUET " + - "PATTERN = '%s'", + "PATTERN = '%s' " + + "COPY_OPTIONS ('mergeSchema' = '%s')", catalogName, schemaName, destTableName, getTmpTableLocation(), - parquetWriter.getOutputFilename()); + parquetWriter.getOutputFilename(), + databricksConfig.enableSchemaEvolution()); LOGGER.info(copyData); return copyData; } diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java index f92eea022e12..04226ddf801f 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java +++ b/airbyte-integrations/connectors/destination-databricks/src/main/java/io/airbyte/integrations/destination/databricks/utils/DatabricksConstants.java @@ -16,6 +16,7 @@ public 
class DatabricksConstants { public static final String DATABRICKS_PORT_KEY = "databricks_port"; public static final String DATABRICKS_CATALOG_KEY = "database"; public static final String DATABRICKS_SCHEMA_KEY = "schema"; + public static final String DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY = "enable_schema_evolution"; public static final String DATABRICKS_CATALOG_JDBC_KEY = "ConnCatalog"; public static final String DATABRICKS_SCHEMA_JDBC_KEY = "ConnSchema"; public static final String DATABRICKS_PURGE_STAGING_DATA_KEY = "purge_staging_data"; diff --git a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json index c5dca06931d3..763be9d75815 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json +++ b/airbyte-integrations/connectors/destination-databricks/src/main/resources/spec.json @@ -67,12 +67,19 @@ "default": "default", "order": 7 }, + "enable_schema_evolution": { + "title": "Support schema evolution for all streams.", + "type": "boolean", + "description": "Support schema evolution for all streams. If \"false\", the connector might fail when a stream's schema changes.", + "default": false, + "order": 8 + }, "data_source": { "title": "Data Source", "type": "object", "description": "Storage on which the delta lake is built.", "default": "MANAGED_TABLES_STORAGE", - "order": 8, + "order": 9, "oneOf": [ { "title": "[Recommended] Managed tables", @@ -236,7 +243,7 @@ "type": "boolean", "description": "Default to 'true'. 
Switch it to 'false' for debugging purpose.", "default": true, - "order": 9 + "order": 10 } } } diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java index 77cc43081f90..d70b832054cd 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java +++ b/airbyte-integrations/connectors/destination-databricks/src/test/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfigTest.java @@ -44,10 +44,11 @@ public void testConfigCreationFromJsonS3() { assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port()); assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema()); - databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema"); + databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true); final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig); assertEquals("1000", config2.port()); assertEquals("testing_schema", config2.schema()); + assertEquals(true, config2.enableSchemaEvolution()); assertEquals(DatabricksS3StorageConfigProvider.class, config2.storageConfig().getClass()); } @@ -76,11 +77,12 @@ public void testConfigCreationFromJsonAzure() { assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port()); assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema()); - databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema"); + databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true); final 
DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig); assertEquals("1000", config2.port()); assertEquals("testing_schema", config2.schema()); - + assertEquals(true, config2.enableSchemaEvolution()); + assertEquals(DatabricksAzureBlobStorageConfigProvider.class, config2.storageConfig().getClass()); } diff --git a/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json b/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json index 61b5234260ea..e72c71f477d7 100644 --- a/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json +++ b/airbyte-integrations/connectors/destination-databricks/src/test/resources/config.json @@ -6,6 +6,7 @@ "databricks_personal_access_token": "test_token", "database" : "test", "schema": "test", + "enable_schema_evolution": "true", "data_source": { "data_source_type": "S3_STORAGE", "s3_bucket_name": "required", diff --git a/docs/integrations/destinations/databricks.md b/docs/integrations/destinations/databricks.md index fd2c7c666d16..1b1d89f94eb2 100644 --- a/docs/integrations/destinations/databricks.md +++ b/docs/integrations/destinations/databricks.md @@ -177,7 +177,8 @@ Provide your Amazon S3 data: | | Port | string | Optional. Default to "443". See [documentation](https://docs.databricks.com/integrations/bi/jdbc-odbc-bi.html#get-server-hostname-port-http-path-and-jdbc-url). | | | Personal Access Token | string | Required. Example: `dapi0123456789abcdefghij0123456789AB`. See [documentation](https://docs.databricks.com/sql/user/security/personal-access-tokens.html). | | General | Databricks catalog | string | Optional. The name of the catalog. If not specified otherwise, the "hive_metastore" will be used. | -| | Database schema | string | Optional. The default schema tables are written. If not specified otherwise, the "default" will be used. | +| | Database schema | string | Optional. 
The default schema tables are written. If not specified otherwise, the "default" will be used. +| | Schema evolution | boolean | Optional. The connector enables automatic schema evolution in the destination tables. | | | Purge Staging Data | boolean | The connector creates staging files and tables on S3 or Azure. By default, they will be purged when the data sync is complete. Set it to `false` for debugging purposes. | | Data Source - S3 | Bucket Name | string | Name of the bucket to sync data into. | | | Bucket Path | string | Subdirectory under the above bucket to sync the data into. | @@ -261,6 +262,7 @@ Suppose you are interested in learning more about the Databricks connector or de | Version | Date | Pull Request | Subject | |:--------|:-----------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------| +| 1.1.0 | 2023-06-02 | [\#26942](https://github.com/airbytehq/airbyte/pull/26942) | Support schema evolution | | 1.0.2 | 2023-04-20 | [\#25366](https://github.com/airbytehq/airbyte/pull/25366) | Fix default catalog to be `hive_metastore` | | 1.0.1 | 2023-03-30 | [\#24657](https://github.com/airbytehq/airbyte/pull/24657) | Fix support for external tables on S3 | | 1.0.0 | 2023-03-21 | [\#23965](https://github.com/airbytehq/airbyte/pull/23965) | Added: Managed table storage type, Databricks Catalog field |