From 9aaae36a98c728481b2c2349feb7e75bb25d0dd3 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 21 Sep 2021 17:16:17 -0700 Subject: [PATCH 01/12] first pass --- .gitignore | 3 + .../source-postgres-strict/.dockerignore | 3 + .../source-postgres-strict/Dockerfile | 12 ++ .../acceptance-test-config.yml | 6 + .../source-postgres-strict/build.gradle | 75 +++++++++ .../source/postgres/PostgresSourceStrict.java | 108 +++++++++++++ .../sources/PostgresSourceAcceptanceTest.java | 153 ++++++++++++++++++ .../PostgresJdbcSourceAcceptanceTest.java | 122 ++++++++++++++ 8 files changed, 482 insertions(+) create mode 100644 airbyte-integrations/connectors/source-postgres-strict/.dockerignore create mode 100644 airbyte-integrations/connectors/source-postgres-strict/Dockerfile create mode 100644 airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml create mode 100644 airbyte-integrations/connectors/source-postgres-strict/build.gradle create mode 100644 airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java diff --git a/.gitignore b/.gitignore index f02f5b77b68b..62ec9f54b7aa 100644 --- a/.gitignore +++ b/.gitignore @@ -60,3 +60,6 @@ resources/examples/airflow/logs/* # Cloud Demo !airbyte-webapp/src/packages/cloud/data + +# Strict Specs +airbyte-integrations/**/strict_spec.json \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-postgres-strict/.dockerignore b/airbyte-integrations/connectors/source-postgres-strict/.dockerignore new file mode 100644 index 000000000000..65c7d0ad3e73 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/.dockerignore @@ -0,0 +1,3 @@ +* +!Dockerfile +!build diff --git a/airbyte-integrations/connectors/source-postgres-strict/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict/Dockerfile new file mode 100644 index 000000000000..9b758f3d3b11 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/Dockerfile @@ -0,0 +1,12 @@ +FROM airbyte/integration-base-java:dev + +WORKDIR /airbyte + +ENV APPLICATION source-postgres-strict + +COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar + +RUN tar xf ${APPLICATION}.tar --strip-components=1 + +LABEL io.airbyte.version=0.3.11 +LABEL io.airbyte.name=airbyte/source-postgres-strict diff --git a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml b/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml new file mode 100644 index 000000000000..94a547fa654f --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml @@ -0,0 +1,6 @@ +# See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) +# for more information about how to configure these tests +connector_image: airbyte/source-postgres:dev +tests: + spec: + - spec_path: "src/main/resources/spec.json" diff --git a/airbyte-integrations/connectors/source-postgres-strict/build.gradle b/airbyte-integrations/connectors/source-postgres-strict/build.gradle new file mode 100644 index 000000000000..789a463b237d --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/build.gradle @@ -0,0 +1,75 @@ +plugins { + id 'application' + id 'airbyte-docker' + id 'airbyte-integration-test-java' +} + +application { + mainClass = 'io.airbyte.integrations.source.postgres.PostgresSourceStrict' +} + +// options +// 1. reuse same java application, just have postgres source switch on spec based on a flag passed to the java application. this is passed via a separate docker container +// 2. build the java application separately with a different spec. +// 3. move all of this into a separate submodule. + +// if we do 1. then just need: +// 1. separate docker file with different flag +// 2. alter postgres source to switch on spec +// do we need to run integration tests on new image? +dependencies { + // need any of these? + implementation project(':airbyte-db:lib') + implementation project(':airbyte-integrations:bases:base-java') + implementation project(':airbyte-integrations:bases:debezium') + implementation project(':airbyte-protocol:models') + implementation project(':airbyte-integrations:connectors:source-jdbc') + implementation project(':airbyte-integrations:connectors:source-relational-db') + // + implementation project(':airbyte-integrations:connectors:source-postgres') + + implementation 'org.apache.commons:commons-lang3:3.11' + implementation 'org.postgresql:postgresql:42.2.18' + + testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) + testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) + testImplementation project(":airbyte-json-validation") + testImplementation project(':airbyte-test-utils') + + testImplementation 'org.testcontainers:postgresql:1.15.1' + + integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test') + + implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) + integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) +} + +task mutateSpec(type: Exec) { + def originalSpec = new File(rootProject.findProject(":airbyte-integrations:connectors:source-postgres").projectDir, "/src/main/resources/spec.json") +// def strictSpec = new File(project.buildDir, "resources/strict_spec.json") + def strictSpec = new File(project.projectDir, "src/main/resources/strict_spec.json") + commandLine "jq" + args 'del(.connectionSpecification.properties.ssl)', originalSpec + inputs.file originalSpec + outputs.file strictSpec + doFirst { + // uncomment if you want to see STDOUT in CLI. + // standardOutput = new org.apache.tools.ant.util.TeeOutputStream(new FileOutputStream(strictSpec), System.out); + standardOutput = new FileOutputStream(strictSpec); + } +} +processResources.dependsOn(mutateSpec) + +//task strictAirbyteDocker(type: Task) { +// dependsOn project.mutateSpec +// airbyteDocker { +// dockerfileName="strict.Dockerfile" +// } +//} +// +//task airbyteDockerStrict(type: Task) { +// dependsOn project.strictAirbyteDocker +// dependsOn project.mutateSpec +//} + +//assemble.dependsOn(project.airbyteDockerStrict) \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java b/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java new file mode 100644 index 000000000000..ee22a8d45bc1 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java @@ -0,0 +1,108 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; +import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; +import static java.util.stream.Collectors.toList; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.db.jdbc.JdbcSourceOperations; +import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; +import io.airbyte.integrations.base.IntegrationRunner; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.base.ssh.SshWrappedSource; +import io.airbyte.integrations.debezium.AirbyteDebeziumHandler; +import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.relationaldb.StateManager; +import io.airbyte.integrations.source.relationaldb.TableInfo; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; +import io.airbyte.protocol.models.CommonField; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.SyncMode; +import java.sql.JDBCType; +import java.sql.PreparedStatement; +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class PostgresSourceStrict implements Source { + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceStrict.class); + + private final Source source; + + PostgresSourceStrict() { + this(PostgresSource.sshWrappedSource()); + } + PostgresSourceStrict(final Source source) { + this.source = source; + } + + @Override + public ConnectorSpecification spec() throws Exception { + final String resourceString = MoreResources.readResource("strict_spec.json"); + return Jsons.deserialize(resourceString, ConnectorSpecification.class); + } + + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + return source.check(config); + } + + @Override + public AirbyteCatalog discover(final JsonNode config) throws Exception { + return source.discover(config); + } + + @Override + public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + throws Exception { + return source.read(config, catalog, state); + } + + public static void main(final String[] args) throws Exception { + final Source source = new SshWrappedSource(new PostgresSourceStrict(), List.of("host"), List.of("port")); + LOGGER.info("starting source: {}", PostgresSourceStrict.class); + new IntegrationRunner(source).run(args); + LOGGER.info("completed source: {}", PostgresSourceStrict.class); + } +} diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java new file mode 100644 index 000000000000..b483bc3bf4be --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java @@ -0,0 +1,153 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.io.airbyte.integration_tests.sources; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.db.Database; +import io.airbyte.db.Databases; +import io.airbyte.integrations.base.ssh.SshHelpers; +import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest; +import io.airbyte.integrations.standardtest.source.TestDestinationEnv; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.protocol.models.DestinationSyncMode; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.JsonSchemaPrimitive; +import io.airbyte.protocol.models.SyncMode; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import org.jooq.SQLDialect; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.DockerImageName; + +public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest { + + private static final String STREAM_NAME = "public.id_and_name"; + private static final String STREAM_NAME2 = "public.starships"; + + private PostgreSQLContainer container; + private JsonNode config; + + @Override + protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { + container = new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres")) + .withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key"); +// container = new PostgreSQLContainer<>("postgres:13-alpine"); + container.start(); + final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() + .put("method", "Standard") + .build()); + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", container.getHost()) + .put("port", container.getFirstMappedPort()) + .put("database", container.getDatabaseName()) + .put("username", container.getUsername()) + .put("password", container.getPassword()) +// .put("ssl", false) + .put("replication_method", replicationMethod) + .build()); + + final Database database = Databases.createDatabase( + config.get("username").asText(), + config.get("password").asText(), + String.format("jdbc:postgresql://%s:%s/%s", + config.get("host").asText(), + config.get("port").asText(), + config.get("database").asText()), + "org.postgresql.Driver", + SQLDialect.POSTGRES); + + database.query(ctx -> { + ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');"); + ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');"); + return null; + }); + + database.close(); + } + + @Override + protected void tearDown(final TestDestinationEnv testEnv) { + container.close(); + } + + @Override + protected String getImageName() { + return "airbyte/source-postgres-strict:dev"; + } + + @Override + protected ConnectorSpecification getSpec() throws Exception { + return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("strict_spec.json"), ConnectorSpecification.class)); + } + + @Override + protected JsonNode getConfig() { + return config; + } + + @Override + protected ConfiguredAirbyteCatalog getConfiguredCatalog() { + return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList( + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))), + new ConfiguredAirbyteStream() + .withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withStream(CatalogHelpers.createAirbyteStream( + STREAM_NAME2, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))))); + } + + @Override + protected List getRegexTests() { + return Collections.emptyList(); + } + + @Override + protected JsonNode getState() { + return Jsons.jsonNode(new HashMap<>()); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java new file mode 100644 index 000000000000..cd82e995def7 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -0,0 +1,122 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.source.postgres; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.ImmutableMap; +import io.airbyte.commons.io.IOs; +import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.resources.MoreResources; +import io.airbyte.commons.string.Strings; +import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.protocol.models.ConnectorSpecification; +import io.airbyte.test.utils.PostgreSQLContainerHelper; +import java.util.function.Function; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.PostgreSQLContainer; +import org.testcontainers.utility.MountableFile; + +class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { + + private static PostgreSQLContainer PSQL_DB; + + private JsonNode config; + + @BeforeAll + static void init() { + PSQL_DB = new PostgreSQLContainer<>("postgres:13-alpine"); + PSQL_DB.start(); + } + + @BeforeEach + public void setup() throws Exception { + final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase(); + + config = Jsons.jsonNode(ImmutableMap.builder() + .put("host", PSQL_DB.getHost()) + .put("port", PSQL_DB.getFirstMappedPort()) + .put("database", dbName) + .put("username", PSQL_DB.getUsername()) + .put("password", PSQL_DB.getPassword()) + .put("ssl", false) + .build()); + + final String initScriptName = "init_" + dbName.concat(".sql"); + final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";"); + PostgreSQLContainerHelper.runSqlScript(MountableFile.forHostPath(tmpFilePath), PSQL_DB); + + super.setup(); + } + + @Override + public boolean supportsSchemas() { + return true; + } + + @Override + public AbstractJdbcSource getSource() { +// return new PostgresSourceStrict(); + return null; + } + + @Override + public ImmutablePair> toDatabaseConfigOverride() { + return ImmutablePair.of(new PostgresSourceStrict(), new PostgresSource()::toDatabaseConfig); + } + + + @Override + public JsonNode getConfig() { + return config; + } + + @Override + public String getDriverClass() { + return PostgresSource.DRIVER_CLASS; + } + + @AfterAll + static void cleanUp() { + PSQL_DB.close(); + } + + @Test + void testSpec() throws Exception { + final ConnectorSpecification actual = source.spec(); + final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("strict_spec.json"), ConnectorSpecification.class); + + assertEquals(expected, actual); + } + +} From 9f352a46878cccb6ef9906562d298b42c61b5e5b Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 21 Sep 2021 18:12:24 -0700 Subject: [PATCH 02/12] checkpoint for feedback --- .gitignore | 3 - .../SpecModifyingSource.java | 67 ++++++++++ .../source-postgres-strict/build.gradle | 53 +------- .../source/postgres/PostgresSourceStrict.java | 71 ++-------- .../src/main/resources/expected_spec.json | 122 ++++++++++++++++++ .../sources/PostgresSourceAcceptanceTest.java | 6 +- ...stgresStrictJdbcSourceAcceptanceTest.java} | 10 +- .../source/postgres/PostgresSource.java | 11 +- 8 files changed, 221 insertions(+), 122 deletions(-) create mode 100644 airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java create mode 100644 airbyte-integrations/connectors/source-postgres-strict/src/main/resources/expected_spec.json rename airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/{PostgresJdbcSourceAcceptanceTest.java => PostgresStrictJdbcSourceAcceptanceTest.java} (91%) diff --git a/.gitignore b/.gitignore index 62ec9f54b7aa..f02f5b77b68b 100644 --- a/.gitignore +++ b/.gitignore @@ -60,6 +60,3 @@ resources/examples/airflow/logs/* # Cloud Demo !airbyte-webapp/src/packages/cloud/data - -# Strict Specs -airbyte-integrations/**/strict_spec.json \ No newline at end of file diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java new file mode 100644 index 000000000000..d51e5fab5c89 --- /dev/null +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java @@ -0,0 +1,67 @@ +/* + * MIT License + * + * Copyright (c) 2020 Airbyte + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package io.airbyte.integrations.base.spec_modification; + +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.util.AutoCloseableIterator; +import io.airbyte.integrations.base.Source; +import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteConnectionStatus; +import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConnectorSpecification; + +public abstract class SpecModifyingSource implements Source { + + private final Source source; + + public SpecModifyingSource(final Source source) { + this.source = source; + } + + public abstract ConnectorSpecification modifySpec(ConnectorSpecification originalSpec) throws Exception; + + @Override + public ConnectorSpecification spec() throws Exception { + return modifySpec(source.spec()); + } + + @Override + public AirbyteConnectionStatus check(final JsonNode config) throws Exception { + return source.check(config); + } + + @Override + public AirbyteCatalog discover(final JsonNode config) throws Exception { + return source.discover(config); + } + + @Override + public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) + throws Exception { + return source.read(config, catalog, state); + } + +} diff --git a/airbyte-integrations/connectors/source-postgres-strict/build.gradle b/airbyte-integrations/connectors/source-postgres-strict/build.gradle index 789a463b237d..ecf0a3355134 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/build.gradle +++ b/airbyte-integrations/connectors/source-postgres-strict/build.gradle @@ -8,32 +8,19 @@ application { mainClass = 'io.airbyte.integrations.source.postgres.PostgresSourceStrict' } -// options -// 1. reuse same java application, just have postgres source switch on spec based on a flag passed to the java application. this is passed via a separate docker container -// 2. build the java application separately with a different spec. -// 3. move all of this into a separate submodule. - -// if we do 1. then just need: -// 1. separate docker file with different flag -// 2. alter postgres source to switch on spec -// do we need to run integration tests on new image? dependencies { - // need any of these? implementation project(':airbyte-db:lib') + + // need any of these? implementation project(':airbyte-integrations:bases:base-java') - implementation project(':airbyte-integrations:bases:debezium') + implementation project(':airbyte-integrations:connectors:source-postgres') implementation project(':airbyte-protocol:models') + // todo (cgardens): why are these needed? implementation project(':airbyte-integrations:connectors:source-jdbc') implementation project(':airbyte-integrations:connectors:source-relational-db') - // - implementation project(':airbyte-integrations:connectors:source-postgres') - implementation 'org.apache.commons:commons-lang3:3.11' - implementation 'org.postgresql:postgresql:42.2.18' - testImplementation testFixtures(project(':airbyte-integrations:bases:debezium')) testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc')) - testImplementation project(":airbyte-json-validation") testImplementation project(':airbyte-test-utils') testImplementation 'org.testcontainers:postgresql:1.15.1' @@ -42,34 +29,4 @@ dependencies { implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs) -} - -task mutateSpec(type: Exec) { - def originalSpec = new File(rootProject.findProject(":airbyte-integrations:connectors:source-postgres").projectDir, "/src/main/resources/spec.json") -// def strictSpec = new File(project.buildDir, "resources/strict_spec.json") - def strictSpec = new File(project.projectDir, "src/main/resources/strict_spec.json") - commandLine "jq" - args 'del(.connectionSpecification.properties.ssl)', originalSpec - inputs.file originalSpec - outputs.file strictSpec - doFirst { - // uncomment if you want to see STDOUT in CLI. - // standardOutput = new org.apache.tools.ant.util.TeeOutputStream(new FileOutputStream(strictSpec), System.out); - standardOutput = new FileOutputStream(strictSpec); - } -} -processResources.dependsOn(mutateSpec) - -//task strictAirbyteDocker(type: Task) { -// dependsOn project.mutateSpec -// airbyteDocker { -// dockerfileName="strict.Dockerfile" -// } -//} -// -//task airbyteDockerStrict(type: Task) { -// dependsOn project.strictAirbyteDocker -// dependsOn project.mutateSpec -//} - -//assemble.dependsOn(project.airbyteDockerStrict) \ No newline at end of file +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java b/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java index ee22a8d45bc1..992724175506 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java +++ b/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java @@ -24,85 +24,36 @@ package io.airbyte.integrations.source.postgres; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; -import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; -import static java.util.stream.Collectors.toList; - -import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; -import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; -import io.airbyte.commons.util.AutoCloseableIterator; -import io.airbyte.db.jdbc.JdbcDatabase; -import io.airbyte.db.jdbc.JdbcSourceOperations; -import io.airbyte.db.jdbc.JdbcUtils; -import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; -import io.airbyte.integrations.base.ssh.SshWrappedSource; -import io.airbyte.integrations.debezium.AirbyteDebeziumHandler; -import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; -import io.airbyte.integrations.source.relationaldb.StateManager; -import io.airbyte.integrations.source.relationaldb.TableInfo; -import io.airbyte.protocol.models.AirbyteCatalog; -import io.airbyte.protocol.models.AirbyteConnectionStatus; -import io.airbyte.protocol.models.AirbyteMessage; -import io.airbyte.protocol.models.AirbyteStream; -import io.airbyte.protocol.models.CommonField; -import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.integrations.base.spec_modification.SpecModifyingSource; import io.airbyte.protocol.models.ConnectorSpecification; -import io.airbyte.protocol.models.SyncMode; -import java.sql.JDBCType; -import java.sql.PreparedStatement; -import java.time.Instant; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PostgresSourceStrict implements Source { - private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceStrict.class); +public class PostgresSourceStrict extends SpecModifyingSource implements Source { - private final Source source; + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceStrict.class); PostgresSourceStrict() { - this(PostgresSource.sshWrappedSource()); - } - PostgresSourceStrict(final Source source) { - this.source = source; - } - - @Override - public ConnectorSpecification spec() throws Exception { - final String resourceString = MoreResources.readResource("strict_spec.json"); - return Jsons.deserialize(resourceString, ConnectorSpecification.class); + super(PostgresSource.sshWrappedSource()); } @Override - public AirbyteConnectionStatus check(final JsonNode config) throws Exception { - return source.check(config); - } - - @Override - public AirbyteCatalog discover(final JsonNode config) throws Exception { - return source.discover(config); - } - - @Override - public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) - throws Exception { - return source.read(config, catalog, state); + public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { + final ConnectorSpecification spec = Jsons.clone(originalSpec); + ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl"); + System.out.println("spec = " + spec); + return spec; } public static void main(final String[] args) throws Exception { - final Source source = new SshWrappedSource(new PostgresSourceStrict(), List.of("host"), List.of("port")); + final Source source = new PostgresSourceStrict(); LOGGER.info("starting source: {}", PostgresSourceStrict.class); new IntegrationRunner(source).run(args); LOGGER.info("completed source: {}", PostgresSourceStrict.class); } + } diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/main/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres-strict/src/main/resources/expected_spec.json new file mode 100644 index 000000000000..84a4ddda9ff6 --- /dev/null +++ b/airbyte-integrations/connectors/source-postgres-strict/src/main/resources/expected_spec.json @@ -0,0 +1,122 @@ +{ + "documentationUrl": "https://docs.airbyte.io/integrations/sources/postgres", + "connectionSpecification": { + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Postgres Source Spec", + "type": "object", + "required": [ + "host", + "port", + "database", + "username" + ], + "additionalProperties": false, + "properties": { + "host": { + "title": "Host", + "description": "Hostname of the database.", + "type": "string", + "order": 0 + }, + "port": { + "title": "Port", + "description": "Port of the database.", + "type": "integer", + "minimum": 0, + "maximum": 65536, + "default": 5432, + "examples": [ + "5432" + ], + "order": 1 + }, + "database": { + "title": "DB Name", + "description": "Name of the database.", + "type": "string", + "order": 2 + }, + "username": { + "title": "User", + "description": "Username to use to access the database.", + "type": "string", + "order": 3 + }, + "password": { + "title": "Password", + "description": "Password associated with the username.", + "type": "string", + "airbyte_secret": true, + "order": 4 + }, + "replication_method": { + "type": "object", + "title": "Replication Method", + "description": "Replication method to use for extracting data from the database.", + "order": 6, + "oneOf": [ + { + "title": "Standard", + "additionalProperties": false, + "description": "Standard replication requires no setup on the DB side but will not be able to represent deletions incrementally.", + "required": [ + "method" + ], + "properties": { + "method": { + "type": "string", + "const": "Standard", + "enum": [ + "Standard" + ], + "default": "Standard", + "order": 0 + } + } + }, + { + "title": "Logical Replication (CDC)", + "additionalProperties": false, + "description": "Logical replication uses the Postgres write-ahead log (WAL) to detect inserts, updates, and deletes. This needs to be configured on the source database itself. Only available on Postgres 10 and above. Read the Postgres Source docs for more information.", + "required": [ + "method", + "replication_slot", + "publication" + ], + "properties": { + "method": { + "type": "string", + "const": "CDC", + "enum": [ + "CDC" + ], + "default": "CDC", + "order": 0 + }, + "plugin": { + "type": "string", + "description": "A logical decoding plug-in installed on the PostgreSQL server. `pgoutput` plug-in is used by default.\nIf replication table contains a lot of big jsonb values it is recommended to use `wal2json` plug-in. For more information about `wal2json` plug-in read Postgres Source docs.", + "enum": [ + "pgoutput", + "wal2json" + ], + "default": "pgoutput", + "order": 1 + }, + "replication_slot": { + "type": "string", + "description": "A plug-in logical replication slot.", + "order": 2 + }, + "publication": { + "type": "string", + "description": "A Postgres publication used for consuming changes.", + "order": 3 + } + } + } + ] + } + } + } +} diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java index b483bc3bf4be..c2e70c8e8157 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java @@ -61,7 +61,7 @@ public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest { protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres")) .withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key"); -// container = new PostgreSQLContainer<>("postgres:13-alpine"); + // container = new PostgreSQLContainer<>("postgres:13-alpine"); container.start(); final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "Standard") @@ -72,7 +72,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put("database", container.getDatabaseName()) .put("username", container.getUsername()) .put("password", container.getPassword()) -// .put("ssl", false) + // .put("ssl", false) .put("replication_method", replicationMethod) .build()); @@ -109,7 +109,7 @@ protected String getImageName() { @Override protected ConnectorSpecification getSpec() throws Exception { - return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("strict_spec.json"), ConnectorSpecification.class)); + return SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class)); } @Override diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java similarity index 91% rename from airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java index cd82e995def7..efb0ae3ca3c3 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java @@ -33,6 +33,7 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.base.ssh.SshHelpers; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.protocol.models.ConnectorSpecification; @@ -42,12 +43,11 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; -class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { +class PostgresStrictJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; @@ -86,7 +86,7 @@ public boolean supportsSchemas() { @Override public AbstractJdbcSource getSource() { -// return new PostgresSourceStrict(); + // return new PostgresSourceStrict(); return null; } @@ -95,7 +95,6 @@ public ImmutablePair> toDatabaseConfigOverr return ImmutablePair.of(new PostgresSourceStrict(), new PostgresSource()::toDatabaseConfig); } - @Override public JsonNode getConfig() { return config; @@ -114,7 +113,8 @@ static void cleanUp() { @Test void testSpec() throws Exception { final ConnectorSpecification actual = source.spec(); - final ConnectorSpecification expected = Jsons.deserialize(MoreResources.readResource("strict_spec.json"), ConnectorSpecification.class); + final ConnectorSpecification expected = + SshHelpers.injectSshIntoSpec(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class)); assertEquals(expected, actual); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 31e8ff8aded3..657db6996a31 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -52,7 +52,11 @@ public class PostgresSource extends AbstractJdbcSource implements Source { private final JdbcSourceOperations sourceOperations; - public PostgresSource() { + public static Source sshWrappedSource() { + return new SshWrappedSource(new PostgresSource(), List.of("host"), List.of("port")); + } + + PostgresSource() { super(DRIVER_CLASS, new PostgresJdbcStreamingQueryConfiguration()); this.sourceOperations = JdbcUtils.getDefaultSourceOperations(); } @@ -67,7 +71,8 @@ public JsonNode toDatabaseConfig(final JsonNode config) { config.get("port").asText(), config.get("database").asText())); - if (config.has("ssl") && config.get("ssl").asBoolean()) { + // assume ssl if not explicitly mentioned. + if (!config.has("ssl") || config.get("ssl").asBoolean()) { additionalParameters.add("ssl=true"); additionalParameters.add("sslmode=require"); } @@ -247,7 +252,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { } public static void main(final String[] args) throws Exception { - final Source source = new SshWrappedSource(new PostgresSource(), List.of("host"), List.of("port")); + final Source source = PostgresSource.sshWrappedSource(); LOGGER.info("starting source: {}", PostgresSource.class); new IntegrationRunner(source).run(args); LOGGER.info("completed source: {}", PostgresSource.class); From 71b2183b16582de68cea1050d101201b4ec14e6b Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 21 Sep 2021 18:13:27 -0700 Subject: [PATCH 03/12] hack --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 76 +++++++++++-------- 1 file changed, 44 insertions(+), 32 deletions(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 7b7b14a8f072..abeeb53cde27 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -22,6 +22,7 @@ import io.airbyte.commons.util.MoreIterators; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; +import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.SourceJdbcUtils; import io.airbyte.integrations.source.relationaldb.models.DbState; @@ -50,7 +51,9 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; @@ -95,7 +98,8 @@ public abstract class JdbcSourceAcceptanceTest { public JsonNode config; public JdbcDatabase database; - public AbstractJdbcSource source; + public Source source; + public Function toDatabaseConfig; public static String streamName; /** @@ -130,17 +134,22 @@ public abstract class JdbcSourceAcceptanceTest { */ public abstract AbstractJdbcSource getSource(); - protected String createTableQuery(String tableName, String columnClause, String primaryKeyClause) { + public ImmutablePair> toDatabaseConfigOverride() { + final AbstractJdbcSource source = getSource(); + return ImmutablePair.of(source, source::toDatabaseConfig); + } + + protected String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) { return String.format("CREATE TABLE %s(%s %s %s)", tableName, columnClause, primaryKeyClause.equals("") ? "" : ",", primaryKeyClause); } - protected String primaryKeyClause(List columns) { + protected String primaryKeyClause(final List columns) { if (columns.isEmpty()) { return ""; } - StringBuilder clause = new StringBuilder(); + final StringBuilder clause = new StringBuilder(); clause.append("PRIMARY KEY ("); for (int i = 0; i < columns.size(); i++) { clause.append(columns.get(i)); @@ -153,9 +162,12 @@ protected String primaryKeyClause(List columns) { } public void setup() throws Exception { - source = getSource(); + final ImmutablePair> sourceFunctionImmutablePair = toDatabaseConfigOverride(); + source = sourceFunctionImmutablePair.getLeft(); config = getConfig(); - final JsonNode jdbcConfig = source.toDatabaseConfig(config); + toDatabaseConfig = sourceFunctionImmutablePair.getRight(); + // final JsonNode jdbcConfig = source.toDatabaseConfig(config); + final JsonNode jdbcConfig = toDatabaseConfig.apply(config); streamName = TABLE_NAME; @@ -253,7 +265,7 @@ void testCheckFailure() throws Exception { @Test void testDiscover() throws Exception { final AirbyteCatalog actual = filterOutOtherSchemas(source.discover(config)); - AirbyteCatalog expected = getCatalog(getDefaultNamespace()); + final AirbyteCatalog expected = getCatalog(getDefaultNamespace()); assertEquals(expected.getStreams().size(), actual.getStreams().size()); actual.getStreams().forEach(actualStream -> { final Optional expectedStream = @@ -265,7 +277,7 @@ void testDiscover() throws Exception { }); } - protected AirbyteCatalog filterOutOtherSchemas(AirbyteCatalog catalog) { + protected AirbyteCatalog filterOutOtherSchemas(final AirbyteCatalog catalog) { if (supportsSchemas()) { final AirbyteCatalog filteredCatalog = Jsons.clone(catalog); filteredCatalog.setStreams(filteredCatalog.getStreams() @@ -312,7 +324,7 @@ void testDiscoverWithMultipleSchemas() throws Exception { Field.of(COL_NAME, JsonSchemaPrimitive.STRING)) .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); // sort streams by name so that we are comparing lists with the same order. - Comparator schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName()); + final Comparator schemaTableCompare = Comparator.comparing(stream -> stream.getNamespace() + "." + stream.getName()); expected.getStreams().sort(schemaTableCompare); actual.getStreams().sort(schemaTableCompare); assertEquals(expected, filterOutOtherSchemas(actual)); @@ -325,7 +337,7 @@ void testReadSuccess() throws Exception { source.read(config, getConfiguredCatalogWithOneStream(getDefaultNamespace()), null)); setEmittedAtToNull(actualMessages); - List expectedMessages = getTestMessages(); + final List expectedMessages = getTestMessages(); assertThat(expectedMessages, Matchers.containsInAnyOrder(actualMessages.toArray())); assertThat(actualMessages, Matchers.containsInAnyOrder(expectedMessages.toArray())); } @@ -596,7 +608,7 @@ void testReadOneTableIncrementallyTwice() throws Exception { @Test void testReadMultipleTablesIncrementally() throws Exception { final String tableName2 = TABLE_NAME + 2; - String streamName2 = streamName + 2; + final String streamName2 = streamName + 2; database.execute(ctx -> { ctx.createStatement().execute( createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)", "")); @@ -692,21 +704,21 @@ void testReadMultipleTablesIncrementally() throws Exception { // when initial and final cursor fields are the same. private void incrementalCursorCheck( - String cursorField, - String initialCursorValue, - String endCursorValue, - List expectedRecordMessages) + final String cursorField, + final String initialCursorValue, + final String endCursorValue, + final List expectedRecordMessages) throws Exception { incrementalCursorCheck(cursorField, cursorField, initialCursorValue, endCursorValue, expectedRecordMessages); } private void incrementalCursorCheck( - String initialCursorField, - String cursorField, - String initialCursorValue, - String endCursorValue, - List expectedRecordMessages) + final String initialCursorField, + final String cursorField, + final String initialCursorValue, + final String endCursorValue, + final List expectedRecordMessages) throws Exception { incrementalCursorCheck(initialCursorField, cursorField, initialCursorValue, endCursorValue, expectedRecordMessages, @@ -714,12 +726,12 @@ private void incrementalCursorCheck( } private void incrementalCursorCheck( - String initialCursorField, - String cursorField, - String initialCursorValue, - String endCursorValue, - List expectedRecordMessages, - ConfiguredAirbyteStream airbyteStream) + final String initialCursorField, + final String cursorField, + final String initialCursorValue, + final String endCursorValue, + final List expectedRecordMessages, + final ConfiguredAirbyteStream airbyteStream) throws Exception { airbyteStream.setSyncMode(SyncMode.INCREMENTAL); airbyteStream.setCursorField(Lists.newArrayList(cursorField)); @@ -856,13 +868,13 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException { Field.of(COL_LAST_NAME_WITH_SPACE, JsonSchemaPrimitive.STRING)); } - public String getFullyQualifiedTableName(String tableName) { + public String getFullyQualifiedTableName(final String tableName) { return SourceJdbcUtils.getFullyQualifiedTableName(getDefaultSchemaName(), tableName); } public void createSchemas() throws SQLException { if (supportsSchemas()) { - for (String schemaName : TEST_SCHEMAS) { + for (final String schemaName : TEST_SCHEMAS) { final String createSchemaQuery = String.format("CREATE SCHEMA %s;", schemaName); database.execute(connection -> connection.createStatement().execute(createSchemaQuery)); } @@ -871,7 +883,7 @@ public void createSchemas() throws SQLException { public void dropSchemas() throws SQLException { if (supportsSchemas()) { - for (String schemaName : TEST_SCHEMAS) { + for (final String schemaName : TEST_SCHEMAS) { final String dropSchemaQuery = String .format(DROP_SCHEMA_QUERY, schemaName); database.execute(connection -> connection.createStatement().execute(dropSchemaQuery)); @@ -879,7 +891,7 @@ public void dropSchemas() throws SQLException { } } - private JsonNode convertIdBasedOnDatabase(int idValue) { + private JsonNode convertIdBasedOnDatabase(final int idValue) { if (getDriverClass().toLowerCase().contains("oracle")) { return Jsons.jsonNode(BigDecimal.valueOf(idValue)); } else if (getDriverClass().toLowerCase().contains("snowflake")) { @@ -902,8 +914,8 @@ protected String getDefaultNamespace() { } } - protected static void setEmittedAtToNull(Iterable messages) { - for (AirbyteMessage actualMessage : messages) { + protected static void setEmittedAtToNull(final Iterable messages) { + for (final AirbyteMessage actualMessage : messages) { if (actualMessage.getRecord() != null) { actualMessage.getRecord().setEmittedAt(null); } From d567fdc59516d406e0919d618fab13a33e50b575 Mon Sep 17 00:00:00 2001 From: cgardens Date: Tue, 21 Sep 2021 18:31:58 -0700 Subject: [PATCH 04/12] fix image name in acceptance tests --- .../source-postgres-strict/acceptance-test-config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml b/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml index 94a547fa654f..03d893e809d6 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml @@ -1,6 +1,6 @@ # See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests -connector_image: airbyte/source-postgres:dev +connector_image: airbyte/source-postgres-strict:dev tests: spec: - spec_path: "src/main/resources/spec.json" From b5a556cd2ef01456522f451fb789c75219a63b91 Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 15:05:53 -0700 Subject: [PATCH 05/12] enforce ssl for postgres normalization --- .../normalization/transform_config/transform.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index daeb24df4a28..8e9a574c3895 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -174,6 +174,10 @@ def transform_postgres(config: Dict[str, Any]): "threads": 32, } + # if unset, we assume true. + if config.get("ssl", True): + config["sslmode"] = "require" + return dbt_config @staticmethod From 5f4b68ade2ce358a0a33e852a02b59a6b9bac18c Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 15:33:30 -0700 Subject: [PATCH 06/12] various clean --- .../base/spec_modification/SpecModifyingSource.java | 3 +++ .../source-postgres-strict/acceptance-test-config.yml | 2 +- .../integrations/source/postgres/PostgresSourceStrict.java | 1 - ...tanceTest.java => PostgresSourceStrictAcceptanceTest.java} | 4 +--- .../src/{main => test}/resources/expected_spec.json | 0 5 files changed, 5 insertions(+), 5 deletions(-) rename airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/{PostgresSourceAcceptanceTest.java => PostgresSourceStrictAcceptanceTest.java} (97%) rename airbyte-integrations/connectors/source-postgres-strict/src/{main => test}/resources/expected_spec.json (100%) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java index d51e5fab5c89..734c9a274e6a 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java @@ -33,6 +33,9 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; +/** + * In some cases we want to prune or mutate the spec for an existing source. The common case is that we want to remove features that are not appropriate for some reason. e.g. In cloud, we do not want to allow users to send data unencrypted. + */ public abstract class SpecModifyingSource implements Source { private final Source source; diff --git a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml b/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml index 03d893e809d6..32dc45e7d6ee 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml @@ -3,4 +3,4 @@ connector_image: airbyte/source-postgres-strict:dev tests: spec: - - spec_path: "src/main/resources/spec.json" + - spec_path: "src/test/resources/expected_spec.json" diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java b/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java index 992724175506..40b77f2fc344 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java +++ b/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java @@ -45,7 +45,6 @@ public class PostgresSourceStrict extends SpecModifyingSource implements Source public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) { final ConnectorSpecification spec = Jsons.clone(originalSpec); ((ObjectNode) spec.getConnectionSpecification().get("properties")).remove("ssl"); - System.out.println("spec = " + spec); return spec; } diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java similarity index 97% rename from airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java index c2e70c8e8157..5579269ec38e 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java @@ -49,7 +49,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; -public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest { +public class PostgresSourceStrictAcceptanceTest extends SourceAcceptanceTest { private static final String STREAM_NAME = "public.id_and_name"; private static final String STREAM_NAME2 = "public.starships"; @@ -61,7 +61,6 @@ public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest { protected void setupEnvironment(final TestDestinationEnv environment) throws Exception { container = new PostgreSQLContainer<>(DockerImageName.parse("marcosmarxm/postgres-ssl:dev").asCompatibleSubstituteFor("postgres")) .withCommand("postgres -c ssl=on -c ssl_cert_file=/var/lib/postgresql/server.crt -c ssl_key_file=/var/lib/postgresql/server.key"); - // container = new PostgreSQLContainer<>("postgres:13-alpine"); container.start(); final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder() .put("method", "Standard") @@ -72,7 +71,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc .put("database", container.getDatabaseName()) .put("username", container.getUsername()) .put("password", container.getPassword()) - // .put("ssl", false) .put("replication_method", replicationMethod) .build()); diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/main/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres-strict/src/test/resources/expected_spec.json similarity index 100% rename from airbyte-integrations/connectors/source-postgres-strict/src/main/resources/expected_spec.json rename to airbyte-integrations/connectors/source-postgres-strict/src/test/resources/expected_spec.json From 50ad2032096825576a967a9bcd72601027cea6a8 Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 16:16:23 -0700 Subject: [PATCH 07/12] fix test --- .../transform_config/transform.py | 4 --- .../ClickHouseJdbcSourceAcceptanceTest.java | 8 +++--- .../CockroachDbJdbcSourceAcceptanceTest.java | 6 ++-- .../Db2JdbcSourceAcceptanceTest.java | 4 +-- .../AbstractJdbcSourceAcceptanceTest.java | 8 +++--- .../jdbc/test/JdbcSourceAcceptanceTest.java | 19 ++++++------- .../mssql/MssqlJdbcSourceAcceptanceTest.java | 2 +- .../mysql/MySqlJdbcSourceAcceptanceTest.java | 4 +-- .../OracleJdbcSourceAcceptanceTest.java | 28 +++++++++---------- .../.dockerignore | 0 .../Dockerfile | 2 +- .../acceptance-test-config.yml | 2 +- .../build.gradle | 0 .../PostgresSourceStrictEncrypt.java} | 12 ++++---- ...resSourceStrictEncryptAcceptanceTest.java} | 4 +-- ...trictEncryptJdbcSourceAcceptanceTest.java} | 17 ++++++----- .../src/test/resources/expected_spec.json | 0 .../source/postgres/PostgresSource.java | 4 +++ .../PostgresJdbcSourceAcceptanceTest.java | 2 +- .../RedshiftJdbcSourceAcceptanceTest.java | 2 +- ...ffoldJavaJdbcJdbcSourceAcceptanceTest.java | 2 +- .../SnowflakeJdbcSourceAcceptanceTest.java | 2 +- 22 files changed, 67 insertions(+), 65 deletions(-) rename airbyte-integrations/connectors/{source-postgres-strict => source-postgres-strict-encrypt}/.dockerignore (100%) rename airbyte-integrations/connectors/{source-postgres-strict => source-postgres-strict-encrypt}/Dockerfile (80%) rename airbyte-integrations/connectors/{source-postgres-strict => source-postgres-strict-encrypt}/acceptance-test-config.yml (81%) rename airbyte-integrations/connectors/{source-postgres-strict => source-postgres-strict-encrypt}/build.gradle (100%) rename airbyte-integrations/connectors/{source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java => source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java} (84%) rename airbyte-integrations/connectors/{source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java => source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java} (97%) rename airbyte-integrations/connectors/{source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java => source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java} (88%) rename airbyte-integrations/connectors/{source-postgres-strict => source-postgres-strict-encrypt}/src/test/resources/expected_spec.json (100%) diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py index 8e9a574c3895..daeb24df4a28 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_config/transform.py @@ -174,10 +174,6 @@ def transform_postgres(config: Dict[str, Any]): "threads": 32, } - # if unset, we assume true. - if config.get("ssl", True): - config["sslmode"] = "require" - return dbt_config @staticmethod diff --git a/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java index 3d35b30a2864..c7388a29eca7 100644 --- a/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-clickhouse/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/ClickHouseJdbcSourceAcceptanceTest.java @@ -38,7 +38,7 @@ public String getDriverClass() { } @Override - public String createTableQuery(String tableName, String columnClause, String primaryKeyClause) { + public String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) { // ClickHouse requires Engine to be mentioned as part of create table query. // Refer : https://clickhouse.tech/docs/en/engines/table-engines/ for more information return String.format("CREATE TABLE %s(%s) %s", @@ -56,12 +56,12 @@ public void tearDown() throws SQLException { } @Override - public String primaryKeyClause(List columns) { + public String primaryKeyClause(final List columns) { if (columns.isEmpty()) { return ""; } - StringBuilder clause = new StringBuilder(); + final StringBuilder clause = new StringBuilder(); clause.append("("); for (int i = 0; i < columns.size(); i++) { clause.append(columns.get(i)); @@ -91,7 +91,7 @@ public void setup() throws Exception { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new ClickHouseSource(); } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java index 632984d75169..66affd568aea 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/test/java/io/airbyte/integrations/source/cockroachdb/CockroachDbJdbcSourceAcceptanceTest.java @@ -95,7 +95,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new CockroachDbSource(); } @@ -360,7 +360,7 @@ void testReadMultipleTables() throws Exception { @Test void testReadMultipleTablesIncrementally() throws Exception { final String tableName2 = TABLE_NAME + 2; - String streamName2 = streamName + 2; + final String streamName2 = streamName + 2; database.execute(ctx -> { ctx.createStatement().execute( createTableQuery(getFullyQualifiedTableName(tableName2), "id INTEGER, name VARCHAR(200)", @@ -493,7 +493,7 @@ void testDiscoverWithMultipleSchemas() throws Exception { .withSourceDefinedPrimaryKey(List.of(List.of(COL_ROW_ID)))); // sort streams by name so that we are comparing lists with the same order. - Comparator schemaTableCompare = Comparator + final Comparator schemaTableCompare = Comparator .comparing(stream -> stream.getNamespace() + "." + stream.getName()); expected.getStreams().sort(schemaTableCompare); actual.getStreams().sort(schemaTableCompare); diff --git a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java index 88ebf4052614..e0b9203b2b2b 100644 --- a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java @@ -72,7 +72,7 @@ public void setup() throws Exception { public void clean() throws Exception { // In Db2 before dropping a schema, all objects that were in that schema must be dropped or moved to // another schema. - for (String tableName : TEST_TABLES) { + for (final String tableName : TEST_TABLES) { final String dropTableQuery = String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, tableName); super.database.execute(connection -> connection.createStatement().execute(dropTableQuery)); @@ -116,7 +116,7 @@ public String getDriverClass() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new Db2Source(); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java index 135abc5e52aa..65c4a4000c94 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSourceAcceptanceTest.java @@ -65,7 +65,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new PostgresTestSource(); } @@ -95,8 +95,8 @@ public PostgresTestSource() { } @Override - public JsonNode toDatabaseConfig(JsonNode config) { - ImmutableMap.Builder configBuilder = ImmutableMap.builder() + public JsonNode toDatabaseConfig(final JsonNode config) { + final ImmutableMap.Builder configBuilder = ImmutableMap.builder() .put("username", config.get("username").asText()) .put("jdbc_url", String.format("jdbc:postgresql://%s:%s/%s", config.get("host").asText(), @@ -115,7 +115,7 @@ public Set getExcludedInternalNameSpaces() { return Set.of("information_schema", "pg_catalog", "pg_internal", "catalog_history"); } - public static void main(String[] args) throws Exception { + public static void main(final String[] args) throws Exception { final Source source = new PostgresTestSource(); LOGGER.info("starting source: {}", PostgresTestSource.class); new IntegrationRunner(source).run(args); diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index abeeb53cde27..255844a37784 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -99,7 +99,6 @@ public abstract class JdbcSourceAcceptanceTest { public JsonNode config; public JdbcDatabase database; public Source source; - public Function toDatabaseConfig; public static String streamName; /** @@ -132,11 +131,14 @@ public abstract class JdbcSourceAcceptanceTest { * * @return source */ - public abstract AbstractJdbcSource getSource(); + public abstract AbstractJdbcSource getJdbcSource(); - public ImmutablePair> toDatabaseConfigOverride() { - final AbstractJdbcSource source = getSource(); - return ImmutablePair.of(source, source::toDatabaseConfig); + public Source getSource() { + return getJdbcSource(); + } + + public Function getToDatabaseConfigFunction() { + return getJdbcSource()::toDatabaseConfig; } protected String createTableQuery(final String tableName, final String columnClause, final String primaryKeyClause) { @@ -162,12 +164,9 @@ protected String primaryKeyClause(final List columns) { } public void setup() throws Exception { - final ImmutablePair> sourceFunctionImmutablePair = toDatabaseConfigOverride(); - source = sourceFunctionImmutablePair.getLeft(); + source = getSource(); config = getConfig(); - toDatabaseConfig = sourceFunctionImmutablePair.getRight(); - // final JsonNode jdbcConfig = source.toDatabaseConfig(config); - final JsonNode jdbcConfig = toDatabaseConfig.apply(config); + final JsonNode jdbcConfig = getToDatabaseConfigFunction().apply(config); streamName = TABLE_NAME; diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java index e597f9226de0..ff8525b58ac1 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlJdbcSourceAcceptanceTest.java @@ -74,7 +74,7 @@ public JsonNode getConfig() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new MssqlSource(); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java index e472e35e6c26..0b024a5ac795 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlJdbcSourceAcceptanceTest.java @@ -44,7 +44,7 @@ static void init() throws SQLException { .withEnv("MYSQL_ROOT_HOST", "%") .withEnv("MYSQL_ROOT_PASSWORD", TEST_PASSWORD); container.start(); - Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD); + final Connection connection = DriverManager.getConnection(container.getJdbcUrl(), "root", TEST_PASSWORD); connection.createStatement().execute("GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n"); } @@ -95,7 +95,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new MySqlSource(); } diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index e21c58f9eed9..3b263a80a5b4 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -97,15 +97,15 @@ public void tearDownOracle() throws Exception { } void cleanUpTables() throws SQLException { - Connection conn = DriverManager.getConnection( + final Connection conn = DriverManager.getConnection( ORACLE_DB.getJdbcUrl(), ORACLE_DB.getUsername(), ORACLE_DB.getPassword()); - for (String schemaName : TEST_SCHEMAS) { - ResultSet resultSet = conn.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); + for (final String schemaName : TEST_SCHEMAS) { + final ResultSet resultSet = conn.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); while (resultSet.next()) { - String tableName = resultSet.getString("TABLE_NAME"); - String tableNameProcessed = tableName.contains(" ") ? SourceJdbcUtils + final String tableName = resultSet.getString("TABLE_NAME"); + final String tableNameProcessed = tableName.contains(" ") ? SourceJdbcUtils .enquoteIdentifier(conn, tableName) : tableName; conn.createStatement().executeQuery(String.format("DROP TABLE %s.%s", schemaName, tableNameProcessed)); } @@ -121,7 +121,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new OracleSource(); } @@ -145,7 +145,7 @@ public void createSchemas() throws SQLException { // In Oracle, `CREATE USER` creates a schema. // See https://www.oratable.com/oracle-user-schema-difference/ if (supportsSchemas()) { - for (String schemaName : TEST_SCHEMAS) { + for (final String schemaName : TEST_SCHEMAS) { executeOracleStatement( String.format( "CREATE USER %s IDENTIFIED BY password DEFAULT TABLESPACE USERS QUOTA UNLIMITED ON USERS", @@ -154,21 +154,21 @@ public void createSchemas() throws SQLException { } } - public void executeOracleStatement(String query) throws SQLException { - Connection conn = DriverManager.getConnection( + public void executeOracleStatement(final String query) throws SQLException { + final Connection conn = DriverManager.getConnection( ORACLE_DB.getJdbcUrl(), ORACLE_DB.getUsername(), ORACLE_DB.getPassword()); - try (Statement stmt = conn.createStatement()) { + try (final Statement stmt = conn.createStatement()) { stmt.execute(query); - } catch (SQLException e) { + } catch (final SQLException e) { logSQLException(e); } conn.close(); } - public static void logSQLException(SQLException ex) { - for (Throwable e : ex) { + public static void logSQLException(final SQLException ex) { + for (final Throwable e : ex) { if (e instanceof SQLException) { if (ignoreSQLException(((SQLException) e).getSQLState()) == false) { e.printStackTrace(System.err); @@ -185,7 +185,7 @@ public static void logSQLException(SQLException ex) { } } - public static boolean ignoreSQLException(String sqlState) { + public static boolean ignoreSQLException(final String sqlState) { // This only ignore cases where other databases won't raise errors // Drop table, schema etc or try to recreate a table; if (sqlState == null) { diff --git a/airbyte-integrations/connectors/source-postgres-strict/.dockerignore b/airbyte-integrations/connectors/source-postgres-strict-encrypt/.dockerignore similarity index 100% rename from airbyte-integrations/connectors/source-postgres-strict/.dockerignore rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/.dockerignore diff --git a/airbyte-integrations/connectors/source-postgres-strict/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile similarity index 80% rename from airbyte-integrations/connectors/source-postgres-strict/Dockerfile rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 9b758f3d3b11..4c8f8a5a9857 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -9,4 +9,4 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 LABEL io.airbyte.version=0.3.11 -LABEL io.airbyte.name=airbyte/source-postgres-strict +LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml b/airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml similarity index 81% rename from airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml index 32dc45e7d6ee..a03ef7fed68d 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/acceptance-test-config.yml +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/acceptance-test-config.yml @@ -1,6 +1,6 @@ # See [Source Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/source-acceptance-tests-reference) # for more information about how to configure these tests -connector_image: airbyte/source-postgres-strict:dev +connector_image: airbyte/source-postgres-strict-encrypt:dev tests: spec: - spec_path: "src/test/resources/expected_spec.json" diff --git a/airbyte-integrations/connectors/source-postgres-strict/build.gradle b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle similarity index 100% rename from airbyte-integrations/connectors/source-postgres-strict/build.gradle rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java similarity index 84% rename from airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java index 40b77f2fc344..314dfef921ca 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrict.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java @@ -33,11 +33,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PostgresSourceStrict extends SpecModifyingSource implements Source { +public class PostgresSourceStrictEncrypt extends SpecModifyingSource implements Source { - private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceStrict.class); + private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSourceStrictEncrypt.class); - PostgresSourceStrict() { + PostgresSourceStrictEncrypt() { super(PostgresSource.sshWrappedSource()); } @@ -49,10 +49,10 @@ public ConnectorSpecification modifySpec(final ConnectorSpecification originalSp } public static void main(final String[] args) throws Exception { - final Source source = new PostgresSourceStrict(); - LOGGER.info("starting source: {}", PostgresSourceStrict.class); + final Source source = new PostgresSourceStrictEncrypt(); + LOGGER.info("starting source: {}", PostgresSourceStrictEncrypt.class); new IntegrationRunner(source).run(args); - LOGGER.info("completed source: {}", PostgresSourceStrict.class); + LOGGER.info("completed source: {}", PostgresSourceStrictEncrypt.class); } } diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java similarity index 97% rename from airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java index 5579269ec38e..cd9ee1dec19c 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java @@ -49,7 +49,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.DockerImageName; -public class PostgresSourceStrictAcceptanceTest extends SourceAcceptanceTest { +public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceTest { private static final String STREAM_NAME = "public.id_and_name"; private static final String STREAM_NAME2 = "public.starships"; @@ -102,7 +102,7 @@ protected void tearDown(final TestDestinationEnv testEnv) { @Override protected String getImageName() { - return "airbyte/source-postgres-strict:dev"; + return "airbyte/source-postgres-strict-encrypt:dev"; } @Override diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java similarity index 88% rename from airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index efb0ae3ca3c3..2609fb50cc49 100644 --- a/airbyte-integrations/connectors/source-postgres-strict/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -34,12 +34,11 @@ import io.airbyte.commons.string.Strings; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.base.ssh.SshHelpers; -import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.jdbc.JdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.function.Function; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; @@ -47,7 +46,7 @@ import org.testcontainers.containers.PostgreSQLContainer; import org.testcontainers.utility.MountableFile; -class PostgresStrictJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { +class PostgresStrictEncryptJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest { private static PostgreSQLContainer PSQL_DB; @@ -85,14 +84,18 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { - // return new PostgresSourceStrict(); + public JdbcSource getJdbcSource() { return null; } @Override - public ImmutablePair> toDatabaseConfigOverride() { - return ImmutablePair.of(new PostgresSourceStrict(), new PostgresSource()::toDatabaseConfig); + public Source getSource() { + return new PostgresSourceStrictEncrypt(); + } + + @Override + public Function getToDatabaseConfigFunction() { + return new PostgresSource()::toDatabaseConfig; } @Override diff --git a/airbyte-integrations/connectors/source-postgres-strict/src/test/resources/expected_spec.json b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json similarity index 100% rename from airbyte-integrations/connectors/source-postgres-strict/src/test/resources/expected_spec.json rename to airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/resources/expected_spec.json diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 657db6996a31..36fc955f90ea 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -63,7 +63,11 @@ public static Source sshWrappedSource() { @Override public JsonNode toDatabaseConfig(final JsonNode config) { + return toDatabaseConfigStatic(config); + } + // todo (cgardens) - restructure AbstractJdbcSource so to take this function in the constructor. the current structure forces us to declarehave a bunch of pure function methods as instance members when they could be static. + public JsonNode toDatabaseConfigStatic(final JsonNode config) { final List additionalParameters = new ArrayList<>(); final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:postgresql://%s:%s/%s?", diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java index 1bcde1dd3646..01042bdd3f9a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresJdbcSourceAcceptanceTest.java @@ -61,7 +61,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new PostgresSource(); } diff --git a/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java index 8c8abd95e08f..69120fef8a15 100644 --- a/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-redshift/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/RedshiftJdbcSourceAcceptanceTest.java @@ -38,7 +38,7 @@ public boolean supportsSchemas() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new RedshiftSource(); } diff --git a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java index e023ddf53c13..ed1f59cac3b9 100644 --- a/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-scaffold-java-jdbc/src/test/java/io/airbyte/integrations/source/scaffold_java_jdbc/ScaffoldJavaJdbcJdbcSourceAcceptanceTest.java @@ -40,7 +40,7 @@ public void tearDown() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new ScaffoldJavaJdbcSource(); } diff --git a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java index b61e3aa286a7..5748066ee2cf 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-snowflake/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/SnowflakeJdbcSourceAcceptanceTest.java @@ -73,7 +73,7 @@ public String getDriverClass() { } @Override - public AbstractJdbcSource getSource() { + public AbstractJdbcSource getJdbcSource() { return new SnowflakeSource(); } From 72069d4e33ef66dc37430a65385e2cd1e6bbc52a Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 16:16:57 -0700 Subject: [PATCH 08/12] so it goes --- .../base/spec_modification/SpecModifyingSource.java | 4 +++- .../source/jdbc/test/JdbcSourceAcceptanceTest.java | 1 - .../source/oracle/OracleJdbcSourceAcceptanceTest.java | 3 ++- .../PostgresStrictEncryptJdbcSourceAcceptanceTest.java | 2 +- .../airbyte/integrations/source/postgres/PostgresSource.java | 4 +++- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java index 734c9a274e6a..7bc022c2ecb9 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java @@ -34,7 +34,9 @@ import io.airbyte.protocol.models.ConnectorSpecification; /** - * In some cases we want to prune or mutate the spec for an existing source. The common case is that we want to remove features that are not appropriate for some reason. e.g. In cloud, we do not want to allow users to send data unencrypted. + * In some cases we want to prune or mutate the spec for an existing source. The common case is that + * we want to remove features that are not appropriate for some reason. e.g. In cloud, we do not + * want to allow users to send data unencrypted. */ public abstract class SpecModifyingSource implements Source { diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 255844a37784..886dee529d85 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -53,7 +53,6 @@ import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; -import org.apache.commons.lang3.tuple.ImmutablePair; import org.hamcrest.Matchers; import org.junit.jupiter.api.Test; diff --git a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index 3b263a80a5b4..4e2e425fbd9c 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -102,7 +102,8 @@ void cleanUpTables() throws SQLException { ORACLE_DB.getUsername(), ORACLE_DB.getPassword()); for (final String schemaName : TEST_SCHEMAS) { - final ResultSet resultSet = conn.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); + final ResultSet resultSet = + conn.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); while (resultSet.next()) { final String tableName = resultSet.getString("TABLE_NAME"); final String tableNameProcessed = tableName.contains(" ") ? SourceJdbcUtils diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index 2609fb50cc49..8c15fee082de 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -90,7 +90,7 @@ public JdbcSource getJdbcSource() { @Override public Source getSource() { - return new PostgresSourceStrictEncrypt(); + return new PostgresSourceStrictEncrypt(); } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 36fc955f90ea..a688c472bc8c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -66,7 +66,9 @@ public JsonNode toDatabaseConfig(final JsonNode config) { return toDatabaseConfigStatic(config); } - // todo (cgardens) - restructure AbstractJdbcSource so to take this function in the constructor. the current structure forces us to declarehave a bunch of pure function methods as instance members when they could be static. + // todo (cgardens) - restructure AbstractJdbcSource so to take this function in the constructor. the + // current structure forces us to declarehave a bunch of pure function methods as instance members + // when they could be static. public JsonNode toDatabaseConfigStatic(final JsonNode config) { final List additionalParameters = new ArrayList<>(); From 275096c07e05d6a28d77382267e421befe67d013 Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 16:18:06 -0700 Subject: [PATCH 09/12] go --- .../connectors/source-postgres-strict-encrypt/Dockerfile | 2 +- .../connectors/source-postgres-strict-encrypt/build.gradle | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index 4c8f8a5a9857..a423accc75e2 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.3.11 +LABEL io.airbyte.version=0.1.0 LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle index ecf0a3355134..3e36298b36e7 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle @@ -11,7 +11,6 @@ application { dependencies { implementation project(':airbyte-db:lib') - // need any of these? implementation project(':airbyte-integrations:bases:base-java') implementation project(':airbyte-integrations:connectors:source-postgres') implementation project(':airbyte-protocol:models') From 36e11d719b3c9a4ea1622e5d03e2eb4dc1edf7ef Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 16:24:59 -0700 Subject: [PATCH 10/12] fix main method --- .../connectors/source-postgres-strict-encrypt/build.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle index 3e36298b36e7..3261abeac55d 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/build.gradle @@ -5,7 +5,7 @@ plugins { } application { - mainClass = 'io.airbyte.integrations.source.postgres.PostgresSourceStrict' + mainClass = 'io.airbyte.integrations.source.postgres.PostgresSourceStrictEncrypt' } dependencies { From aaf3eaa3667abdf3a110bfe09d5e9a72bde43a02 Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 16:32:25 -0700 Subject: [PATCH 11/12] fix the noise --- .../SpecModifyingSource.java | 22 +------------------ .../source-postgres-strict-encrypt/Dockerfile | 2 +- .../postgres/PostgresSourceStrictEncrypt.java | 22 +------------------ ...gresSourceStrictEncryptAcceptanceTest.java | 22 +------------------ ...StrictEncryptJdbcSourceAcceptanceTest.java | 22 +------------------ 5 files changed, 5 insertions(+), 85 deletions(-) diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java index 7bc022c2ecb9..afa0373865b0 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/spec_modification/SpecModifyingSource.java @@ -1,25 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.base.spec_modification; diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile index a423accc75e2..ae28a1f58fdb 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/Dockerfile @@ -2,7 +2,7 @@ FROM airbyte/integration-base-java:dev WORKDIR /airbyte -ENV APPLICATION source-postgres-strict +ENV APPLICATION source-postgres-strict-encrypt COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java index 314dfef921ca..acb7e1bc3e7b 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceStrictEncrypt.java @@ -1,25 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.source.postgres; diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java index cd9ee1dec19c..8729d20ab487 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresSourceStrictEncryptAcceptanceTest.java @@ -1,25 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.io.airbyte.integration_tests.sources; diff --git a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java index 8c15fee082de..c9e431613f00 100644 --- a/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-postgres-strict-encrypt/src/test/java/io/airbyte/integrations/source/postgres/PostgresStrictEncryptJdbcSourceAcceptanceTest.java @@ -1,25 +1,5 @@ /* - * MIT License - * - * Copyright (c) 2020 Airbyte - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in all - * copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE - * SOFTWARE. + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. */ package io.airbyte.integrations.source.postgres; From 72ba74ba4fd628ff222c927bbc4a5ed14a9816e7 Mon Sep 17 00:00:00 2001 From: cgardens Date: Mon, 27 Sep 2021 16:45:11 -0700 Subject: [PATCH 12/12] add java doc comments --- .../jdbc/test/JdbcSourceAcceptanceTest.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 886dee529d85..dc1578bfbf6d 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -128,14 +128,28 @@ public abstract class JdbcSourceAcceptanceTest { /** * An instance of the source that should be tests. * - * @return source + * @return abstract jdbc source */ public abstract AbstractJdbcSource getJdbcSource(); + /** + * In some cases the Source that is being tested may be an AbstractJdbcSource, but because it is + * decorated, Java cannot recognize it as such. In these cases, as a workaround a user can choose to + * override getJdbcSource and have it return null. Then they can override this method with the + * decorated source AND override getToDatabaseConfigFunction with the appropriate + * toDatabaseConfigFunction that is hidden behind the decorator. + * + * @return source + */ public Source getSource() { return getJdbcSource(); } + /** + * See getSource() for when to override this method. + * + * @return a function that maps a source's config to a jdbc config. + */ public Function getToDatabaseConfigFunction() { return getJdbcSource()::toDatabaseConfig; }