From f30869001a975784db9f57a81ea49b5cc8ef9e98 Mon Sep 17 00:00:00 2001 From: Charles Date: Mon, 27 Sep 2021 16:46:39 -0700 Subject: [PATCH] Exposing SSL-only version of Postgres Source (#6362) --- .../SpecModifyingSource.java | 52 +++++++ .../ClickHouseJdbcSourceAcceptanceTest.java | 8 +- .../CockroachDbJdbcSourceAcceptanceTest.java | 6 +- .../Db2JdbcSourceAcceptanceTest.java | 4 +- .../AbstractJdbcSourceAcceptanceTest.java | 8 +- .../jdbc/test/JdbcSourceAcceptanceTest.java | 88 +++++++----- .../mssql/MssqlJdbcSourceAcceptanceTest.java | 2 +- .../mysql/MySqlJdbcSourceAcceptanceTest.java | 4 +- .../OracleJdbcSourceAcceptanceTest.java | 29 ++-- .../.dockerignore | 3 + .../source-postgres-strict-encrypt/Dockerfile | 12 ++ .../acceptance-test-config.yml | 6 + .../build.gradle | 31 +++++ .../postgres/PostgresSourceStrictEncrypt.java | 38 +++++ ...gresSourceStrictEncryptAcceptanceTest.java | 131 ++++++++++++++++++ ...StrictEncryptJdbcSourceAcceptanceTest.java | 105 ++++++++++++++ .../src/test/resources/expected_spec.json | 122 ++++++++++++++++ .../source/postgres/PostgresSource.java | 17 ++- .../PostgresJdbcSourceAcceptanceTest.java | 2 +- .../RedshiftJdbcSourceAcceptanceTest.java | 2 +- ...ffoldJavaJdbcJdbcSourceAcceptanceTest.java | 2 +- .../SnowflakeJdbcSourceAcceptanceTest.java | 2 +- 22 files changed, 605 insertions(+), 69 deletions(-) create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/.dockerignore create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java new file mode 100644 index 000000000000..afa0373865b0 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java @@ -0,0 +1,52 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.base.spec_modification; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.integrations.base.Source; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; + +/** + * In some cases we want to prune or mutate the spec for an existing source. The common case is that + * we want to remove features that are not appropriate for some reason. e.g. In cloud, we do not + * want to allow users to send data unencrypted. + */ +public abstract class SpecModifyingSource implements Source { + + private final Source source; + + public SpecModifyingSource(final Source source) { + this.source = source; + } + + public abstract ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception; + + @Override + public ConnectorSpecification spec() throws Exception { + return modifySpec(source.spec()); + } + + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + return source.check(config); + } + + @Override + public AirbyteCatalog discover(final JsonNode config) throws Exception { + return source.discover(config); + } + + @Override + public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + throws Exception { + return source.read(config, catalog, state); + } + +} diff --git a/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java index 3d35b30a2864..c7388a29eca7 100644 --- a/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java @@ -38,7 +38,7 @@ public String getDriverClass() { } @Override - public String createTableQuery(String tableName, String columnClause, String primaryKeyClause) { + public String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) { // ClickHouse requires Engine to be mentioned as part of create table query. // Refer : https://clickhouse.tech/docs/en/engines/table-engines/ for more information return String.format("CREATE TABLE %s(%s) %s", @@ -56,12 +56,12 @@ public void tearDown() throws SQLException { } @Override - public String primaryKeyClause(List columns) { + public String primaryKeyClause(final List columns) { if (columns.isEmpty()) { return ""; } - StringBuilder clause = new StringBuilder(); + final StringBuilder clause = new StringBuilder(); clause.append("("); for (int i = 0; i < columns.size(); i++) { clause.append(columns.get(i)); @@ -91,7 +91,7 @@ public void setup() throws Exception { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new ClickHouseSource(); } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java index 632984d75169..66affd568aea 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java @@ -95,7 +95,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new CockroachDbSource(); } @@ -360,7 +360,7 @@ void testReadMultipleTables() throws Exception { @Test void testReadMultipleTablesIncrementally() throws Exception { final String tableName2 = TABLE_NAME + 2; - String streamName2 = streamName + 2; + final String streamName2 = streamName + 2; database.execute(ctx -> { ctx.createStatement().execute( createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)", @@ -493,7 +493,7 @@ void testDiscoverWithMultipleSchemas() throws Exception { .withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID)))); // sort streams by name so that we are comparing lists with the same order. - Comparator schemaTableCompare = Comparator + final Comparator schemaTableCompare = Comparator .comparing(stream -> stream.getNamespace() + "." + stream.getName()); expected.getStreams().sort(schemaTableCompare); actual.getStreams().sort(schemaTableCompare); diff --git a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java index 88ebf4052614..e0b9203b2b2b 100644 --- a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java @@ -72,7 +72,7 @@ public void setup() throws Exception { public void clean() throws Exception { // In Db2 before dropping a schema, all objects that were in that schema must be dropped or moved to // another schema. - for (String tableName : TEST_TABLES) { + for (final String tableName : TEST_TABLES) { final String dropTableQuery = String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, tableName); super.database.execute(connection -> connection.createStatement().execute(dropTableQuery)); @@ -116,7 +116,7 @@ public String getDriverClass() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new Db2Source(); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java index 135abc5e52aa..65c4a4000c94 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java @@ -65,7 +65,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new PostgresTestSource(); } @@ -95,8 +95,8 @@ public PostgresTestSource() { } @Override - public JsonNode toDatabaseConfig(JsonNode config) { - ImmutableMap.Builder configBuilder = ImmutableMap.builder() + public JsonNode toDatabaseConfig(final JsonNode config) { + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) .put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), @@ -115,7 +115,7 @@ public Set getExcludedInternalNameSpaces() { return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history"); } - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { final Source source = new PostgresTestSource(); LOGGER.info("starting source: {}", PostgresTestSource.class); new IntegrationRunner(source).run(args); diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 7b7b14a8f072..dc1578bfbf6d 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -22,6 +22,7 @@ import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.SourceJdbcUtils; import io.airbyte.integrations.source.relationaldb.models.DbState; @@ -50,6 +51,7 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; @@ -95,7 +97,7 @@ public abstract class JdbcSourceAcceptanceTest { public JsonNode config; public JdbcDatabase database; - public AbstractJdbcSource source; + public Source source; public static String streamName; /** @@ -126,21 +128,43 @@ public abstract class JdbcSourceAcceptanceTest { /** * An instance of the source that should be tests. * + * @return abstract jdbc source + */ + public abstract AbstractJdbcSource getJdbcSource(); + + /** + * In some cases the Source that is being tested may be an AbstractJdbcSource, but because it is + * decorated, Java cannot recognize it as such. In these cases, as a workaround a user can choose to + * override getJdbcSource and have it return null. Then they can override this method with the + * decorated source AND override getToDatabaseConfigFunction with the appropriate + * toDatabaseConfigFunction that is hidden behind the decorator. + * * @return source */ - public abstract AbstractJdbcSource getSource(); + public Source getSource() { + return getJdbcSource(); + } - protected String createTableQuery(String tableName, String columnClause, String primaryKeyClause) { + /** + * See getSource() for when to override this method. + * + * @return a function that maps a source's config to a jdbc config. + */ + public Function getToDatabaseConfigFunction() { + return getJdbcSource()::toDatabaseConfig; + } + + protected String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) { return String.format("CREATE TABLE %s(%s %s %s)", tableName, columnClause, primaryKeyClause.equals("") ? "" : ",", primaryKeyClause); } - protected String primaryKeyClause(List columns) { + protected String primaryKeyClause(final List columns) { if (columns.isEmpty()) { return ""; } - StringBuilder clause = new StringBuilder(); + final StringBuilder clause = new StringBuilder(); clause.append("PRIMARY KEY ("); for (int i = 0; i < columns.size(); i++) { clause.append(columns.get(i)); @@ -155,7 +179,7 @@ protected String primaryKeyClause(List columns) { public void setup() throws Exception { source = getSource(); config = getConfig(); - final JsonNode jdbcConfig = source.toDatabaseConfig(config); + final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config); streamName = TABLE_NAME; @@ -253,7 +277,7 @@ void testCheckFailure() throws Exception { @Test void testDiscover() throws Exception { final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config)); - AirbyteCatalog expected = getCatalog(getDefaultNamespace()); + final AirbyteCatalog expected = getCatalog(getDefaultNamespace()); assertEquals(expected.getStreams().size(), actual.getStreams().size()); actual.getStreams().forEach(actualStream -> { final Optional expectedStream = @@ -265,7 +289,7 @@ void testDiscover() throws Exception { }); } - protected AirbyteCatalog filterOutOtherSchemas(AirbyteCatalog catalog) { + protected AirbyteCatalog filterOutOtherSchemas(final AirbyteCatalog catalog) { if (supportsSchemas()) { final AirbyteCatalog filteredCatalog = Jsons.clone(catalog); filteredCatalog.setStreams(filteredCatalog.getStreams() @@ -312,7 +336,7 @@ void testDiscoverWithMultipleSchemas() throws Exception { Field.of(COL_NAME, JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); // sort streams by name so that we are comparing lists with the same order. - Comparator schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName()); + final Comparator schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName()); expected.getStreams().sort(schemaTableCompare); actual.getStreams().sort(schemaTableCompare); assertEquals(expected, filterOutOtherSchemas(actual)); @@ -325,7 +349,7 @@ void testReadSuccess() throws Exception { source.read(config, getConfiguredCatalogWithOneStream(getDefaultNamespace()), null)); setEmittedAtToNull(actualMessages); - List expectedMessages = getTestMessages(); + final List expectedMessages = getTestMessages(); assertThat(expectedMessages, Matchers.containsInAnyOrder(actualMessages.toArray())); assertThat(actualMessages, Matchers.containsInAnyOrder(expectedMessages.toArray())); } @@ -596,7 +620,7 @@ void testReadOneTableIncrementallyTwice() throws Exception { @Test void testReadMultipleTablesIncrementally() throws Exception { final String tableName2 = TABLE_NAME + 2; - String streamName2 = streamName + 2; + final String streamName2 = streamName + 2; database.execute(ctx -> { ctx.createStatement().execute( createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)", "")); @@ -692,21 +716,21 @@ void testReadMultipleTablesIncrementally() throws Exception { // when initial and final cursor fields are the same. private void incrementalCursorCheck( - String cursorField, - String initialCursorValue, - String endCursorValue, - List expectedRecordMessages) + final String cursorField, + final String initialCursorValue, + final String endCursorValue, + final List expectedRecordMessages) throws Exception { incrementalCursorCheck(cursorField, cursorField, initialCursorValue, endCursorValue, expectedRecordMessages); } private void incrementalCursorCheck( - String initialCursorField, - String cursorField, - String initialCursorValue, - String endCursorValue, - List expectedRecordMessages) + final String initialCursorField, + final String cursorField, + final String initialCursorValue, + final String endCursorValue, + final List expectedRecordMessages) throws Exception { incrementalCursorCheck(initialCursorField, cursorField, initialCursorValue, endCursorValue, expectedRecordMessages, @@ -714,12 +738,12 @@ private void incrementalCursorCheck( } private void incrementalCursorCheck( - String initialCursorField, - String cursorField, - String initialCursorValue, - String endCursorValue, - List expectedRecordMessages, - ConfiguredAirbyteStream airbyteStream) + final String initialCursorField, + final String cursorField, + final String initialCursorValue, + final String endCursorValue, + final List expectedRecordMessages, + final ConfiguredAirbyteStream airbyteStream) throws Exception { airbyteStream.setSyncMode(SyncMode.INCREMENTAL); airbyteStream.setCursorField(Lists.newArrayList(cursorField)); @@ -856,13 +880,13 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException { Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaPrimitive.STRING)); } - public String getFullyQualifiedTableName(String tableName) { + public String getFullyQualifiedTableName(final String tableName) { return SourceJdbcUtils.getFullyQualifiedTableName(getDefaultSchemaName(), tableName); } public void createSchemas() throws SQLException { if (supportsSchemas()) { - for (String schemaName : TEST_SCHEMAS) { + for (final String schemaName : TEST_SCHEMAS) { final String createSchemaQuery = String.format("CREATE SCHEMA %s;", schemaName); database.execute(connection -> connection.createStatement().execute(createSchemaQuery)); } @@ -871,7 +895,7 @@ public void createSchemas() throws SQLException { public void dropSchemas() throws SQLException { if (supportsSchemas()) { - for (String schemaName : TEST_SCHEMAS) { + for (final String schemaName : TEST_SCHEMAS) { final String dropSchemaQuery = String .format(DROP_SCHEMA_QUERY, schemaName); database.execute(connection -> connection.createStatement().execute(dropSchemaQuery)); @@ -879,7 +903,7 @@ public void dropSchemas() throws SQLException { } } - private JsonNode convertIdBasedOnDatabase(int idValue) { + private JsonNode convertIdBasedOnDatabase(final int idValue) { if (getDriverClass().toLowerCase().contains("oracle")) { return Jsons.jsonNode(BigDecimal.valueOf(idValue)); } else if (getDriverClass().toLowerCase().contains("snowflake")) { @@ -902,8 +926,8 @@ protected String getDefaultNamespace() { } } - protected static void setEmittedAtToNull(Iterable messages) { - for (AirbyteMessage actualMessage : messages) { + protected static void setEmittedAtToNull(final Iterable messages) { + for (final AirbyteMessage actualMessage : messages) { if (actualMessage.getRecord() != null) { actualMessage.getRecord().setEmittedAt(null); } diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java index e597f9226de0..ff8525b58ac1 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java @@ -74,7 +74,7 @@ public JsonNode getConfig() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new MssqlSource(); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index e472e35e6c26..0b024a5ac795 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -44,7 +44,7 @@ static void init() throws SQLException { .withEnv("MYSQL_ROOT_HOST", "%") .withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD); container.start(); - Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD); + final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD); connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n"); } @@ -95,7 +95,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new MySqlSource(); } diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index e21c58f9eed9..4e2e425fbd9c 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -97,15 +97,16 @@ public void tearDownOracle() throws Exception { } void cleanUpTables() throws SQLException { - Connection conn = DriverManager.getConnection( + final Connection conn = DriverManager.getConnection( ORACLE_DB.getJdbcUrl(), ORACLE_DB.getUsername(), ORACLE_DB.getPassword()); - for (String schemaName : TEST_SCHEMAS) { - ResultSet resultSet = conn.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); + for (final String schemaName : TEST_SCHEMAS) { + final ResultSet resultSet = + conn.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); while (resultSet.next()) { - String tableName = resultSet.getString("TABLE_NAME"); - String tableNameProcessed = tableName.contains(" ") ? SourceJdbcUtils + final String tableName = resultSet.getString("TABLE_NAME"); + final String tableNameProcessed = tableName.contains(" ") ? SourceJdbcUtils .enquoteIdentifier(conn, tableName) : tableName; conn.createStatement().executeQuery(String.format("DROP TABLE %s.%s", schemaName, tableNameProcessed)); } @@ -121,7 +122,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new OracleSource(); } @@ -145,7 +146,7 @@ public void createSchemas() throws SQLException { // In Oracle, `CREATE USER` creates a schema. // See https://www.oratable.com/oracle-user-schema-difference/ if (supportsSchemas()) { - for (String schemaName : TEST_SCHEMAS) { + for (final String schemaName : TEST_SCHEMAS) { executeOracleStatement( String.format( "CREATE USER %s IDENTIFIED BY password DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS", @@ -154,21 +155,21 @@ public void createSchemas() throws SQLException { } } - public void executeOracleStatement(String query) throws SQLException { - Connection conn = DriverManager.getConnection( + public void executeOracleStatement(final String query) throws SQLException { + final Connection conn = DriverManager.getConnection( ORACLE_DB.getJdbcUrl(), ORACLE_DB.getUsername(), ORACLE_DB.getPassword()); - try (Statement stmt = conn.createStatement()) { + try (final Statement stmt = conn.createStatement()) { stmt.execute(query); - } catch (SQLException e) { + } catch (final SQLException e) { logSQLException(e); } conn.close(); } - public static void logSQLException(SQLException ex) { - for (Throwable e : ex) { + public static void logSQLException(final SQLException ex) { + for (final Throwable e : ex) { if (e instanceof SQLException) { if (ignoreSQLException(((SQLException) e).getSQLState()) == false) { e.printStackTrace(System.err); @@ -185,7 +186,7 @@ public static void logSQLException(SQLException ex) { } } - public static boolean ignoreSQLException(String sqlState) { + public static boolean ignoreSQLException(final String sqlState) { // This only ignore cases where other databases won't raise errors // Drop table, schema etc or try to recreate a table; if (sqlState == null) { diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/.dockerignore b/airbyte-integrations/connectors/source-postgres-strict-encrypt/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile new file mode 100644 index 000000000000..ae28a1f58fdb --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -0,0 +1,12 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION source-postgres-strict-encrypt + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml b/airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml new file mode 100644 index 000000000000..a03ef7fed68d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml @@ -0,0 +1,6 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-postgres-strict-encrypt:dev +tests: + spec: + - spec_path: "src/test/resources/expected_spec.json" diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle new file mode 100644 index 000000000000..3261abeac55d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle @@ -0,0 +1,31 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.source.postgres.PostgresSourceStrictEncrypt' +} + +dependencies { + implementation project(':airbyte-db:lib') + + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:connectors:source-postgres') + implementation project(':airbyte-protocol:models') + // todo (cgardens): why are these needed? + implementation project(':airbyte-integrations:connectors:source-jdbc') + implementation project(':airbyte-integrations:connectors:source-relational-db') + + + testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) + testImplementation project(':airbyte-test-utils') + + testImplementation 'org.testcontainers:postgresql:1.15.1' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java new file mode 100644 index 000000000000..acb7e1bc3e7b --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres; + +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.base.spec_modification.SpecModifyingSource; +import io.airbyte.protocol.models.ConnectorSpecification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostgresSourceStrictEncrypt extends SpecModifyingSource implements Source { + + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceStrictEncrypt.class); + + PostgresSourceStrictEncrypt() { + super(PostgresSource.sshWrappedSource()); + } + + @Override + public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { + final ConnectorSpecification spec = Jsons.clone(originalSpec); + ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl"); + return spec; + } + + public static void main(final String[] args) throws Exception { + final Source source = new PostgresSourceStrictEncrypt(); + LOGGER.info("starting source: {}", PostgresSourceStrictEncrypt.class); + new IntegrationRunner(source).run(args); + LOGGER.info("completed source: {}", PostgresSourceStrictEncrypt.class); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java new file mode 100644 index 000000000000..8729d20ab487 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.ssh.SshHelpers; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import io.airbyte.protocol.models.SyncMode; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import org.jooq.SQLDialect; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceTest { + + private static final String STREAM_NAME = "public.id_and_name"; + private static final String STREAM_NAME2 = "public.starships"; + + private PostgreSQLContainer container; + private JsonNode config; + + @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + container = new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres")) + .withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key"); + container.start(); + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "Standard") + .build()); + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", container.getHost()) + .put("port", container.getFirstMappedPort()) + .put("database", container.getDatabaseName()) + .put("username", container.getUsername()) + .put("password", container.getPassword()) + .put("replication_method", replicationMethod) + .build()); + + final Database database = Databases.createDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:postgresql://%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText()), + "org.postgresql.Driver", + SQLDialect.POSTGRES); + + database.query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + }); + + database.close(); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + container.close(); + } + + @Override + protected String getImageName() { + return "airbyte/source-postgres-strict-encrypt:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class)); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME2, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); + } + + @Override + protected List getRegexTests() { + return Collections.emptyList(); + } + + @Override + protected JsonNode getState() { + return Jsons.jsonNode(new HashMap<>()); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java new file mode 100644 index 000000000000..c9e431613f00 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.integrations.source.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.base.ssh.SshHelpers; +import io.airbyte.integrations.source.jdbc.JdbcSource; +import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.test.utils.PostgreSQLContainerHelper; +import java.util.function.Function; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; + +class PostgresStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { + + private static PostgreSQLContainer PSQL_DB; + + private JsonNode config; + + @BeforeAll + static void init() { + PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine"); + PSQL_DB.start(); + } + + @BeforeEach + public void setup() throws Exception { + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", PSQL_DB.getHost()) + .put("port", PSQL_DB.getFirstMappedPort()) + .put("database", dbName) + .put("username", PSQL_DB.getUsername()) + .put("password", PSQL_DB.getPassword()) + .put("ssl", false) + .build()); + + final String initScriptName = "init_" + dbName.concat(".sql"); + final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); + PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB); + + super.setup(); + } + + @Override + public boolean supportsSchemas() { + return true; + } + + @Override + public JdbcSource getJdbcSource() { + return null; + } + + @Override + public Source getSource() { + return new PostgresSourceStrictEncrypt(); + } + + @Override + public Function getToDatabaseConfigFunction() { + return new PostgresSource()::toDatabaseConfig; + } + + @Override + public JsonNode getConfig() { + return config; + } + + @Override + public String getDriverClass() { + return PostgresSource.DRIVER_CLASS; + } + + @AfterAll + static void cleanUp() { + PSQL_DB.close(); + } + + @Test + void testSpec() throws Exception { + final ConnectorSpecification actual = source.spec(); + final ConnectorSpecification expected = + SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class)); + + assertEquals(expected, actual); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json new file mode 100644 index 000000000000..84a4ddda9ff6 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json @@ -0,0 +1,122 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Postgres Source Spec", + "type": "object", + "required": [ + "host", + "port", + "database", + "username" + ], + "additionalProperties": false, + "properties": { + "host": { + "title": "Host", + "description": "Hostname of the database.", + "type": "string", + "order": 0 + }, + "port": { + "title": "Port", + "description": "Port of the database.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 5432, + "examples": [ + "5432" + ], + "order": 1 + }, + "database": { + "title": "DB Name", + "description": "Name of the database.", + "type": "string", + "order": 2 + }, + "username": { + "title": "User", + "description": "Username to use to access the database.", + "type": "string", + "order": 3 + }, + "password": { + "title": "Password", + "description": "Password associated with the username.", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "replication_method": { + "type": "object", + "title": "Replication Method", + "description": "Replication method to use for extracting data from the database.", + "order": 6, + "oneOf": [ + { + "title": "Standard", + "additionalProperties": false, + "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", + "required": [ + "method" + ], + "properties": { + "method": { + "type": "string", + "const": "Standard", + "enum": [ + "Standard" + ], + "default": "Standard", + "order": 0 + } + } + }, + { + "title": "Logical Replication (CDC)", + "additionalProperties": false, + "description": "Logical replication uses the Postgres write-ahead log (WAL) to detect inserts, updates, and deletes. This needs to be configured on the source database itself. Only available on Postgres 10 and above. Read the Postgres Source docs for more information.", + "required": [ + "method", + "replication_slot", + "publication" + ], + "properties": { + "method": { + "type": "string", + "const": "CDC", + "enum": [ + "CDC" + ], + "default": "CDC", + "order": 0 + }, + "plugin": { + "type": "string", + "description": "A logical decoding plug-in installed on the PostgreSQL server. `pgoutput` plug-in is used by default.\nIf replication table contains a lot of big jsonb values it is recommended to use `wal2json` plug-in. For more information about `wal2json` plug-in read Postgres Source docs.", + "enum": [ + "pgoutput", + "wal2json" + ], + "default": "pgoutput", + "order": 1 + }, + "replication_slot": { + "type": "string", + "description": "A plug-in logical replication slot.", + "order": 2 + }, + "publication": { + "type": "string", + "description": "A Postgres publication used for consuming changes.", + "order": 3 + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 31e8ff8aded3..a688c472bc8c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -52,14 +52,24 @@ public class PostgresSource extends AbstractJdbcSource implements Source { private final JdbcSourceOperations sourceOperations; - public PostgresSource() { + public static Source sshWrappedSource() { + return new SshWrappedSource(new PostgresSource(), List.of("host"), List.of("port")); + } + + PostgresSource() { super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration()); this.sourceOperations = JdbcUtils.getDefaultSourceOperations(); } @Override public JsonNode toDatabaseConfig(final JsonNode config) { + return toDatabaseConfigStatic(config); + } + // todo (cgardens) - restructure AbstractJdbcSource so to take this function in the constructor. the + // current structure forces us to declarehave a bunch of pure function methods as instance members + // when they could be static. + public JsonNode toDatabaseConfigStatic(final JsonNode config) { final List additionalParameters = new ArrayList<>(); final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?", @@ -67,7 +77,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) { config.get("port").asText(), config.get("database").asText())); - if (config.has("ssl") && config.get("ssl").asBoolean()) { + // assume ssl if not explicitly mentioned. + if (!config.has("ssl") || config.get("ssl").asBoolean()) { additionalParameters.add("ssl=true"); additionalParameters.add("sslmode=require"); } @@ -247,7 +258,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { } public static void main(final String[] args) throws Exception { - final Source source = new SshWrappedSource(new PostgresSource(), List.of("host"), List.of("port")); + final Source source = PostgresSource.sshWrappedSource(); LOGGER.info("starting source: {}", PostgresSource.class); new IntegrationRunner(source).run(args); LOGGER.info("completed source: {}", PostgresSource.class); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 1bcde1dd3646..01042bdd3f9a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -61,7 +61,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new PostgresSource(); } diff --git a/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java index 8c8abd95e08f..69120fef8a15 100644 --- a/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java @@ -38,7 +38,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new RedshiftSource(); } diff --git a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java index e023ddf53c13..ed1f59cac3b9 100644 --- a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java @@ -40,7 +40,7 @@ public void tearDown() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new ScaffoldJavaJdbcSource(); } diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java index b61e3aa286a7..5748066ee2cf 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java @@ -73,7 +73,7 @@ public String getDriverClass() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new SnowflakeSource(); }