Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop sync on a null value in a cursor column #19889

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -621,5 +621,4 @@ protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Configur
.map(Jsons::clone)
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down Expand Up @@ -70,6 +71,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -90,6 +92,14 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA = "SELECT "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this query also cover VIEWS with null values?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I verified this with views as well.
Good idea to add a test of view. working on it

+ "(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) "
+ "AND "
+ "(EXISTS (SELECT from %s.\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s";
public static final String NULL_CURSOR_VALUE_NO_SCHEMA = "SELECT "
+ "(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s')) "
+ "AND "
+ "(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s";
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand Down Expand Up @@ -467,7 +477,7 @@ public static void main(final String[] args) throws Exception {
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) {
String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
final String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
Expand Down Expand Up @@ -499,4 +509,24 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
return result;
}

@Override
protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName) throws SQLException {
final String query;
final String resultColName = "nullValue";
// Query: Only if cursor column is NULLABLE, query whether it contains a NULL value
if (StringUtils.isNotBlank(schema)) {
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA,
schema, tableName, columnName, schema, tableName, columnName, resultColName);
} else {
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA,
tableName, columnName, tableName, columnName, resultColName);
}
LOGGER.debug("null value query: {}", query);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(query),
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final boolean nullValExist = jsonNodes.get(0).get(resultColName.toLowerCase()).booleanValue(); // For some reason value in node is lowercase
LOGGER.debug("null value exist: {}", nullValExist);
return !nullValExist;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@

public abstract class AbstractPostgresSourceSSLCertificateAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "testview";
private static final String SCHEMA_NAME = "public";
private PostgreSQLContainer<?> container;
private JsonNode config;
protected static final String PASSWORD = "Passw0rd";
Expand Down Expand Up @@ -116,28 +116,31 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@

public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "public";
private static final Network network = Network.newNetwork();
private static JsonNode config;
private final SshBastionContainer bastion = new SshBastionContainer();
Expand Down Expand Up @@ -118,19 +119,21 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@

public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "testview";
private static final String SCHEMA_NAME = "public";

private PostgreSQLContainer<?> container;
private JsonNode config;
Expand Down Expand Up @@ -107,28 +108,31 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.HashMap;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junitpioneer.jupiter.SetEnvironmentVariable;
Expand All @@ -42,9 +43,9 @@
value = "CLOUD")
public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "public";
private PostgreSQLContainer<?> container;
private JsonNode config;

Expand Down Expand Up @@ -125,19 +126,21 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ public void tableWithInvalidCursorShouldThrowException() throws Exception {
final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null)));
assertThat(throwable).isInstanceOf(ConfigErrorException.class)
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}");
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER, cause=Unsupported cursor type}");
} finally {
db.stop();
}
Expand Down Expand Up @@ -544,4 +544,44 @@ private JsonNode buildConfigEscapingNeeded() {
JdbcUtils.SSL_KEY, "false"));
}

@Test
public void tableWithNullValueCursorShouldThrowException() throws SQLException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add a similar test where we have a VIEW with null value for cursor column

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

try (final PostgreSQLContainer<?> db = new PostgreSQLContainer<>("postgres:13-alpine")) {
db.start();
final JsonNode config = getConfig(db);
try (final DSLContext dslContext = getDslContext(config)) {
final Database database = new Database(dslContext);
final ConfiguredAirbyteStream table = createTableWithNullValueCursor(database);
final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(table));

final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, catalog, null)));
assertThat(throwable).isInstanceOf(ConfigErrorException.class)
.hasMessageContaining(
"The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table_null_cursor', cursorColumnName='id', cursorSqlType=INTEGER, cause=Cursor column contains NULL value}");

} finally {
db.stop();
}
}
}

private ConfiguredAirbyteStream createTableWithNullValueCursor(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table_null_cursor(id INTEGER NULL);");
ctx.fetch("INSERT INTO public.test_table_null_cursor(id) VALUES (1), (2), (NULL);");
return null;
});

return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withSyncMode(SyncMode.INCREMENTAL)
.withStream(CatalogHelpers.createAirbyteStream(
"test_table_null_cursor",
"public",
Field.of("id", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
Function
.identity()));

validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog);
validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog, database);

final List<AutoCloseableIterator<AirbyteMessage>> incrementalIterators =
getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager,
Expand All @@ -182,7 +182,7 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,

private void validateCursorFieldForIncrementalTables(
final Map<String, TableInfo<CommonField<DataType>>> tableNameToTable,
final ConfiguredAirbyteCatalog catalog) {
final ConfiguredAirbyteCatalog catalog, final Database database) throws SQLException {
final List<InvalidCursorInfo> tablesWithInvalidCursor = new ArrayList<>();
for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) {
final AirbyteStream stream = airbyteStream.getStream();
Expand Down Expand Up @@ -211,7 +211,15 @@ private void validateCursorFieldForIncrementalTables(
if (!isCursorType(cursorType)) {
tablesWithInvalidCursor.add(
new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(),
cursorType.toString()));
cursorType.toString(), "Unsupported cursor type"));
continue;
}

if (!verifyCursorColumnValues(database, stream.getNamespace(), stream.getName(), cursorField.get())) {
tablesWithInvalidCursor.add(
new InvalidCursorInfo(fullyQualifiedTableName, cursorField.get(),
cursorType.toString(), "Cursor column contains NULL value"));
continue;
}
}

Expand All @@ -221,6 +229,16 @@ private void validateCursorFieldForIncrementalTables(
}
}

/**
* Verify that cursor column allows syncing to go through.
* @param database database
* @return true if syncing can go through. false otherwise
* @throws SQLException exception
*/
protected boolean verifyCursorColumnValues(final Database database, final String schema, final String tableName, final String columnName) throws SQLException {
/* no-op */
return true;
}
private List<TableInfo<CommonField<DataType>>> discoverWithoutSystemTables(
final Database database)
throws Exception {
Expand Down
Loading