Skip to content

Commit

Permalink
  • Loading branch information
gpodevijn committed Jun 8, 2023
1 parent 1eddb30 commit c933f9b
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PORT_KEY;
import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_PURGE_STAGING_DATA_KEY;
import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SCHEMA_KEY;
import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY;
import static io.airbyte.integrations.destination.databricks.utils.DatabricksConstants.DATABRICKS_SERVER_HOSTNAME_KEY;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -23,12 +24,14 @@ public record DatabricksDestinationConfig(String serverHostname,
String catalog,
String schema,
boolean isPurgeStagingData,
boolean enableSchemaEvolution,
DatabricksStorageConfigProvider storageConfig) {

static final String DEFAULT_DATABRICKS_PORT = "443";
static final String DEFAULT_DATABASE_SCHEMA = "default";
static final String DEFAULT_CATALOG = "hive_metastore";
static final boolean DEFAULT_PURGE_STAGING_DATA = true;
static final boolean DEFAULT_ENABLE_SCHEMA_EVOLUTION = false;

public static DatabricksDestinationConfig get(final JsonNode config) {
Preconditions.checkArgument(
Expand All @@ -42,6 +45,7 @@ public static DatabricksDestinationConfig get(final JsonNode config) {
config.get(DATABRICKS_PERSONAL_ACCESS_TOKEN_KEY).asText(),
config.has(DATABRICKS_CATALOG_KEY) ? config.get(DATABRICKS_CATALOG_KEY).asText() : DEFAULT_CATALOG,
config.has(DATABRICKS_SCHEMA_KEY) ? config.get(DATABRICKS_SCHEMA_KEY).asText() : DEFAULT_DATABASE_SCHEMA,
config.has(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY) ? config.get(DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY).asBoolean() : DEFAULT_ENABLE_SCHEMA_EVOLUTION,
config.has(DATABRICKS_PURGE_STAGING_DATA_KEY) ? config.get(DATABRICKS_PURGE_STAGING_DATA_KEY).asBoolean() : DEFAULT_PURGE_STAGING_DATA,
DatabricksStorageConfigProvider.getDatabricksStorageConfig(config.get(DATABRICKS_DATA_SOURCE_KEY)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,14 @@ public String generateMergeStatement(final String destTableName) {
"COPY INTO %s.%s.%s " +
"FROM '%s' " +
"FILEFORMAT = PARQUET " +
"PATTERN = '%s'",
"PATTERN = '%s' " +
"COPY_OPTIONS ('mergeSchema' = '%s')",
catalogName,
schemaName,
destTableName,
getTmpTableLocation(),
parquetWriter.getOutputFilename());
parquetWriter.getOutputFilename(),
databricksConfig.enableSchemaEvolution());
LOGGER.info(copyData);
return copyData;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class DatabricksConstants {
public static final String DATABRICKS_PORT_KEY = "databricks_port";
public static final String DATABRICKS_CATALOG_KEY = "database";
public static final String DATABRICKS_SCHEMA_KEY = "schema";
public static final String DATABRICKS_ENABLE_SCHEMA_EVOLUTION_KEY = "enable_schema_evolution";
public static final String DATABRICKS_CATALOG_JDBC_KEY = "ConnCatalog";
public static final String DATABRICKS_SCHEMA_JDBC_KEY = "ConnSchema";
public static final String DATABRICKS_PURGE_STAGING_DATA_KEY = "purge_staging_data";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,19 @@
"default": "default",
"order": 7
},
"enable_schema_evolution": {
"title": "Support schema evolution for all streams.",
"type": "boolean",
"description": "Support schema evolution for all streams. If \"false\", the connector might fail when a stream's schema changes.",
"default": false,
"order": 8
},
"data_source": {
"title": "Data Source",
"type": "object",
"description": "Storage on which the delta lake is built.",
"default": "MANAGED_TABLES_STORAGE",
"order": 8,
"order": 9,
"oneOf": [
{
"title": "[Recommended] Managed tables",
Expand Down Expand Up @@ -236,7 +243,7 @@
"type": "boolean",
"description": "Default to 'true'. Switch it to 'false' for debugging purpose.",
"default": true,
"order": 9
"order": 10
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@ public void testConfigCreationFromJsonS3() {
assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port());
assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema());

databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema");
databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true);
final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig);
assertEquals("1000", config2.port());
assertEquals("testing_schema", config2.schema());
assertEquals(true, config2.enableSchemaEvolution());

assertEquals(DatabricksS3StorageConfigProvider.class, config2.storageConfig().getClass());
}
Expand Down Expand Up @@ -76,11 +77,12 @@ public void testConfigCreationFromJsonAzure() {
assertEquals(DatabricksDestinationConfig.DEFAULT_DATABRICKS_PORT, config1.port());
assertEquals(DatabricksDestinationConfig.DEFAULT_DATABASE_SCHEMA, config1.schema());

databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema");
databricksConfig.put("databricks_port", "1000").put("schema", "testing_schema").put("enable_schema_evolution", true);
final DatabricksDestinationConfig config2 = DatabricksDestinationConfig.get(databricksConfig);
assertEquals("1000", config2.port());
assertEquals("testing_schema", config2.schema());

assertEquals(true, config2.enableSchemaEvolution());

assertEquals(DatabricksAzureBlobStorageConfigProvider.class, config2.storageConfig().getClass());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"databricks_personal_access_token": "test_token",
"database" : "test",
"schema": "test",
"enable_schema_evolution": "true",
"data_source": {
"data_source_type": "S3_STORAGE",
"s3_bucket_name": "required",
Expand Down

0 comments on commit c933f9b

Please sign in to comment.