From 3670545995a6cbc6579104ac9726f82315b1003d Mon Sep 17 00:00:00 2001 From: Charles Date: Tue, 19 Jan 2021 18:45:53 -0800 Subject: [PATCH] Fix JdbcSource handling of tables with same names in different schemas (#1724) * Fix JdbcSource handling of tables with same names in different schemas * Previously the JdbcSource was combining the columns of any tables with the same name across different schemas into a single stream in the catalog. * This was caught because in those tables there were columns of the same name with different types which triggered a precondition that checks for this. * The fix makes sure we group by both schema name and table name. * Adds a test to the standard jdbc tests to catch this case. * This test does NOT run for mysql, as mysql has no concept of schemas. --- .../435bb9a5-7887-4809-aa58-28c27df0d7ad.json | 2 +- .../b5ea17b1-f170-46dc-bc31-cc744ca984c1.json | 2 +- .../decd338e-5647-4c0b-adf4-da0e75f5a750.json | 2 +- .../e87ffa8e-a3b5-f69c-9076-6011339de1f6.json | 2 +- .../resources/seed/source_definitions.yaml | 8 +-- .../postgres/PostgresDestination.java | 2 +- .../source/jdbc/AbstractJdbcSource.java | 57 +++++++++---------- .../jdbc/test/JdbcSourceStandardTest.java | 32 ++++++++++- .../connectors/source-mssql/Dockerfile | 2 +- .../connectors/source-mysql/Dockerfile | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../connectors/source-redshift/Dockerfile | 2 +- 12 files changed, 72 insertions(+), 43 deletions(-) diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 5de9c31dffd6..dbc826d2d967 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,6 
+2,6 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.1.7", + "dockerImageTag": "0.1.8", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json index 80dcc014cecd..bc88f557af3d 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1", "name": "Microsoft SQL Server (MSSQL)", "dockerRepository": "airbyte/source-mssql", - "dockerImageTag": "0.1.8", + "dockerImageTag": "0.1.9", "documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json index a27ada1faa17..a898c30963a4 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750", "name": "Postgres", "dockerRepository": "airbyte/source-postgres", - "dockerImageTag": "0.1.9", + "dockerImageTag": "0.1.10", "documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres" } diff --git 
a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e87ffa8e-a3b5-f69c-9076-6011339de1f6.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e87ffa8e-a3b5-f69c-9076-6011339de1f6.json index 05e92ab1df26..ae8f4a4d2f4e 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e87ffa8e-a3b5-f69c-9076-6011339de1f6.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e87ffa8e-a3b5-f69c-9076-6011339de1f6.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6", "name": "Redshift", "dockerRepository": "airbyte/source-redshift", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift" } diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 5a2b7d8ff59c..e6cc56d5fb6b 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -21,12 +21,12 @@ - sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1 name: Microsoft SQL Server (MSSQL) dockerRepository: airbyte/source-mssql - dockerImageTag: 0.1.8 + dockerImageTag: 0.1.9 documentationUrl: https://hub.docker.com/r/airbyte/source-mssql - sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 name: Postgres dockerRepository: airbyte/source-postgres - dockerImageTag: 0.1.9 + dockerImageTag: 0.1.10 documentationUrl: https://hub.docker.com/r/airbyte/source-postgres - sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01 name: Recurly @@ -51,7 +51,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.1.7 + dockerImageTag: 0.1.8 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql - sourceDefinitionId: 
2470e835-feaf-4db6-96f3-70fd645acc77 name: Salesforce @@ -96,7 +96,7 @@ - sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6 name: Redshift dockerRepository: airbyte/source-redshift - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift - sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06 name: Twilio diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index f8ab927b30fd..e3927e0a1403 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -42,7 +42,7 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest public static final String DRIVER_CLASS = "org.postgresql.Driver"; public PostgresDestination() { - super("org.postgresql.Driver", new PostgresSQLNameTransformer(), new DefaultSqlOperations()); + super(DRIVER_CLASS, new PostgresSQLNameTransformer(), new DefaultSqlOperations()); } @Override diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index deaca0f16a30..f5a1cc9fc9de 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -66,6 +66,7 @@ import java.util.function.Function; import 
java.util.stream.Collectors; import java.util.stream.Stream; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -277,7 +278,7 @@ private List getTables(final JdbcDatabase database, // some databases return multiple copies of the same record for a column (e.g. redshift) because // they have at least once delivery guarantees. we want to dedupe these, but first we check that the // records are actually the same and provide a good error message if they are not. - assertColumnsWithSameNameAreSame(t.getName(), t.getFields()); + assertColumnsWithSameNameAreSame(t.getSchemaName(), t.getName(), t.getFields()); final List fields = t.getFields() .stream() .map(f -> Field.of(f.getColumnName(), JdbcUtils.getType(f.getColumnType()))) @@ -289,7 +290,7 @@ private List getTables(final JdbcDatabase database, .collect(Collectors.toList()); } - private static void assertColumnsWithSameNameAreSame(String tableName, List columns) { + private static void assertColumnsWithSameNameAreSame(String schemaName, String tableName, List columns) { columns.stream() .collect(Collectors.groupingBy(ColumnInfo::getColumnName)) .values() @@ -298,8 +299,8 @@ private static void assertColumnsWithSameNameAreSame(String tableName, List { if (!column.equals(comparisonColumn)) { throw new RuntimeException( - String.format("Found multiple columns with same name: %s in table: %s but the columns are not the same. columns: %s", - comparisonColumn.getColumnName(), tableName, columns)); + String.format("Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. 
columns: %s", + comparisonColumn.getColumnName(), schemaName, tableName, columns)); } }); }); @@ -328,32 +329,30 @@ private List discoverInternal(final JdbcDatabase database, .build())) .stream() .filter(t -> !internalSchemas.contains(t.get(INTERNAL_SCHEMA_NAME).asText())) - .collect(Collectors.groupingBy(t -> t.get(INTERNAL_TABLE_NAME).asText())) - .entrySet() + // group by schema and table name to handle the case where a table with the same name exists in + // multiple schemas. + .collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), t.get(INTERNAL_TABLE_NAME).asText()))) + .values() .stream() - .map(e -> { - final String tableName = e.getKey(); - final List fields = e.getValue(); - return new TableInfoInternal( - fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(), - tableName, - fields.stream() - .map(f -> { - JDBCType jdbcType; - try { - jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt()); - } catch (IllegalArgumentException ex) { - LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.", - f.get(INTERNAL_COLUMN_NAME), - f.get(INTERNAL_SCHEMA_NAME), - f.get(INTERNAL_TABLE_NAME), - f.get(INTERNAL_COLUMN_TYPE))); - jdbcType = JDBCType.VARCHAR; - } - return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType); - }) - .collect(Collectors.toList())); - }) + .map(fields -> new TableInfoInternal( + fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(), + fields.get(0).get(INTERNAL_TABLE_NAME).asText(), + fields.stream() + .map(f -> { + JDBCType jdbcType; + try { + jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt()); + } catch (IllegalArgumentException ex) { + LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. 
Casting to VARCHAR.", + f.get(INTERNAL_COLUMN_NAME), + f.get(INTERNAL_SCHEMA_NAME), + f.get(INTERNAL_TABLE_NAME), + f.get(INTERNAL_COLUMN_TYPE))); + jdbcType = JDBCType.VARCHAR; + } + return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType); + }) + .collect(Collectors.toList()))) .collect(Collectors.toList()); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceStandardTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceStandardTest.java index 3101d1d711c9..772ca3fa5a92 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceStandardTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceStandardTest.java @@ -49,6 +49,7 @@ import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -57,6 +58,7 @@ import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import io.airbyte.protocol.models.SyncMode; import java.util.ArrayList; +import java.util.Comparator; import java.util.List; import java.util.Optional; import java.util.stream.Collectors; @@ -128,7 +130,7 @@ public void setup() throws Exception { getDriverClass()); database.execute(connection -> { - connection.createStatement().execute(String.format("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);")); + connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);"); 
connection.createStatement().execute( "INSERT INTO id_and_name (id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');"); }); @@ -165,6 +167,34 @@ void testDiscover() throws Exception { assertEquals(getCatalog(), actual); } + @Test + void testDiscoverWithMultipleSchemas() throws Exception { + // mysql does not have a concept of schemas, so this test does not make sense for it. + if (getDriverClass().toLowerCase().contains("mysql")) { + return; + } + + // add table and data to a separate schema. + database.execute(connection -> { + connection.createStatement().execute("CREATE SCHEMA public2;"); + connection.createStatement().execute("CREATE TABLE public2.id_and_name(id VARCHAR(200), name VARCHAR(200));"); + connection.createStatement().execute( + "INSERT INTO public2.id_and_name (id, name) VALUES ('1','picard'), ('2', 'crusher'), ('3', 'vash');"); + }); + + final AirbyteCatalog actual = source.discover(config); + + final AirbyteCatalog expected = getCatalog(); + expected.getStreams().add(CatalogHelpers.createAirbyteStream("public2.id_and_name", + Field.of("id", JsonSchemaPrimitive.STRING), + Field.of("name", JsonSchemaPrimitive.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); + // sort streams by name so that we are comparing lists with the same order. 
+ expected.getStreams().sort(Comparator.comparing(AirbyteStream::getName)); + actual.getStreams().sort(Comparator.comparing(AirbyteStream::getName)); + assertEquals(expected, actual); + } + @Test void testReadSuccess() throws Exception { final List actualMessages = source.read(config, getConfiguredCatalog(), null).collect(Collectors.toList()); diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 9d3172de1b3f..ee85196d4400 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.8 +LABEL io.airbyte.version=0.1.9 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index c77dfb2f35ac..4f82ce43d1f1 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.7 +LABEL io.airbyte.version=0.1.8 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 06b606d951ed..e2cf91cf4743 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.9 +LABEL io.airbyte.version=0.1.10 LABEL io.airbyte.name=airbyte/source-postgres diff --git 
a/airbyte-integrations/connectors/source-redshift/Dockerfile b/airbyte-integrations/connectors/source-redshift/Dockerfile index fad7ce4350bc..6b7ea08a57c3 100644 --- a/airbyte-integrations/connectors/source-redshift/Dockerfile +++ b/airbyte-integrations/connectors/source-redshift/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-redshift