Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix JdbcSource handling of tables with same names in different schemas #1724

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad",
"name": "MySQL",
"dockerRepository": "airbyte/source-mysql",
"dockerImageTag": "0.1.7",
"dockerImageTag": "0.1.8",
"documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1",
"name": "Microsoft SQL Server (MSSQL)",
"dockerRepository": "airbyte/source-mssql",
"dockerImageTag": "0.1.8",
"dockerImageTag": "0.1.9",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.9",
"dockerImageTag": "0.1.10",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "e87ffa8e-a3b5-f69c-9076-6011339de1f6",
"name": "Redshift",
"dockerRepository": "airbyte/source-redshift",
"dockerImageTag": "0.1.3",
"dockerImageTag": "0.1.4",
"documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-redshift"
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@
- sourceDefinitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
name: Microsoft SQL Server (MSSQL)
dockerRepository: airbyte/source-mssql
dockerImageTag: 0.1.8
dockerImageTag: 0.1.9
documentationUrl: https://hub.docker.com/r/airbyte/source-mssql
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.1.9
dockerImageTag: 0.1.10
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand All @@ -51,7 +51,7 @@
- sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
name: MySQL
dockerRepository: airbyte/source-mysql
dockerImageTag: 0.1.7
dockerImageTag: 0.1.8
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
- sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77
name: Salesforce
Expand Down Expand Up @@ -96,7 +96,7 @@
- sourceDefinitionId: e87ffa8e-a3b5-f69c-9076-6011339de1f6
name: Redshift
dockerRepository: airbyte/source-redshift
dockerImageTag: 0.1.3
dockerImageTag: 0.1.4
documentationUrl: https://hub.docker.com/repository/docker/airbyte/source-redshift
- sourceDefinitionId: 932e6363-d006-4464-a9f5-102b82e07c06
name: Twilio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class PostgresDestination extends AbstractJdbcDestination implements Dest
public static final String DRIVER_CLASS = "org.postgresql.Driver";

public PostgresDestination() {
super("org.postgresql.Driver", new PostgresSQLNameTransformer(), new DefaultSqlOperations());
super(DRIVER_CLASS, new PostgresSQLNameTransformer(), new DefaultSqlOperations());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -277,7 +278,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getName(), t.getFields());
assertColumnsWithSameNameAreSame(t.getSchemaName(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(f -> Field.of(f.getColumnName(), JdbcUtils.getType(f.getColumnType())))
Expand All @@ -289,7 +290,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
.collect(Collectors.toList());
}

private static void assertColumnsWithSameNameAreSame(String tableName, List<ColumnInfo> columns) {
private static void assertColumnsWithSameNameAreSame(String schemaName, String tableName, List<ColumnInfo> columns) {
columns.stream()
.collect(Collectors.groupingBy(ColumnInfo::getColumnName))
.values()
Expand All @@ -298,8 +299,8 @@ private static void assertColumnsWithSameNameAreSame(String tableName, List<Colu
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format("Found multiple columns with same name: %s in table: %s but the columns are not the same. columns: %s",
comparisonColumn.getColumnName(), tableName, columns));
String.format("Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getColumnName(), schemaName, tableName, columns));
}
});
});
Expand Down Expand Up @@ -328,32 +329,30 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
.build()))
.stream()
.filter(t -> !internalSchemas.contains(t.get(INTERNAL_SCHEMA_NAME).asText()))
.collect(Collectors.groupingBy(t -> t.get(INTERNAL_TABLE_NAME).asText()))
.entrySet()
// group by schema and table name to handle the case where a table with the same name exists in
// multiple schemas.
.collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), t.get(INTERNAL_TABLE_NAME).asText())))
.values()
.stream()
.map(e -> {
final String tableName = e.getKey();
final List<JsonNode> fields = e.getValue();
return new TableInfoInternal(
fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(),
tableName,
fields.stream()
.map(f -> {
JDBCType jdbcType;
try {
jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
f.get(INTERNAL_COLUMN_NAME),
f.get(INTERNAL_SCHEMA_NAME),
f.get(INTERNAL_TABLE_NAME),
f.get(INTERNAL_COLUMN_TYPE)));
jdbcType = JDBCType.VARCHAR;
}
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList()));
})
.map(fields -> new TableInfoInternal(
fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(),
fields.get(0).get(INTERNAL_TABLE_NAME).asText(),
fields.stream()
.map(f -> {
JDBCType jdbcType;
try {
jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
f.get(INTERNAL_COLUMN_NAME),
f.get(INTERNAL_SCHEMA_NAME),
f.get(INTERNAL_TABLE_NAME),
f.get(INTERNAL_COLUMN_TYPE)));
jdbcType = JDBCType.VARCHAR;
}
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList())))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -57,6 +58,7 @@
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -128,7 +130,7 @@ public void setup() throws Exception {
getDriverClass());

database.execute(connection -> {
connection.createStatement().execute(String.format("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);"));
connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);");
connection.createStatement().execute(
"INSERT INTO id_and_name (id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');");
});
Expand Down Expand Up @@ -165,6 +167,34 @@ void testDiscover() throws Exception {
assertEquals(getCatalog(), actual);
}

@Test
void testDiscoverWithMultipleSchemas() throws Exception {
// mysql does not have a concept of schemas, so this test does not make sense for it.
if (getDriverClass().toLowerCase().contains("mysql")) {
return;
}

// add table and data to a separate schema.
database.execute(connection -> {
connection.createStatement().execute("CREATE SCHEMA public2;");
connection.createStatement().execute("CREATE TABLE public2.id_and_name(id VARCHAR(200), name VARCHAR(200));");
connection.createStatement().execute(
"INSERT INTO public2.id_and_name (id, name) VALUES ('1','picard'), ('2', 'crusher'), ('3', 'vash');");
});

final AirbyteCatalog actual = source.discover(config);

final AirbyteCatalog expected = getCatalog();
expected.getStreams().add(CatalogHelpers.createAirbyteStream("public2.id_and_name",
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
// sort streams by name so that we are comparing lists with the same order.
expected.getStreams().sort(Comparator.comparing(AirbyteStream::getName));
actual.getStreams().sort(Comparator.comparing(AirbyteStream::getName));
assertEquals(expected, actual);
}

@Test
void testReadSuccess() throws Exception {
final List<AirbyteMessage> actualMessages = source.read(config, getConfiguredCatalog(), null).collect(Collectors.toList());
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.name=airbyte/source-mssql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.7
LABEL io.airbyte.version=0.1.8
LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.9
LABEL io.airbyte.version=0.1.10
LABEL io.airbyte.name=airbyte/source-postgres
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-redshift/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-redshift