Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stop sync on a null value in a cursor column #19889

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1249,7 +1249,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 1.0.30
dockerImageTag: 1.0.31
documentationUrl: https://docs.airbyte.com/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
4 changes: 2 additions & 2 deletions airbyte-config/init/src/main/resources/seed/source_specs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11351,7 +11351,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:1.0.30"
- dockerImage: "airbyte/source-postgres:1.0.31"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down Expand Up @@ -11600,7 +11600,7 @@
enum:
- "pgoutput"
- "wal2json"
const: "pgoutput"
default: "pgoutput"
order: 2
replication_slot:
type: "string"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,5 +621,4 @@ protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Configur
.map(Jsons::clone)
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.version=1.0.31
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.30

LABEL io.airbyte.version=1.0.31
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
Expand Down Expand Up @@ -70,6 +71,7 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -90,6 +92,20 @@ public class PostgresSource extends AbstractJdbcSource<PostgresType> implements
public static final String SSL_KEY = "sslkey";
public static final String SSL_PASSWORD = "sslpassword";
public static final String MODE = "mode";
/**
 * Query template that reports whether a nullable cursor column in a schema-qualified table
 * currently holds at least one NULL.
 *
 * <p>Format arguments, in order: schema, table and column (used as string literals for the
 * {@code information_schema.columns} lookup), then schema, table and column again (used as
 * double-quoted identifiers for the row scan), and finally the alias of the boolean result
 * column.
 *
 * <p>The schema is double-quoted in the row scan just like the table and column. An unquoted
 * schema would be folded to lower case by PostgreSQL, while the {@code information_schema}
 * comparison is case-sensitive, so a mixed-case schema would match the first check and then fail
 * relation lookup in the second.
 */
public static final String NULL_CURSOR_VALUE_WITH_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_schema = '%s' AND table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\".\"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";
/**
 * Query template that reports whether a nullable cursor column currently holds at least one NULL,
 * for a table referenced without a schema.
 *
 * <p>Format arguments, in order: table and column (used as string literals for the
 * {@code information_schema.columns} lookup), then table and column again (used as double-quoted
 * identifiers for the row scan), and finally the alias of the boolean result column.
 *
 * <p>The single result column is true only when the column is declared nullable AND a row with a
 * NULL in that column exists.
 *
 * <p>NOTE(review): the {@code information_schema} lookup has no schema filter, so it matches a
 * table/column of that name in any schema, while the row scan resolves the table through the
 * session's search path - confirm this is intended.
 */
