✨ Destination Databricks: connector supports schema evolution (schema…
gpodevijn authored Jun 8, 2023
1 parent 1315f5a commit eb14b96
Showing 9 changed files with 29 additions and 10 deletions.
@@ -24,5 +24,5 @@ ENV APPLICATION destination-databricks
 
 COPY --from=build /airbyte /airbyte
 
-LABEL io.airbyte.version=1.0.2
+LABEL io.airbyte.version=1.1.0
 LABEL io.airbyte.name=airbyte/destination-databricks
@@ -2,7 +2,7 @@ data:
   connectorSubtype: database
   connectorType: destination
   definitionId: 072d5540-f236-4294-ba7c-ade8fd918496
-  dockerImageTag: 1.0.2
+  dockerImageTag: 1.1.0
   dockerRepository: airbyte/destination-databricks
   githubIssueLabel: destination-databricks
   icon: databricks.svg
@@ -11,6 +11,7 @@
 import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PORT_KEY;
 import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PURGE_STAGING_DATA_KEY;
 import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SCHEMA_KEY;
+import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY;
 import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SERVER_HOSTNAME_KEY;
 
 import com.fasterxml.jackson.databind.JsonNode;
@@ -23,12 +24,14 @@ public record DatabricksDestinationConfig(String serverHostname,
                                           String catalog,
                                           String schema,
                                           boolean isPurgeStagingData,
+                                          boolean enableSchemaEvolution,
                                           DatabricksStorageConfigProvider storageConfig) {
 
   static final String DEFAULT_DATABRICKS_PORT = "443";
   static final String DEFAULT_DATABASE_SCHEMA = "default";
   static final String DEFAULT_CATALOG = "hive_metastore";
   static final boolean DEFAULT_PURGE_STAGING_DATA = true;
+  static final boolean DEFAULT_ENABLE_SCHEMA_EVOLUTION = false;
 
   public static DatabricksDestinationConfig get(final JsonNode config) {
     Preconditions.checkArgument(
@@ -42,6 +45,7 @@ public static DatabricksDestinationConfig get(final JsonNode config) {
         config.get(DATABRICKS_PERSONAL_ACCESS_TOKEN_KEY).asText(),
         config.has(DATABRICKS_CATALOG_KEY) ? config.get(DATABRICKS_CATALOG_KEY).asText() : DEFAULT_CATALOG,
         config.has(DATABRICKS_SCHEMA_KEY) ? config.get(DATABRICKS_SCHEMA_KEY).asText() : DEFAULT_DATABASE_SCHEMA,
         config.has(DATABRICKS_PURGE_STAGING_DATA_KEY) ? config.get(DATABRICKS_PURGE_STAGING_DATA_KEY).asBoolean() : DEFAULT_PURGE_STAGING_DATA,
+        config.has(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY) ? config.get(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY).asBoolean() : DEFAULT_ENABLE_SCHEMA_EVOLUTION,
         DatabricksStorageConfigProvider.getDatabricksStorageConfig(config.get(DATABRICKS_DATA_SOURCE_KEY)));
   }
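
With `DEFAULT_ENABLE_SCHEMA_EVOLUTION` set to false, configs that omit the new key keep the previous behavior. Below is a minimal sketch of how the flag flows from a user config into the parsed record; all values are placeholders, and it assumes the connector classes from this commit (plus Jackson) are on the classpath, with the record living alongside the `utils` package shown in the imports above.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
// Assumed package, inferred from the DatabricksConstants import path in this diff.
import io.airbyte.integrations.destination.databricks.DatabricksDestinationConfig;

public class SchemaEvolutionConfigSketch {

  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();
    // Hypothetical connection values; only the keys matter for this sketch.
    final ObjectNode config = mapper.createObjectNode()
        .put("databricks_server_hostname", "abc-123.cloud.databricks.com")
        .put("databricks_http_path", "sql/protocolv1/o/0/0000-000000-example0")
        .put("databricks_personal_access_token", "dapi-example-token")
        .put("schema", "testing_schema")
        .put("enable_schema_evolution", true);
    config.set("data_source",
        mapper.createObjectNode().put("data_source_type", "MANAGED_TABLES_STORAGE"));

    final DatabricksDestinationConfig parsed = DatabricksDestinationConfig.get(config);
    // Keys left out fall back to the record's defaults:
    // port "443", catalog "hive_metastore", purge staging data true.
    System.out.println(parsed.enableSchemaEvolution()); // true
  }
}
```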
@@ -115,12 +115,14 @@ public String generateMergeStatement(final String destTableName) {
         "COPY INTO %s.%s.%s " +
             "FROM '%s' " +
             "FILEFORMAT = PARQUET " +
-            "PATTERN = '%s'",
+            "PATTERN = '%s' " +
+            "COPY_OPTIONS ('mergeSchema' = '%s')",
         catalogName,
         schemaName,
         destTableName,
         getTmpTableLocation(),
-        parquetWriter.getOutputFilename());
+        parquetWriter.getOutputFilename(),
+        databricksConfig.enableSchemaEvolution());
     LOGGER.info(copyData);
     return copyData;
   }
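
Databricks' `COPY INTO` accepts `mergeSchema` among its copy options; when it renders as 'true', columns that appear in the staged Parquet files but not in the target table are added to the table schema during the load. A sketch of the rendered statement, using made-up catalog, schema, table, and staging values:

```java
// Rendering the same template with illustrative arguments; the trailing boolean
// is what databricksConfig.enableSchemaEvolution() contributes to the copy option.
final String copyData = String.format(
    "COPY INTO %s.%s.%s " +
        "FROM '%s' " +
        "FILEFORMAT = PARQUET " +
        "PATTERN = '%s' " +
        "COPY_OPTIONS ('mergeSchema' = '%s')",
    "hive_metastore", "default", "users",
    "s3://example-bucket/staging/users", "part-0000.parquet", true);

// copyData is a single line; wrapped here for readability:
// COPY INTO hive_metastore.default.users FROM 's3://example-bucket/staging/users'
//   FILEFORMAT = PARQUET PATTERN = 'part-0000.parquet'
//   COPY_OPTIONS ('mergeSchema' = 'true')
```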
@@ -16,6 +16,7 @@ public class DatabricksConstants {
   public static final String DATABRICKS_PORT_KEY = "databricks_port";
   public static final String DATABRICKS_CATALOG_KEY = "database";
   public static final String DATABRICKS_SCHEMA_KEY = "schema";
+  public static final String DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY = "enable_schema_evolution";
   public static final String DATABRICKS_CATALOG_JDBC_KEY = "ConnCatalog";
   public static final String DATABRICKS_SCHEMA_JDBC_KEY = "ConnSchema";
   public static final String DATABRICKS_PURGE_STAGING_DATA_KEY = "purge_staging_data";
@@ -67,12 +67,19 @@
       "default": "default",
       "order": 7
     },
+    "enable_schema_evolution": {
+      "title": "Support schema evolution for all streams.",
+      "type": "boolean",
+      "description": "Support schema evolution for all streams. If \"false\", the connector might fail when a stream's schema changes.",
+      "default": false,
+      "order": 8
+    },
     "data_source": {
       "title": "Data Source",
       "type": "object",
       "description": "Storage on which the delta lake is built.",
       "default": "MANAGED_TABLES_STORAGE",
-      "order": 8,
+      "order": 9,
       "oneOf": [
         {
           "title": "[Recommended] Managed tables",
@@ -236,7 +243,7 @@
       "type": "boolean",
       "description": "Default to 'true'. Switch it to 'false' for debugging purpose.",
       "default": true,
-      "order": 9
+      "order": 10
     }
   }
 }
@@ -44,10 +44,11 @@ public void testConfigCreationFromJsonS3() {
     assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port());
     assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema());
 
-    databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema");
+    databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true);
     final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig);
     assertEquals("1000", config2.port());
     assertEquals("testing_schema", config2.schema());
+    assertEquals(true, config2.enableSchemaEvolution());
 
     assertEquals(DatabricksS3StorageConfigProvider.class, config2.storageConfig().getClass());
   }
@@ -76,11 +77,12 @@ public void testConfigCreationFromJsonAzure() {
     assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port());
     assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema());
 
-    databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema");
+    databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true);
     final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig);
     assertEquals("1000", config2.port());
     assertEquals("testing_schema", config2.schema());
 
+    assertEquals(true, config2.enableSchemaEvolution());
 
     assertEquals(DatabricksAzureBlobStorageConfigProvider.class, config2.storageConfig().getClass());
   }
 
@@ -6,6 +6,7 @@
   "databricks_personal_access_token": "test_token",
   "database" : "test",
   "schema": "test",
+  "enable_schema_evolution": true,
   "data_source": {
     "data_source_type": "S3_STORAGE",
     "s3_bucket_name": "required",
4 changes: 3 additions & 1 deletion docs/integrations/destinations/databricks.md
@@ -177,7 +177,8 @@ Provide your Amazon S3 data:
 | | Port | string | Optional. Default to "443". See [documentation](https://docs.databricks.com/integrations/bi/jdbc-odbc-bi.html#get-server-hostname-port-http-path-and-jdbc-url). |
 | | Personal Access Token | string | Required. Example: `dapi0123456789abcdefghij0123456789AB`. See [documentation](https://docs.databricks.com/sql/user/security/personal-access-tokens.html). |
 | General | Databricks catalog | string | Optional. The name of the catalog. If not specified otherwise, the "hive_metastore" will be used. |
-| | Database schema | string | Optional. The default schema tables are written. If not specified otherwise, the "default" will be used. |
+| | Database schema | string | Optional. The default schema to which tables are written. If not specified otherwise, the "default" will be used. |
+| | Schema evolution | boolean | Optional. When enabled, the connector applies automatic schema evolution to the destination tables. |
 | | Purge Staging Data | boolean | The connector creates staging files and tables on S3 or Azure. By default, they will be purged when the data sync is complete. Set it to `false` for debugging purposes. |
@@ -261,6 +262,7 @@ Suppose you are interested in learning more about the Databricks connector or de
 
 | Version | Date       | Pull Request                                               | Subject                                                      |
 |:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------|
+| 1.1.0   | 2023-06-02 | [\#26942](https://github.com/airbytehq/airbyte/pull/26942) | Support schema evolution                                     |
 | 1.0.2   | 2023-04-20 | [\#25366](https://github.com/airbytehq/airbyte/pull/25366) | Fix default catalog to be `hive_metastore`                   |
 | 1.0.1   | 2023-03-30 | [\#24657](https://github.com/airbytehq/airbyte/pull/24657) | Fix support for external tables on S3                        |
 | 1.0.0   | 2023-03-21 | [\#23965](https://github.com/airbytehq/airbyte/pull/23965) | Added: Managed table storage type, Databricks Catalog field  |
