Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve error message for tables with invalid columns as cursor #15317

Merged
merged 12 commits into from
Sep 13, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ void setStatementField(final PreparedStatement preparedStatement,
*/
String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException;

boolean isValidCursorType(final SourceType cursorType);

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
}
}

@Override
public boolean isValidCursorType(final JDBCType cursorType) {
return switch (cursorType) {
case TIMESTAMP, TIME, DATE, BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR, BINARY, BLOB -> true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as previous comment, would like to see if this can be reordered lexicographically to more easily know if a cursor type is supported

default -> false;
};
}

@Override
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,11 @@ protected DataSource createDataSource(final JsonNode config) {
return dataSource;
}

@Override
protected boolean isValidCursorType(final Datatype cursorType) {
return sourceOperations.isValidCursorType(cursorType);
}

@Override
public JdbcDatabase createDatabase(final JsonNode config) throws SQLException {
final DataSource dataSource = createDataSource(config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ protected void putBoolean(final ObjectNode node, final String columnName, final
node.put(columnName, resultSet.getInt(index) > 0);
}

@Override
public boolean isValidCursorType(final MysqlType cursorType) {
return switch (cursorType) {
case BIT, BOOLEAN, TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP, TIME, YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> true;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you think of this style?

return switch (cursorType) {
  case <mysql-specific types> -> true;
  default -> super.isValidCursorType(cursorType);
};

I.e. only defining the types that aren't handled by JdbcSourceOperations already. (not a huge win in this case, but I think it would make e.g. getJsonType much nicer)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach as well, reduces replication of cursor types but it does make it harder to know the full list of supported cursor types without looking into the super class. For extensibility though this seems better

Copy link
Contributor

@ryankfu ryankfu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: reorder values lexicographically for easier parsing and also to have the

// since cursor are expected to be comparable, ...

comment in the abstractDbSource method since that's the top-level where these methods override from

// since cursor are expected to be comparable, handle cursor typing strictly and error on
// unrecognized types
default -> false;
};
}

@Override
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
edgao marked this conversation as resolved.
Show resolved Hide resolved
import com.google.common.collect.Sets;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,14 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
return jsonNode;
}

@Override
public boolean isValidCursorType(final JDBCType cursorType) {
return switch (cursorType) {
case TIMESTAMP, TIMESTAMP_WITH_TIMEZONE, TIME, TIME_WITH_TIMEZONE, DATE, BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, REAL, NUMERIC, DECIMAL, CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR, BINARY, BLOB -> true;
default -> false;
};
}

@Override
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map;
import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand All @@ -24,16 +26,20 @@
import io.airbyte.db.factory.DSLContextFactory;
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.source.relationaldb.InvalidCursorException;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -502,4 +508,45 @@ void testGetUsername() {
assertEquals(username, PostgresSource.getUsername(azureConfig));
}


@Test
public void tableWithInvalidCursorShouldThrowException() throws Exception {
try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
final JsonNode config = getConfig(db);
try (final DSLContext dslContext = getDslContext(config)) {
final Database database = new Database(dslContext);
final ConfiguredAirbyteStream tableWithInvalidCursorType = createTableWithInvalidCursorType(database);
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(tableWithInvalidCursorType));

final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null)));
assertThat(throwable).isInstanceOf(InvalidCursorException.class)
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a valid column as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}");
} finally {
db.stop();
}
}
}

private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";");
ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table(id uuid PRIMARY KEY DEFAULT uuid_generate_v4());");
return null;
});

return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_table",
"public",
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
Expand Down Expand Up @@ -50,6 +51,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -125,6 +127,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
.collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function
.identity()));

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt);
final List<AutoCloseableIterator<AirbyteMessage>> fullRefreshIterators =
Expand All @@ -142,6 +146,41 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
});
}

private void validateCursorFieldForIncrementalTables(final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) {
Copy link
Contributor

@ryankfu ryankfu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless I'm mistaken, shouldn't this method have throws InvalidCursorException at the end to indicate that it could throw an Exception?

Another note is can we have a short javadoc comment that says something along the lines of

/**
 * Creates a list of incremental tables with invalid cursor columns (e.g. non-numeric types). Will also throw 
 * `InvalidCursorException` if at least one table includes an invalid cursor type
 */

EDIT: After chatting with Ed on this, since InvalidCursorException extends RuntimeException you don't need to define this in the method, preference would still be to have this defined either as a javadoc comment or within the method to just know in case the future someone wants to catch this Exception

final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(),
stream.getName());
final boolean hasSourceDefinedCursor =
!Objects.isNull(airbyteStream.getStream().getSourceDefinedCursor()) && airbyteStream.getStream().getSourceDefinedCursor();
if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || hasSourceDefinedCursor) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for my understanding: is there any situation where tableNameToTable.containsKey(fullyQualifiedTableName) isn't true? (totally fair to have this condition either way, I'm just curious how tableNameToTable gets populated)

continue;
}

final TableInfo<CommonField<DataType>> table = tableNameToTable
.get(fullyQualifiedTableName);
final String cursorField = IncrementalUtils.getCursorField(airbyteStream);
final DataType cursorType = table.getFields().stream()
.filter(info -> info.getName().equals(cursorField))
.map(CommonField::getType)
.findFirst()
.orElseThrow();

if (isValidCursorType(cursorType)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: this seems a little bit nicer (avoids using continue)

if (!isValidCursorType(cursorType)) {
  tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField, cursorType.toString()));
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on this comment

continue;
}

tablesWithInvalidCursor.add(new InvalidCursorInfo(fullyQualifiedTableName, cursorField, cursorType.toString()));
}

if (!tablesWithInvalidCursor.isEmpty()) {
throw new InvalidCursorException(tablesWithInvalidCursor);
}
}

protected abstract boolean isValidCursorType(final DataType cursorType);

protected List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(final Database database) throws Exception {
final Set<String> systemNameSpaces = getExcludedInternalNameSpaces();
final List<TableInfo<CommonField<DataType>>> discoveredTables = discoverInternal(database);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.airbyte.integrations.source.relationaldb;

import java.util.List;
import java.util.stream.Collectors;

public class InvalidCursorException extends RuntimeException {

public InvalidCursorException(final List<InvalidCursorInfo> tablesWithInvalidCursor) {
super("The following tables have invalid columns selected as cursor, please select a valid column as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: maybe replace "a valid column" with "a column with a well-defined ordering"? So that it's clear why the cursor isn't valid

.collect(Collectors.joining(",")));
}

public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) {

@Override
public String toString() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nitpick: I'd prefer to define a new method prettyString instead of overriding toString (mostly in case someone actually wants to print one of these out for debugging or something)

return "{" +
"tableName='" + tableName + '\'' +
", cursorColumnName='" + cursorColumnName + '\'' +
", cursorSqlType=" + cursorSqlType +
'}';
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) thr
}
}

@Override
public boolean isValidCursorType(final MysqlType cursorType) {
return switch (cursorType) {
case BIT, BOOLEAN, TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP, TIME, YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> true;
Copy link
Contributor

@ryankfu ryankfu Aug 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a large case, would like to see if this can simply be reordered lexicographically to more easily parse out the options

EDIT: chatting with Ed on this I'm not hard set on lexicographically sorting since it can also be easier for people that look at the data as grouped sets of values, may like to still consider how "readable" this would be for someone looking to understand which types are supported though

default -> false;
};
}

@Override
public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value)
throws SQLException {
Expand Down