diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java index 6a73a03cc6bb..b251b9f18e66 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java @@ -6,6 +6,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.util.Optional; public class IncrementalUtils { @@ -22,6 +23,14 @@ public static String getCursorField(final ConfiguredAirbyteStream stream) { } } + public static Optional getCursorFieldOptional(final ConfiguredAirbyteStream stream) { + try { + return Optional.ofNullable(getCursorField(stream)); + } catch (IllegalStateException e) { + return Optional.empty(); + } + } + public static JsonSchemaPrimitive getCursorType(final ConfiguredAirbyteStream stream, final String cursorField) { if (stream.getStream().getJsonSchema().get(PROPERTIES) == null) { throw new IllegalStateException(String.format("No properties found in stream: %s.", stream.getStream().getName())); diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 007abf97f920..d89da496b914 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -371,6 +371,11 @@ protected DataSource createDataSource(final JsonNode config) { return dataSource; } + @Override + protected boolean isValidCursorType(final Datatype cursorType) { + return sourceOperations.isCursorType(cursorType); + } + @Override public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { final DataSource dataSource = createDataSource(config); diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 226b546ccba9..485dfe11e3c0 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -7,6 +7,8 @@ import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -24,16 +26,20 @@ import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.source.relationaldb.InvalidCursorException; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.math.BigDecimal; +import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -503,4 +509,45 @@ void testGetUsername() { assertEquals(username, PostgresSource.getUsername(azureConfig)); } + + @Test + public void tableWithInvalidCursorShouldThrowException() throws Exception { + try (final PostgreSQLContainer db = new PostgreSQLContainer<>("postgres:13-alpine")) { + db.start(); + final JsonNode config = getConfig(db); + try (final DSLContext dslContext = getDslContext(config)) { + final Database database = new Database(dslContext); + final ConfiguredAirbyteStream tableWithInvalidCursorType = createTableWithInvalidCursorType(database); + final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(tableWithInvalidCursorType)); + + final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null))); + assertThat(throwable).isInstanceOf(InvalidCursorException.class) + .hasMessageContaining( + "The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}"); + } finally { + db.stop(); + } + } + } + + private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database database) throws SQLException { + database.query(ctx -> { + ctx.fetch("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";"); + ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table(id uuid PRIMARY KEY DEFAULT uuid_generate_v4());"); + return null; + }); + + return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncMode(SyncMode.INCREMENTAL) + .withStream(CatalogHelpers.createAirbyteStream( + "test_table", + "public", + Field.of("id", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of("id")))); + + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index e4c5d7724147..19c82513c110 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -23,6 +23,7 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; @@ -50,6 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -136,6 +138,8 @@ public AutoCloseableIterator read(final JsonNode config, .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function .identity())); + validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog); + final List> incrementalIterators = getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt); final List> fullRefreshIterators = @@ -153,6 +157,42 @@ public AutoCloseableIterator read(final JsonNode config, }); } + private void validateCursorFieldForIncrementalTables(final Map>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) { + final List tablesWithInvalidCursor = new ArrayList<>(); + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { + final AirbyteStream stream = airbyteStream.getStream(); + final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), + stream.getName()); + final boolean hasSourceDefinedCursor = + !Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) && airbyteStream.getStream().getSourceDefinedCursor(); + if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) { + continue; + } + + final TableInfo> table = tableNameToTable + .get(fullyQualifiedTableName); + final Optional cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream); + if (cursorField.isEmpty()) { + continue; + } + final DataType cursorType = table.getFields().stream() + .filter(info -> info.getName().equals(cursorField.get())) + .map(CommonField::getType) + .findFirst() + .orElseThrow(); + + if (!isValidCursorType(cursorType)) { + tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(), cursorType.toString())); + } + } + + if (!tablesWithInvalidCursor.isEmpty()) { + throw new InvalidCursorException(tablesWithInvalidCursor); + } + } + + protected abstract boolean isValidCursorType(final DataType cursorType); + protected List>> discoverWithoutSystemTables(final Database database) throws Exception { final Set systemNameSpaces = getExcludedInternalNameSpaces(); final List>> discoveredTables = discoverInternal(database); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java new file mode 100644 index 000000000000..97cf102cab35 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java @@ -0,0 +1,26 @@ +package io.airbyte.integrations.source.relationaldb; + +import java.util.List; +import java.util.stream.Collectors; + +public class InvalidCursorException extends RuntimeException { + + public InvalidCursorException(final List tablesWithInvalidCursor) { + super("The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) + .collect(Collectors.joining(","))); + } + + public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) { + + @Override + public String toString() { + return "{" + + "tableName='" + tableName + '\'' + + ", cursorColumnName='" + cursorColumnName + '\'' + + ", cursorSqlType=" + cursorSqlType + + '}'; + } + } + + +}