Skip to content

Commit

Permalink
improve error message for tables with invalid columns as cursor (airb…
Browse files Browse the repository at this point in the history
…ytehq#15317)

* implement validation for cursor type before reading data

* rename class

* add test

* fix merge conflicts

* address review comments

* fix test
  • Loading branch information
subodh1810 authored and jhammarstedt committed Oct 31, 2022
1 parent 904f096 commit f5d0c23
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import java.util.Optional;

public class IncrementalUtils {

Expand All @@ -22,6 +23,14 @@ public static String getCursorField(final ConfiguredAirbyteStream stream) {
}
}

public static Optional<String> getCursorFieldOptional(final ConfiguredAirbyteStream stream) {
try {
return Optional.ofNullable(getCursorField(stream));
} catch (IllegalStateException e) {
return Optional.empty();
}
}

public static JsonSchemaPrimitive getCursorType(final ConfiguredAirbyteStream stream, final String cursorField) {
if (stream.getStream().getJsonSchema().get(PROPERTIES) == null) {
throw new IllegalStateException(String.format("No properties found in stream: %s.", stream.getStream().getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,11 @@ protected DataSource createDataSource(final JsonNode config) {
return dataSource;
}

@Override
protected boolean isValidCursorType(final Datatype cursorType) {
return sourceOperations.isCursorType(cursorType);
}

@Override
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
final DataSource dataSource = createDataSource(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -24,16 +26,20 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.relationaldb.InvalidCursorException;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -503,4 +509,45 @@ void testGetUsername() {
assertEquals(username, PostgresSource.getUsername(azureConfig));
}


@Test
public void tableWithInvalidCursorShouldThrowException() throws Exception {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
final JsonNode config = getConfig(db);
try (final DSLContext dslContext = getDslContext(config)) {
final Database database = new Database(dslContext);
final ConfiguredAirbyteStream tableWithInvalidCursorType = createTableWithInvalidCursorType(database);
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(tableWithInvalidCursorType));

final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null)));
assertThat(throwable).isInstanceOf(InvalidCursorException.class)
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}");
} finally {
db.stop();
}
}
}

private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";");
ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table(id uuid PRIMARY KEY DEFAULT uuid_generate_v4());");
return null;
});

return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_table",
"public",
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
Expand Down Expand Up @@ -54,6 +55,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -146,6 +148,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function
.identity()));

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
Expand All @@ -163,6 +167,42 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

private void validateCursorFieldForIncrementalTables(final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) {
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
final boolean hasSourceDefinedCursor =
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) && airbyteStream.getStream().getSourceDefinedCursor();
if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) {
continue;
}

final TableInfo<CommonField<DataType>> table = tableNameToTable
.get(fullyQualifiedTableName);
final Optional<String> cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream);
if (cursorField.isEmpty()) {
continue;
}
final DataType cursorType = table.getFields().stream()
.filter(info -> info.getName().equals(cursorField.get()))
.map(CommonField::getType)
.findFirst()
.orElseThrow();

if (!isValidCursorType(cursorType)) {
tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), cursorType.toString()));
}
}

if (!tablesWithInvalidCursor.isEmpty()) {
throw new InvalidCursorException(tablesWithInvalidCursor);
}
}

protected abstract boolean isValidCursorType(final DataType cursorType);

protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception {
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.airbyte.integrations.source.relationaldb;

import java.util.List;
import java.util.stream.Collectors;

public class InvalidCursorException extends RuntimeException {

public InvalidCursorException(final List<InvalidCursorInfo> tablesWithInvalidCursor) {
super("The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString)
.collect(Collectors.joining(",")));
}

public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) {

@Override
public String toString() {
return "{" +
"tableName='" + tableName + '\'' +
", cursorColumnName='" + cursorColumnName + '\'' +
", cursorSqlType=" + cursorSqlType +
'}';
}
}


}

0 comments on commit f5d0c23

Please sign in to comment.