public static final String NULL_CURSOR_VALUE_NO_SCHEMA =
"""
SELECT
(EXISTS (SELECT FROM information_schema.columns WHERE table_name = '%s' AND is_nullable = 'YES' AND column_name = '%s'))
AND
(EXISTS (SELECT from \"%s\" where \"%s\" IS NULL LIMIT 1)) AS %s
""";
private List<String> schemas;
private final FeatureFlags featureFlags;
private static final Set<String> INVALID_CDC_SSL_MODES = ImmutableSet.of("allow", "prefer");
Expand Down Expand Up @@ -468,7 +484,7 @@ public static void main(final String[] args) throws Exception {
public AirbyteConnectionStatus check(final JsonNode config) throws Exception {
if (PostgresUtils.isCdc(config)) {
if (config.has(SSL_MODE) && config.get(SSL_MODE).has(MODE)) {
String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
final String sslModeValue = config.get(SSL_MODE).get(MODE).asText();
if (INVALID_CDC_SSL_MODES.contains(sslModeValue)) {
return new AirbyteConnectionStatus()
.withStatus(Status.FAILED)
Expand Down Expand Up @@ -500,4 +516,24 @@ protected static String toSslJdbcParamInternal(final SslMode sslMode) {
return result;
}

@Override
protected boolean verifyCursorColumnValues(final JdbcDatabase database, final String schema, final String tableName, final String columnName) throws SQLException {
final String query;
final String resultColName = "nullValue";
// Query: Only if cursor column allows null values, query whether it contains one
if (StringUtils.isNotBlank(schema)) {
query = String.format(NULL_CURSOR_VALUE_WITH_SCHEMA,
schema, tableName, columnName, schema, tableName, columnName, resultColName);
} else {
query = String.format(NULL_CURSOR_VALUE_NO_SCHEMA,
tableName, columnName, tableName, columnName, resultColName);
}
LOGGER.debug("null value query: {}", query);
final List<JsonNode> jsonNodes = database.bufferedResultSetQuery(conn -> conn.createStatement().executeQuery(query),
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
resultSet -> JdbcUtils.getDefaultSourceOperations().rowToJson(resultSet));
Preconditions.checkState(jsonNodes.size() == 1);
final boolean nullValExist = jsonNodes.get(0).get(resultColName.toLowerCase()).booleanValue(); // For some reason value in node is lowercase
LOGGER.debug("null value exist: {}", nullValExist);
return !nullValExist;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,12 @@
@ExtendWith(SystemStubsExtension.class)
public abstract class AbstractPostgresSourceSSLCertificateAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "testview";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";

private PostgreSQLContainer<?> container;
private JsonNode config;
protected static final String PASSWORD = "Passw0rd";
Expand Down Expand Up @@ -127,28 +126,31 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,11 @@
@ExtendWith(SystemStubsExtension.class)
public abstract class AbstractSshPostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final Network network = Network.newNetwork();
private static JsonNode config;
private final SshBastionContainer bastion = new SshBastionContainer();
Expand Down Expand Up @@ -128,19 +128,21 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.INTEGER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,12 @@

@ExtendWith(SystemStubsExtension.class)
public class PostgresSourceAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "testview";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private static final String STREAM_NAME_MATERIALIZED_VIEW = "public.testview";
public static final String LIMIT_PERMISSION_SCHEMA = "limit_perm_schema";
public static final String LIMIT_PERMISSION_ROLE = "limit_perm_role";
public static final String LIMIT_PERMISSION_ROLE_PASSWORD = "test";
Expand All @@ -66,9 +65,9 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc

container = new PostgreSQLContainer<>("postgres:13-alpine");
container.start();
String username = container.getUsername();
String password = container.getPassword();
List<String> schemas = List.of("public");
final String username = container.getUsername();
final String password = container.getPassword();
final List<String> schemas = List.of("public");
config = getConfig(username, password, schemas);
try (final DSLContext dslContext = DSLContextFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
Expand All @@ -93,7 +92,7 @@ protected void setupEnvironment(final TestDestinationEnv environment) throws Exc
}
}

private JsonNode getConfig(String username, String password, List<String> schemas) {
private JsonNode getConfig(final String username, final String password, final List<String> schemas) {
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "Standard")
.build());
Expand Down Expand Up @@ -170,19 +169,19 @@ public void testDiscoverWithRevokingSchemaPermissions() throws Exception {
config = getConfig(LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD, List.of(LIMIT_PERMISSION_SCHEMA));

runDiscover();
AirbyteCatalog lastPersistedCatalogSecond = getLastPersistedCatalog();
final AirbyteCatalog lastPersistedCatalogSecond = getLastPersistedCatalog();
final String assertionMessageWithoutPermission = "Expected no streams after discover for user without schema permissions";
assertTrue(lastPersistedCatalogSecond.getStreams().isEmpty(), assertionMessageWithoutPermission);
}

private void revokeSchemaPermissions(Database database) throws SQLException {
private void revokeSchemaPermissions(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("REVOKE USAGE ON schema %s FROM %s;", LIMIT_PERMISSION_SCHEMA, LIMIT_PERMISSION_ROLE));
return null;
});
}

private void prepareEnvForUserWithoutPermissions(Database database) throws SQLException {
private void prepareEnvForUserWithoutPermissions(final Database database) throws SQLException {
database.query(ctx -> {
ctx.fetch(String.format("CREATE ROLE %s WITH LOGIN PASSWORD '%s';", LIMIT_PERMISSION_ROLE, LIMIT_PERMISSION_ROLE_PASSWORD));
ctx.fetch(String.format("CREATE SCHEMA %s;", LIMIT_PERMISSION_SCHEMA));
Expand All @@ -202,28 +201,31 @@ private ConfiguredAirbyteCatalog getCommonConfigCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME_MATERIALIZED_VIEW,
STREAM_NAME_MATERIALIZED_VIEW, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog() {
Expand All @@ -233,7 +235,7 @@ private ConfiguredAirbyteCatalog getLimitPermissionConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
LIMIT_PERMISSION_SCHEMA + "." + "id_and_name",
"id_and_name", LIMIT_PERMISSION_SCHEMA,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.HashMap;
import java.util.List;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.extension.ExtendWith;
Expand All @@ -45,12 +46,11 @@
@ExtendWith(SystemStubsExtension.class)
public class PostgresSourceStrictEncryptAcceptanceTest extends SourceAcceptanceTest {

private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "starships";
private static final String SCHEMA_NAME = "public";
@SystemStub
private EnvironmentVariables environmentVariables;

private static final String STREAM_NAME = "public.id_and_name";
private static final String STREAM_NAME2 = "public.starships";

private PostgreSQLContainer<?> container;
private JsonNode config;

Expand Down Expand Up @@ -133,19 +133,21 @@ protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME,
STREAM_NAME, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))),
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))),
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("id"))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
STREAM_NAME2,
STREAM_NAME2, SCHEMA_NAME,
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
}

@Override
Expand Down
Loading