Skip to content

Commit

Permalink
Destination Snowflake: set jdbc application env variable depends on e…
Browse files Browse the repository at this point in the history
…nv - airbyte_oss or airbyte_cloud (#19302)

* [19250] Destination Snowflake: set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud
  • Loading branch information
etsybaev authored and akashkulk committed Nov 17, 2022
1 parent 82f0d57 commit b891f17
Show file tree
Hide file tree
Showing 17 changed files with 78 additions and 86 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@
- name: Snowflake
destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.4.39
dockerImageTag: 0.4.40
documentationUrl: https://docs.airbyte.com/integrations/destinations/snowflake
icon: snowflake.svg
normalizationRepository: airbyte/normalization-snowflake
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5408,7 +5408,7 @@
supported_destination_sync_modes:
- "overwrite"
- "append"
- dockerImage: "airbyte/destination-snowflake:0.4.39"
- dockerImage: "airbyte/destination-snowflake:0.4.40"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/destinations/snowflake"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,5 @@ RUN tar xf ${APPLICATION}.tar --strip-components=1

ENV ENABLE_SENTRY true

LABEL io.airbyte.version=0.4.39
LABEL io.airbyte.version=0.4.40
LABEL io.airbyte.name=airbyte/destination-snowflake
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ plugins {
}

application {
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestination'
mainClass = 'io.airbyte.integrations.destination.snowflake.SnowflakeDestinationRunner'
// enable when profiling
applicationDefaultJvmArgs = [
'-XX:+ExitOnOutOfMemoryError',
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

public class OssCloudEnvVarConsts {

public static final String AIRBYTE_OSS = "airbyte_oss";
public static final String AIRBYTE_CLOUD = "airbyte_cloud";

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@

public class SnowflakeCopyAzureBlobStorageDestination extends CopyDestination {

private final String airbyteEnvironment;

public SnowflakeCopyAzureBlobStorageDestination(final String airbyteEnvironment) {
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
public AirbyteMessageConsumer getConsumer(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
Expand Down Expand Up @@ -52,7 +58,7 @@ public ExtendedNameTransformer getNameTransformer() {

@Override
public DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public class SnowflakeDatabase {
private static final String CONNECTION_STRING_IDENTIFIER_KEY = "application";
private static final String CONNECTION_STRING_IDENTIFIER_VAL = "Airbyte_Connector";

public static HikariDataSource createDataSource(final JsonNode config) {
public static HikariDataSource createDataSource(final JsonNode config, final String airbyteEnvironment) {
final HikariDataSource dataSource = new HikariDataSource();

final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:snowflake://%s/?",
Expand Down Expand Up @@ -129,7 +129,7 @@ public static HikariDataSource createDataSource(final JsonNode config) {

// https://docs.snowflake.com/en/user-guide/jdbc-parameters.html#application
// identify airbyte traffic to snowflake to enable partnership & optimization opportunities
properties.put("application", "airbyte");
properties.put("application", airbyteEnvironment); // see envs in OssCloudEnvVarConsts class
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
properties.put("JDBC_QUERY_RESULT_FORMAT", "JSON");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@

package io.airbyte.integrations.destination.snowflake;

import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.jdbc.copy.SwitchingDestination;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand All @@ -21,14 +19,9 @@ enum DestinationType {
INTERNAL_STAGING
}

public SnowflakeDestination() {
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig, SnowflakeDestinationResolver.getTypeToDestination());
}

public static void main(final String[] args) throws Exception {
final Destination destination = new SnowflakeDestination();
new IntegrationRunner(destination).run(args);
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
public SnowflakeDestination(final String airbyteEnvironment) {
super(DestinationType.class, SnowflakeDestinationResolver::getTypeFromConfig,
SnowflakeDestinationResolver.getTypeToDestination(airbyteEnvironment));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,12 @@ public static boolean isAzureBlobCopy(final JsonNode config) {
&& config.get("loading_method").has("azure_blob_storage_account_name");
}

public static Map<DestinationType, Destination> getTypeToDestination() {
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination();
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination();
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination();
final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination();
public static Map<DestinationType, Destination> getTypeToDestination(
final String airbyteEnvironment) {
final SnowflakeS3StagingDestination s3StagingDestination = new SnowflakeS3StagingDestination(airbyteEnvironment);
final SnowflakeGcsStagingDestination gcsStagingDestination = new SnowflakeGcsStagingDestination(airbyteEnvironment);
final SnowflakeInternalStagingDestination internalStagingDestination = new SnowflakeInternalStagingDestination(airbyteEnvironment);
final SnowflakeCopyAzureBlobStorageDestination azureBlobStorageDestination = new SnowflakeCopyAzureBlobStorageDestination(airbyteEnvironment);

return ImmutableMap.of(
DestinationType.COPY_S3, s3StagingDestination,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.snowflake;

import static io.airbyte.integrations.destination.snowflake.SnowflakeDestination.SCHEDULED_EXECUTOR_SERVICE;

import io.airbyte.integrations.base.adaptive.AdaptiveDestinationRunner;

public class SnowflakeDestinationRunner {

public static void main(final String[] args) throws Exception {
AdaptiveDestinationRunner.baseOnEnv()
.withOssDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS))
.withCloudDestination(() -> new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_CLOUD))
.run(args);
SCHEDULED_EXECUTOR_SERVICE.shutdownNow();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
public class SnowflakeGcsStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeGcsStagingDestination.class);
private String airbyteEnvironment;

public SnowflakeGcsStagingDestination() {
this(new SnowflakeSQLNameTransformer());
public SnowflakeGcsStagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}

public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer) {
public SnowflakeGcsStagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeSqlOperations());
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand Down Expand Up @@ -101,7 +103,7 @@ public static Storage getStorageClient(final GcsConfig gcsConfig) throws IOExcep

@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
public class SnowflakeInternalStagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeInternalStagingDestination.class);
private String airbyteEnvironment;

public SnowflakeInternalStagingDestination() {
this(new SnowflakeSQLNameTransformer());
public SnowflakeInternalStagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}

public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer) {
public SnowflakeInternalStagingDestination(final NamingConventionTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeInternalStagingSqlOperations(nameTransformer));
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand Down Expand Up @@ -79,7 +81,7 @@ private static void attemptStageOperations(final String outputSchema,

@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@
public class SnowflakeS3StagingDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeS3StagingDestination.class);
private String airbyteEnvironment;

public SnowflakeS3StagingDestination() {
this(new SnowflakeSQLNameTransformer());
public SnowflakeS3StagingDestination(final String airbyteEnvironment) {
this(new SnowflakeSQLNameTransformer(), airbyteEnvironment);
}

public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer) {
public SnowflakeS3StagingDestination(final SnowflakeSQLNameTransformer nameTransformer, final String airbyteEnvironment) {
super("", nameTransformer, new SnowflakeSqlOperations());
this.airbyteEnvironment = airbyteEnvironment;
}

@Override
Expand Down Expand Up @@ -93,7 +95,7 @@ private static void attemptStageOperations(final String outputSchema,

@Override
protected DataSource getDataSource(final JsonNode config) {
return SnowflakeDatabase.createDataSource(config);
return SnowflakeDatabase.createDataSource(config, airbyteEnvironment);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,15 @@ void testCheckFailsWithInvalidPermissions() throws Exception {
// this connector should be updated with multiple credentials, each with a clear purpose (valid,
// invalid: insufficient permissions, invalid: wrong password, etc..)
final JsonNode credentialsJsonString = Jsons.deserialize(Files.readString(Paths.get("secrets/config.json")));
final AirbyteConnectionStatus check = new SnowflakeDestination().check(credentialsJsonString);
final AirbyteConnectionStatus check = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString);
assertEquals(AirbyteConnectionStatus.Status.FAILED, check.getStatus());
}

@Test
public void testInvalidSchemaName() throws Exception {
final JsonNode config = getConfig();
final String schema = config.get("schema").asText();
final DataSource dataSource = SnowflakeDatabase.createDataSource(config);
final DataSource dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
try {
final JdbcDatabase database = SnowflakeDatabase.getDatabase(dataSource);
assertDoesNotThrow(() -> syncWithNamingResolver(database, schema));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ protected void setup(final TestDestinationEnv testEnv) throws Exception {
this.config = Jsons.clone(getStaticConfig());
((ObjectNode) config).put("schema", schemaName);

dataSource = SnowflakeDatabase.createDataSource(config);
dataSource = SnowflakeDatabase.createDataSource(config, OssCloudEnvVarConsts.AIRBYTE_OSS);
database = SnowflakeDatabase.getDatabase(dataSource);
database.execute(createSchemaQuery);
}
Expand Down Expand Up @@ -223,7 +223,7 @@ public void testBackwardCompatibilityAfterAddingOauth() {
@Test
void testCheckWithKeyPairAuth() throws Exception {
final JsonNode credentialsJsonString = Jsons.deserialize(IOs.readFile(Path.of("secrets/config_key_pair.json")));
final AirbyteConnectionStatus check = new SnowflakeDestination().check(credentialsJsonString);
final AirbyteConnectionStatus check = new SnowflakeDestination(OssCloudEnvVarConsts.AIRBYTE_OSS).check(credentialsJsonString);
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, check.getStatus());
}

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/destinations/snowflake.md
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,7 @@ Now that you have set up the Snowflake destination connector, check out the foll
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.4.40 | 2022-11-11 | [\#19302](https://github.com/airbytehq/airbyte/pull/19302) | Set jdbc application env variable depends on env - airbyte_oss or airbyte_cloud |
| 0.4.39 | 2022-11-09 | [\#18970](https://github.com/airbytehq/airbyte/pull/18970) | Updated "check" connection method to handle more errors |
| 0.4.38 | 2022-09-26 | [\#17115](https://github.com/airbytehq/airbyte/pull/17115) | Added connection string identifier |
| 0.4.37 | 2022-09-21 | [\#16839](https://github.com/airbytehq/airbyte/pull/16839) | Update JDBC driver for Snowflake to 3.13.19 |
Expand Down

0 comments on commit b891f17

Please sign in to comment.