Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve error message for tables with invalid columns as cursor #15317

Merged
merged 12 commits into from
Sep 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Optional;

public class IncrementalUtils {

Expand All @@ -22,6 +23,14 @@ public static String getCursorField(final ConfiguredAirbyteStream stream) {
}
}

public static Optional<String> getCursorFieldOptional(final ConfiguredAirbyteStream stream) {
try {
return Optional.ofNullable(getCursorField(stream));
} catch (IllegalStateException e) {
return Optional.empty();
}
}

public static JsonSchemaPrimitive getCursorType(final ConfiguredAirbyteStream stream, final String cursorField) {
if (stream.getStream().getJsonSchema().get(PROPERTIES) == null) {
throw new IllegalStateException(String.format("No properties found in stream: %s.", stream.getStream().getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ protected DataSource createDataSource(final JsonNode config) {
return dataSource;
}

@Override
protected boolean isValidCursorType(final Datatype cursorType) {
return sourceOperations.isCursorType(cursorType);
}

@Override
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
final DataSource dataSource = createDataSource(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -24,16 +26,20 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.relationaldb.InvalidCursorException;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -503,4 +509,45 @@ void testGetUsername() {
assertEquals(username, PostgresSource.getUsername(azureConfig));
}


@Test
public void tableWithInvalidCursorShouldThrowException() throws Exception {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
final JsonNode config = getConfig(db);
try (final DSLContext dslContext = getDslContext(config)) {
final Database database = new Database(dslContext);
final ConfiguredAirbyteStream tableWithInvalidCursorType = createTableWithInvalidCursorType(database);
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(tableWithInvalidCursorType));

final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null)));
assertThat(throwable).isInstanceOf(InvalidCursorException.class)
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}");
} finally {
db.stop();
}
}
}

private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";");
ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table(id uuid PRIMARY KEY DEFAULT uuid_generate_v4());");
return null;
});

return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_table",
"public",
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -136,6 +138,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function
.identity()));

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
Expand All @@ -153,6 +157,42 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

private void validateCursorFieldForIncrementalTables(final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) {
Copy link
Contributor

@ryankfu ryankfu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm mistaken, shouldn't this method have throws InvalidCursorException at the end to indicate that it could throw an Exception?

Another note is can we have a short javadoc comment that says something along the lines of

/**
 * Creates a list of incremental tables with invalid cursor columns (e.g. non-numeric types). Will also throw 
 * `InvalidCursorException` if at least one table includes an invalid cursor type
 */

EDIT: After chatting with Ed on this, since InvalidCursorException extends RuntimeException you don't need to define this in the method, preference would still be to have this defined either as a javadoc comment or within the method to just know in case the future someone wants to catch this Exception

final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
final boolean hasSourceDefinedCursor =
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) && airbyteStream.getStream().getSourceDefinedCursor();
if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding: is there any situation where tableNameToTable.containsKey(fullyQualifiedTableName) isn't true? (totally fair to have this condition either way, I'm just curious how tableNameToTable gets populated)

continue;
}

final TableInfo<CommonField<DataType>> table = tableNameToTable
.get(fullyQualifiedTableName);
final Optional<String> cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream);
if (cursorField.isEmpty()) {
continue;
}
final DataType cursorType = table.getFields().stream()
.filter(info -> info.getName().equals(cursorField.get()))
.map(CommonField::getType)
.findFirst()
.orElseThrow();

if (!isValidCursorType(cursorType)) {
tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), cursorType.toString()));
}
}

if (!tablesWithInvalidCursor.isEmpty()) {
throw new InvalidCursorException(tablesWithInvalidCursor);
}
}

protected abstract boolean isValidCursorType(final DataType cursorType);

protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception {
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.airbyte.integrations.source.relationaldb;

import java.util.List;
import java.util.stream.Collectors;

public class InvalidCursorException extends RuntimeException {

public InvalidCursorException(final List<InvalidCursorInfo> tablesWithInvalidCursor) {
super("The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString)
.collect(Collectors.joining(",")));
}

public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) {

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nitpick: I'd prefer to define a new method prettyString instead of overriding toString (mostly in case someone actually wants to print one of these out for debugging or something)

return "{" +
"tableName='" + tableName + '\'' +
", cursorColumnName='" + cursorColumnName + '\'' +
", cursorSqlType=" + cursorSqlType +
'}';
}
}


}