Make schema field in source-snowflake mean a subset of the specified schema during discover() (#20465)

* Make schema field in source-snowflake mean a subset of the specified schema during discover(). Update UI

* Add missing file

* Fix failing acceptance test

* sanity

* update doc

* typo

* version bump and release note

* Fix failing test

* fix format
rodireich authored Jan 12, 2023
1 parent 4778615 commit 94c84f7
Showing 11 changed files with 107 additions and 52 deletions.
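
The net effect of the change: "schema" is no longer a required spec property. When it is set, discover() is expected to return only the streams of that schema; when it is left empty, streams from every schema the configured role can access are returned. A minimal sketch of the two config shapes (the host, role, warehouse, and database values are placeholders; only the property names come from the spec diffs below):

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class SchemaConfigSketch {

  public static void main(String[] args) {
    final ObjectMapper mapper = new ObjectMapper();

    // No "schema" key: discover() should surface tables from every schema
    // the configured role can read in the database.
    final ObjectNode allSchemas = mapper.createObjectNode()
        .put("host", "account.snowflakecomputing.com") // placeholder values
        .put("role", "AIRBYTE_ROLE")
        .put("warehouse", "AIRBYTE_WAREHOUSE")
        .put("database", "AIRBYTE_DATABASE");

    // "schema" present: discover() should only surface tables from that schema.
    final ObjectNode singleSchema = allSchemas.deepCopy();
    singleSchema.put("schema", "AIRBYTE_SCHEMA");

    System.out.println(allSchemas.toPrettyString());
    System.out.println(singleSchema.toPrettyString());
  }
}
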
4 changes: 2 additions & 2 deletions airbyte-config/init/src/test/resources/connector_catalog.json
@@ -12691,7 +12691,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Snowflake Source Spec",
"type": "object",
"required": ["host", "role", "warehouse", "database", "schema"],
"required": ["host", "role", "warehouse", "database"],
"properties": {
"credentials": {
"title": "Authorization Method",
@@ -12799,7 +12799,7 @@
"order": 4
},
"schema": {
"description": "The source Snowflake schema tables.",
"description": "The source Snowflake schema tables. Leave empty to access tables from multiple schemas.",
"examples": ["AIRBYTE_SCHEMA"],
"type": "string",
"title": "Schema",
@@ -111,7 +111,9 @@ public void testDataTypes() throws Exception {
final List<AirbyteMessage> allMessages = runRead(catalog);
final UUID catalogId = runDiscover();
final Map<String, AirbyteStream> streams = getLastPersistedCatalog().getStreams().stream()
.collect(Collectors.toMap(AirbyteStream::getName, s -> s));
.collect(Collectors.toMap(
s -> "%s.%s".formatted(s.getNamespace(), s.getName()),
s -> s));
final List<AirbyteMessage> recordMessages = allMessages.stream().filter(m -> m.getType() == Type.RECORD).toList();
final Map<String, List<String>> expectedValues = new HashMap<>();
testDataHolders.forEach(testDataHolder -> {
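
Because discover() can now surface the same table name from more than one schema, the data-type test above keys its stream map by namespace and name together instead of by name alone. A rough, self-contained sketch of why that matters (schema and table names are made up for illustration):

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class StreamKeySketch {

  // Stand-in for the protocol's AirbyteStream; only the fields the key needs.
  record DiscoveredStream(String namespace, String name) {}

  public static void main(String[] args) {
    final List<DiscoveredStream> streams = List.of(
        new DiscoveredStream("PUBLIC", "ID_AND_NAME"),
        new DiscoveredStream("STAGING", "ID_AND_NAME"));

    // Keying by name alone would trip Collectors.toMap's duplicate-key check here;
    // namespace + name keeps one entry per schema.
    final Map<String, DiscoveredStream> byQualifiedName = streams.stream()
        .collect(Collectors.toMap(
            s -> "%s.%s".formatted(s.namespace(), s.name()),
            s -> s));

    System.out.println(byQualifiedName.keySet()); // PUBLIC.ID_AND_NAME and STAGING.ID_AND_NAME
  }
}
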
@@ -16,5 +16,5 @@ ENV APPLICATION source-snowflake

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.27
LABEL io.airbyte.version=0.1.28
LABEL io.airbyte.name=airbyte/source-snowflake
@@ -26,6 +26,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -37,8 +38,9 @@ public class SnowflakeDataSourceUtils {
public static final String AIRBYTE_OSS = "airbyte_oss";
public static final String AIRBYTE_CLOUD = "airbyte_cloud";
private static final String JDBC_CONNECTION_STRING =
"role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s&application=%s";
"role=%s&warehouse=%s&database=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s&application=%s";

private static final String JDBC_SCHEMA_PARAM = "&schema=%s&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true";
private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeDataSourceUtils.class);
private static final int PAUSE_BETWEEN_TOKEN_REFRESH_MIN = 7; // snowflake access token's TTL is 10min and can't be modified
private static final String REFRESH_TOKEN_URL = "https://%s/oauth/token-request";
@@ -141,13 +143,16 @@ public static String buildJDBCUrl(final JsonNode config, final String airbyteEnv
config.get("role").asText(),
config.get("warehouse").asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get("schema").asText(),
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
"JSON",
true,
airbyteEnvironment));

if (config.get("schema") != null && StringUtils.isNotBlank(config.get("schema").asText())) {
jdbcUrl.append(JDBC_SCHEMA_PARAM.formatted(config.get("schema").asText()));
}

// https://docs.snowflake.com/en/user-guide/jdbc-configure.html#jdbc-driver-connection-string
if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
jdbcUrl.append("&").append(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
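
With the hunk above, buildJDBCUrl only appends a schema query parameter when the config actually carries one, and pairs it with CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true so JDBC metadata lookups honour the connection's schema context. Roughly, the two URL shapes look like this (host and identifiers are placeholders; the with-schema form matches the expected URL in the unit test further down):

public class JdbcUrlShapesSketch {

  public static void main(String[] args) {
    // Without "schema" in the config:
    final String withoutSchema =
        "jdbc:snowflake://host/?role=ROLE&warehouse=WAREHOUSE&database=DATABASE"
            + "&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=airbyte_oss";

    // With "schema" in the config, the extra parameters are appended at the end:
    final String withSchema =
        withoutSchema + "&schema=SOURCE_SCHEMA&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true";

    System.out.println(withoutSchema);
    System.out.println(withSchema);
  }
}
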
@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Snowflake Source Spec",
"type": "object",
"required": ["host", "role", "warehouse", "database", "schema"],
"required": ["host", "role", "warehouse", "database"],
"properties": {
"credentials": {
"title": "Authorization Method",
@@ -110,7 +110,7 @@
"order": 4
},
"schema": {
"description": "The source Snowflake schema tables.",
"description": "The source Snowflake schema tables. Leave empty to access tables from multiple schemas.",
"examples": ["AIRBYTE_SCHEMA"],
"type": "string",
"title": "Schema",
@@ -6,6 +6,7 @@

import static io.airbyte.integrations.source.snowflake.SnowflakeDataSourceUtils.AIRBYTE_OSS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
@@ -19,6 +20,7 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.integrations.source.snowflake.SnowflakeSource;
import io.airbyte.protocol.models.Field;
@@ -37,6 +39,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -226,4 +229,47 @@ protected List<AirbyteMessage> getExpectedAirbyteMessagesSecondSync(final String
return expectedMessages;
}

/* Test that schema config key is making discover pull tables of this schema only */
@Test
void testDiscoverSchemaConfig() throws Exception {
// add table and data to a separate schema.
database.execute(connection -> {
connection.createStatement().execute(
String.format("CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)",
RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
connection.createStatement()
.execute(String.format("INSERT INTO %s(id, name) VALUES ('1','picard')",
RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
connection.createStatement()
.execute(String.format("INSERT INTO %s(id, name) VALUES ('2', 'crusher')",
RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
connection.createStatement()
.execute(String.format("INSERT INTO %s(id, name) VALUES ('3', 'vash')",
RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME)));
connection.createStatement().execute(
String.format("CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)",
RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME, Strings.addRandomSuffix(TABLE_NAME, "_", 4))));
});

JsonNode confWithSchema = ((ObjectNode) config).put("schema", SCHEMA_NAME);
AirbyteCatalog actual = source.discover(confWithSchema);

assertFalse(actual.getStreams().isEmpty());

var streams = actual.getStreams().stream()
.filter(s -> !s.getNamespace().equals(SCHEMA_NAME))
.collect(Collectors.toList());

assertTrue(streams.isEmpty());

confWithSchema = ((ObjectNode) config).put("schema", SCHEMA_NAME2);
actual = source.discover(confWithSchema);
assertFalse(actual.getStreams().isEmpty());

streams = actual.getStreams().stream()
.filter(s -> !s.getNamespace().equals(SCHEMA_NAME2))
.collect(Collectors.toList());

assertTrue(streams.isEmpty());
}
}
@@ -75,7 +75,7 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("ID"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME1),
STREAM_NAME1, SCHEMA_NAME,
Field.of("ID", JsonSchemaType.NUMBER),
Field.of("NAME", JsonSchemaType.STRING))
.withSupportedSyncModes(
@@ -84,7 +84,7 @@
.withSyncMode(SyncMode.FULL_REFRESH)
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(CatalogHelpers.createAirbyteStream(
String.format("%s.%s", SCHEMA_NAME, STREAM_NAME2),
STREAM_NAME2, SCHEMA_NAME,
Field.of("ID", JsonSchemaType.NUMBER),
Field.of("NAME", JsonSchemaType.STRING))
.withSupportedSyncModes(
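
The acceptance-test catalog is adjusted to match: streams are no longer named "SCHEMA.STREAM" with no namespace, but carry the bare table name plus an explicit namespace, which is what the namespace-aware discover() now emits. A before/after sketch of that call, assuming the Airbyte test classpath (CatalogHelpers, Field, JsonSchemaType; the constant values are illustrative):

import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;

class CatalogNamingSketch {

  private static final String SCHEMA_NAME = "SOURCE_SCHEMA"; // illustrative
  private static final String STREAM_NAME1 = "ID_AND_NAME1"; // illustrative

  void example() {
    // Before: the schema was folded into the stream name and no namespace was set.
    final AirbyteStream before = CatalogHelpers.createAirbyteStream(
        String.format("%s.%s", SCHEMA_NAME, STREAM_NAME1),
        Field.of("ID", JsonSchemaType.NUMBER),
        Field.of("NAME", JsonSchemaType.STRING));

    // After: name and namespace are kept separate.
    final AirbyteStream after = CatalogHelpers.createAirbyteStream(
        STREAM_NAME1, SCHEMA_NAME,
        Field.of("ID", JsonSchemaType.NUMBER),
        Field.of("NAME", JsonSchemaType.STRING));
  }
}
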
@@ -26,11 +26,10 @@ protected DataSource createDataSource() {
final StringBuilder jdbcUrl = new StringBuilder(
String.format("jdbc:snowflake://%s/?", config.get(JdbcUtils.HOST_KEY).asText()));
jdbcUrl.append(String.format(
"role=%s&warehouse=%s&database=%s&schema=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s",
"role=%s&warehouse=%s&database=%s&JDBC_QUERY_RESULT_FORMAT=%s&CLIENT_SESSION_KEEP_ALIVE=%s",
config.get("role").asText(),
config.get("warehouse").asText(),
config.get(JdbcUtils.DATABASE_KEY).asText(),
config.get(JdbcUtils.SCHEMA_KEY).asText(),
// Needed for JDK17 - see
// https://stackoverflow.com/questions/67409650/snowflake-jdbc-driver-internal-error-fail-to-retrieve-row-count-for-first-arrow
"JSON",
@@ -4,7 +4,7 @@
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Snowflake Source Spec",
"type": "object",
"required": ["host", "role", "warehouse", "database", "schema"],
"required": ["host", "role", "warehouse", "database"],
"properties": {
"credentials": {
"title": "Authorization Method",
@@ -110,7 +110,7 @@
"order": 4
},
"schema": {
"description": "The source Snowflake schema tables.",
"description": "The source Snowflake schema tables. Leave empty to access tables from multiple schemas.",
"examples": ["AIRBYTE_SCHEMA"],
"type": "string",
"title": "Schema",
@@ -30,7 +30,9 @@ class SnowflakeDataSourceUtilsTest {
}
""";
private final String expectedJdbcUrl =
"jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE&schema=SOURCE_SCHEMA&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=airbyte_oss";
"jdbc:snowflake://host/?role=role&warehouse=WAREHOUSE&database=DATABASE"
+ "&JDBC_QUERY_RESULT_FORMAT=JSON&CLIENT_SESSION_KEEP_ALIVE=true&application=airbyte_oss"
+ "&schema=SOURCE_SCHEMA&CLIENT_METADATA_REQUEST_USE_CONNECTION_CTX=true";

@Test
void testBuildJDBCUrl() {
