Skip to content

Commit

Permalink
Fix JdbcSource handling of tables with same names in different schemas
Browse files Browse the repository at this point in the history
* Previously the JdbcSource was combining the columns of any tables with the same name across different schemas into a single stream in the catalog.

* This was caught because in those tables there were columns of the same name with different types which triggered a precondition to check for this.

* The fix makes sure we group by both schema name and table name.

* Adds test to the standard jdbc tests to catch this case.

* This test does NOT run for mysql as, mysql has no concept of schemas.
  • Loading branch information
cgardens committed Jan 19, 2021
1 parent d4ff906 commit ec78ae4
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -277,7 +278,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
// some databases return multiple copies of the same record for a column (e.g. redshift) because
// they have at least once delivery guarantees. we want to dedupe these, but first we check that the
// records are actually the same and provide a good error message if they are not.
assertColumnsWithSameNameAreSame(t.getName(), t.getFields());
assertColumnsWithSameNameAreSame(t.getSchemaName(), t.getName(), t.getFields());
final List<Field> fields = t.getFields()
.stream()
.map(f -> Field.of(f.getColumnName(), JdbcUtils.getType(f.getColumnType())))
Expand All @@ -289,7 +290,7 @@ private List<TableInfo> getTables(final JdbcDatabase database,
.collect(Collectors.toList());
}

private static void assertColumnsWithSameNameAreSame(String tableName, List<ColumnInfo> columns) {
private static void assertColumnsWithSameNameAreSame(String schemaName, String tableName, List<ColumnInfo> columns) {
columns.stream()
.collect(Collectors.groupingBy(ColumnInfo::getColumnName))
.values()
Expand All @@ -298,8 +299,8 @@ private static void assertColumnsWithSameNameAreSame(String tableName, List<Colu
columnsWithSameName.forEach(column -> {
if (!column.equals(comparisonColumn)) {
throw new RuntimeException(
String.format("Found multiple columns with same name: %s in table: %s but the columns are not the same. columns: %s",
comparisonColumn.getColumnName(), tableName, columns));
String.format("Found multiple columns with same name: %s in table: %s.%s but the columns are not the same. columns: %s",
comparisonColumn.getColumnName(), schemaName, tableName, columns));
}
});
});
Expand Down Expand Up @@ -328,32 +329,30 @@ private List<TableInfoInternal> discoverInternal(final JdbcDatabase database,
.build()))
.stream()
.filter(t -> !internalSchemas.contains(t.get(INTERNAL_SCHEMA_NAME).asText()))
.collect(Collectors.groupingBy(t -> t.get(INTERNAL_TABLE_NAME).asText()))
.entrySet()
// group by schema and table name to handle the case where a table with the same name exists in
// multiple schemas.
.collect(Collectors.groupingBy(t -> ImmutablePair.of(t.get(INTERNAL_SCHEMA_NAME).asText(), t.get(INTERNAL_TABLE_NAME).asText())))
.values()
.stream()
.map(e -> {
final String tableName = e.getKey();
final List<JsonNode> fields = e.getValue();
return new TableInfoInternal(
fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(),
tableName,
fields.stream()
.map(f -> {
JDBCType jdbcType;
try {
jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
f.get(INTERNAL_COLUMN_NAME),
f.get(INTERNAL_SCHEMA_NAME),
f.get(INTERNAL_TABLE_NAME),
f.get(INTERNAL_COLUMN_TYPE)));
jdbcType = JDBCType.VARCHAR;
}
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList()));
})
.map(fields -> new TableInfoInternal(
fields.get(0).get(INTERNAL_SCHEMA_NAME).asText(),
fields.get(0).get(INTERNAL_TABLE_NAME).asText(),
fields.stream()
.map(f -> {
JDBCType jdbcType;
try {
jdbcType = JDBCType.valueOf(f.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (IllegalArgumentException ex) {
LOGGER.warn(String.format("Could not convert column: %s from table: %s.%s with type: %s. Casting to VARCHAR.",
f.get(INTERNAL_COLUMN_NAME),
f.get(INTERNAL_SCHEMA_NAME),
f.get(INTERNAL_TABLE_NAME),
f.get(INTERNAL_COLUMN_TYPE)));
jdbcType = JDBCType.VARCHAR;
}
return new ColumnInfo(f.get(INTERNAL_COLUMN_NAME).asText(), jdbcType);
})
.collect(Collectors.toList())))
.collect(Collectors.toList());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
Expand All @@ -57,6 +58,7 @@
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -128,7 +130,7 @@ public void setup() throws Exception {
getDriverClass());

database.execute(connection -> {
connection.createStatement().execute(String.format("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);"));
connection.createStatement().execute("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200), updated_at DATE);");
connection.createStatement().execute(
"INSERT INTO id_and_name (id, name, updated_at) VALUES (1,'picard', '2004-10-19'), (2, 'crusher', '2005-10-19'), (3, 'vash', '2006-10-19');");
});
Expand Down Expand Up @@ -165,6 +167,34 @@ void testDiscover() throws Exception {
assertEquals(getCatalog(), actual);
}

@Test
void testDiscoverWithMultipleSchemas() throws Exception {
// mysql does not have a concept of schemas, so this test does not make sense for it.
if (getDriverClass().toLowerCase().contains("mysql")) {
return;
}

// add table and data to a separate schema.
database.execute(connection -> {
connection.createStatement().execute("CREATE SCHEMA public2;");
connection.createStatement().execute("CREATE TABLE public2.id_and_name(id VARCHAR(200), name VARCHAR(200));");
connection.createStatement().execute(
"INSERT INTO public2.id_and_name (id, name) VALUES ('1','picard'), ('2', 'crusher'), ('3', 'vash');");
});

final AirbyteCatalog actual = source.discover(config);

final AirbyteCatalog expected = getCatalog();
expected.getStreams().add(CatalogHelpers.createAirbyteStream("public2.id_and_name",
Field.of("id", JsonSchemaPrimitive.STRING),
Field.of("name", JsonSchemaPrimitive.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
// sort streams by name so that we are comparing lists with the same order.
expected.getStreams().sort(Comparator.comparing(AirbyteStream::getName));
actual.getStreams().sort(Comparator.comparing(AirbyteStream::getName));
assertEquals(expected, actual);
}

@Test
void testReadSuccess() throws Exception {
final List<AirbyteMessage> actualMessages = source.read(config, getConfiguredCatalog(), null).collect(Collectors.toList());