Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 Postgres source: retrieve only the tables in the publication under cdc mode #14447

Merged
merged 14 commits into from
Jul 10, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -747,7 +747,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.30
dockerImageTag: 0.4.31
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6854,7 +6854,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.30"
- dockerImage: "airbyte/source-postgres:0.4.31"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

public class DebeziumEventUtils {

public static final String CDC_LSN = "_ab_cdc_lsn";
public static final String CDC_UPDATED_AT = "_ab_cdc_updated_at";
public static final String CDC_DELETED_AT = "_ab_cdc_deleted_at";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.version=0.4.31
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.30
LABEL io.airbyte.version=0.4.31
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class PostgresCdcCatalogHelper {

  private static final Logger LOGGER = LoggerFactory.getLogger(PostgresCdcCatalogHelper.class);

  private PostgresCdcCatalogHelper() {}

  /**
   * Drops {@link SyncMode#INCREMENTAL} from the supported sync modes of a stream that has no
   * source-defined primary key.
   * <p>
   * With logical replication the state of the original database cannot be recreated unless extra
   * information (like an oid) is included, so tables without a primary key are limited to Full
   * Refresh for now. As a workaround, a CDC and a non-CDC source could both be configured if a large
   * non-PK table needs to be replicated.
   * <p>
   * Note: in place mutation. The sync-mode list itself is replaced by a modified copy.
   *
   * @param stream the stream to adjust
   * @return the same {@code stream} instance
   */
  public static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stream) {
    final boolean hasPrimaryKey = !stream.getSourceDefinedPrimaryKey().isEmpty();
    if (hasPrimaryKey) {
      return stream;
    }

    final List<SyncMode> remainingModes = new ArrayList<>(stream.getSupportedSyncModes());
    remainingModes.remove(SyncMode.INCREMENTAL);
    stream.setSupportedSyncModes(remainingModes);
    return stream;
  }

  /**
   * Marks the cursor as source-defined on every stream that supports incremental sync, so that the
   * user cannot set or override a cursor field.
   * <p>
   * Note: in place mutation.
   *
   * @param stream the stream to adjust
   * @return the same {@code stream} instance
   */
  public static AirbyteStream setIncrementalToSourceDefined(final AirbyteStream stream) {
    final boolean supportsIncremental = stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL);
    if (supportsIncremental) {
      stream.setSourceDefinedCursor(true);
    }
    return stream;
  }

  /**
   * Adds the CDC metadata columns ({@link DebeziumEventUtils#CDC_LSN} as a number,
   * {@link DebeziumEventUtils#CDC_UPDATED_AT} and {@link DebeziumEventUtils#CDC_DELETED_AT} as
   * strings) to the {@code properties} object of the stream's JSON schema.
   * <p>
   * Note: in place mutation.
   *
   * @param stream the stream whose JSON schema is extended
   * @return the same {@code stream} instance
   */
  public static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
    final ObjectNode schemaProperties = (ObjectNode) ((ObjectNode) stream.getJsonSchema()).get("properties");

    // A single "string" node is shared by both timestamp columns.
    final JsonNode stringColumn = Jsons.jsonNode(ImmutableMap.of("type", "string"));
    final JsonNode numberColumn = Jsons.jsonNode(ImmutableMap.of("type", "number"));

    schemaProperties.set(DebeziumEventUtils.CDC_LSN, numberColumn);
    schemaProperties.set(DebeziumEventUtils.CDC_UPDATED_AT, stringColumn);
    schemaProperties.set(DebeziumEventUtils.CDC_DELETED_AT, stringColumn);
    return stream;
  }

  /**
   * Looks up the tables that belong to the publication named in the source config.
   *
   * @param database the database whose source config and {@code pg_publication_tables} are read
   * @return tables included in the publication. When it is not CDC mode (or the database has no
   *         source config), returns an empty set.
   * @throws SQLException propagated from the publication query
   */
  public static Set<AirbyteStreamNameNamespacePair> getPublicizedTables(final JdbcDatabase database) throws SQLException {
    final JsonNode sourceConfig = database.getSourceConfig();
    final boolean cdcEnabled = sourceConfig != null && PostgresUtils.isCdc(sourceConfig);
    if (!cdcEnabled) {
      return Collections.emptySet();
    }

    final String publication = sourceConfig.get("replication_method").get("publication").asText();
    final List<JsonNode> publicationRows = database.queryJsons(
        "SELECT schemaname, tablename FROM pg_publication_tables WHERE pubname = ?", publication);

    final Set<AirbyteStreamNameNamespacePair> publicizedTables = publicationRows.stream()
        .map(PostgresCdcCatalogHelper::toStreamPair)
        .collect(Collectors.toSet());

    final List<String> qualifiedNames = publicizedTables.stream()
        .map(pair -> pair.getNamespace() + "." + pair.getName())
        .toList();
    LOGGER.info("For CDC, only tables in publication {} will be included in the sync: {}", publication, qualifiedNames);

    return publicizedTables;
  }

  // Converts one pg_publication_tables row into a (name = tablename, namespace = schemaname) pair.
  private static AirbyteStreamNameNamespacePair toStreamPair(final JsonNode row) {
    return new AirbyteStreamNameNamespacePair(row.get("tablename").asText(), row.get("schemaname").asText());
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,17 @@

package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.PostgresSource.CDC_LSN;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.integrations.debezium.CdcMetadataInjector;
import io.airbyte.integrations.debezium.internals.DebeziumEventUtils;

public class PostgresCdcConnectorMetadataInjector implements CdcMetadataInjector {

/**
 * Copies the {@code lsn} field of {@code source} onto {@code event} under the
 * {@link DebeziumEventUtils#CDC_LSN} key.
 *
 * @param event the record being enriched; mutated in place
 * @param source the node that carries the {@code lsn} field
 */
@Override
public void addMetaData(final ObjectNode event, final JsonNode source) {
  final long lsn = source.get("lsn").asLong();
  // Write the key once, through the shared DebeziumEventUtils constant; the former
  // PostgresSource.CDC_LSN alias named the same "_ab_cdc_lsn" key.
  event.put(DebeziumEventUtils.CDC_LSN, lsn);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,10 @@
package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.debezium.AirbyteDebeziumHandler.shouldUseCDC;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT;
import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toSet;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
Expand All @@ -36,8 +33,8 @@
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
Expand All @@ -46,7 +43,6 @@
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.sql.Connection;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
Expand All @@ -66,7 +62,6 @@
public class PostgresSource extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class);
public static final String CDC_LSN = "_ab_cdc_lsn";

static final String DRIVER_CLASS = DatabaseDriver.POSTGRESQL.getDriverClassName();
private List<String> schemas;
Expand Down Expand Up @@ -138,11 +133,11 @@ public Set<String> getExcludedInternalNameSpaces() {
public AirbyteCatalog discover(final JsonNode config) throws Exception {
final AirbyteCatalog catalog = super.discover(config);

if (isCdc(config)) {
if (PostgresUtils.isCdc(config)) {
final List<AirbyteStream> streams = catalog.getStreams().stream()
.map(PostgresSource::removeIncrementalWithoutPk)
.map(PostgresSource::setIncrementalToSourceDefined)
.map(PostgresSource::addCdcMetadataColumns)
.map(PostgresCdcCatalogHelper::removeIncrementalWithoutPk)
.map(PostgresCdcCatalogHelper::setIncrementalToSourceDefined)
.map(PostgresCdcCatalogHelper::addCdcMetadataColumns)
.collect(toList());

catalog.setStreams(streams);
Expand All @@ -153,6 +148,19 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {

/**
 * Discovers the tables of the database. Under CDC mode only tables that are part of the
 * configured publication are returned; otherwise every raw table is returned.
 *
 * @param database the database to inspect
 * @return the discovered tables, restricted to the publication when CDC is configured
 * @throws Exception propagated from raw table discovery or the publication lookup
 */
@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
  final List<TableInfo<CommonField<JDBCType>>> rawTables = discoverRawTables(database);

  // Decide on CDC mode explicitly. getPublicizedTables returns an empty set both when CDC is off
  // and when the publication has no tables, so its emptiness cannot be used to tell the two apart:
  // a CDC source with an empty publication must not fall back to exposing every table.
  final JsonNode sourceConfig = database.getSourceConfig();
  if (sourceConfig == null || !PostgresUtils.isCdc(sourceConfig)) {
    return rawTables;
  }

  // under cdc mode, only return tables that are in the publication
  final Set<AirbyteStreamNameNamespacePair> publicizedTablesInCdc = PostgresCdcCatalogHelper.getPublicizedTables(database);
  return rawTables.stream()
      .filter(table -> publicizedTablesInCdc.contains(new AirbyteStreamNameNamespacePair(table.getName(), table.getNameSpace())))
      .collect(toList());
}

public List<TableInfo<CommonField<JDBCType>>> discoverRawTables(final JdbcDatabase database) throws Exception {
if (schemas != null && !schemas.isEmpty()) {
// process explicitly selected (from UI) schemas
final List<TableInfo<CommonField<JDBCType>>> internals = new ArrayList<>();
Expand All @@ -177,7 +185,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
final List<CheckedConsumer<JdbcDatabase, Exception>> checkOperations = new ArrayList<>(
super.getCheckOperations(config));

if (isCdc(config)) {
if (PostgresUtils.isCdc(config)) {
checkOperations.add(database -> {
final List<JsonNode> matchingSlots = database.queryJsons(connection -> {
final String sql = "SELECT * FROM pg_replication_slots WHERE slot_name = ? AND plugin = ? AND database = ?";
Expand All @@ -186,8 +194,7 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
ps.setString(2, PostgresUtils.getPluginValue(config.get("replication_method")));
ps.setString(3, config.get("database").asText());

LOGGER.info(
"Attempting to find the named replication slot using the query: " + ps.toString());
LOGGER.info("Attempting to find the named replication slot using the query: {}", ps);

return ps;
}, sourceOperations::rowToJson);
Expand Down Expand Up @@ -244,7 +251,7 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
if (PostgresUtils.isCdc(sourceConfig) && shouldUseCDC(catalog)) {
final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig,
PostgresCdcTargetPosition.targetPosition(database), false);
final PostgresCdcStateHandler postgresCdcStateHandler = new PostgresCdcStateHandler(stateManager);
Expand Down Expand Up @@ -286,31 +293,6 @@ protected List<ConfiguredAirbyteStream> identifyStreamsToSnapshot(final Configur
.collect(Collectors.toList());
}

/**
 * Tells whether the config selects CDC: it must have a non-null {@code replication_method} that in
 * turn has non-null {@code replication_slot} and {@code publication} fields. The outcome is logged
 * at info level.
 */
@VisibleForTesting
static boolean isCdc(final JsonNode config) {
  boolean cdcEnabled = false;
  if (config.hasNonNull("replication_method")) {
    final JsonNode replicationMethod = config.get("replication_method");
    cdcEnabled = replicationMethod.hasNonNull("replication_slot")
        && replicationMethod.hasNonNull("publication");
  }
  LOGGER.info("using CDC: {}", cdcEnabled);
  return cdcEnabled;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is moved to PostgresUtils.


/*
 * It isn't possible to recreate the state of the original database unless we include extra
 * information (like an oid) when using logical replication. By limiting to Full Refresh when we
 * don't have a primary key we dodge the problem for now. As a work around a CDC and non-CDC source
 * could be configured if there's a need to replicate a large non-PK table.
 *
 * Note: in place mutation. The stream is returned to allow use in a stream pipeline.
 */
private static AirbyteStream removeIncrementalWithoutPk(final AirbyteStream stream) {
  if (stream.getSourceDefinedPrimaryKey().isEmpty()) {
    // Work on a copy and set it back rather than removing from the list handed out by the getter,
    // so this does not depend on that list being mutable.
    final List<SyncMode> syncModes = new ArrayList<>(stream.getSupportedSyncModes());
    syncModes.remove(SyncMode.INCREMENTAL);
    stream.setSupportedSyncModes(syncModes);
  }

  return stream;
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These methods related to CDC catalog manipulation are moved to PostgresCdcCatalogHelper.


@Override
public Set<JdbcPrivilegeDto> getPrivilegesTableForCurrentUser(final JdbcDatabase database,
final String schema)
Expand Down Expand Up @@ -364,34 +346,6 @@ protected boolean isNotInternalSchema(final JsonNode jsonNode, final Set<String>
return false;
}

/*
 * Streams that support incremental sync get a source-defined cursor, which prevents the user from
 * setting or overriding a cursor field. Streams without incremental support are left untouched.
 *
 * Note: in place mutation; the same stream instance is returned.
 */
private static AirbyteStream setIncrementalToSourceDefined(final AirbyteStream stream) {
  final boolean supportsIncremental = stream.getSupportedSyncModes().contains(SyncMode.INCREMENTAL);
  if (!supportsIncremental) {
    return stream;
  }

  stream.setSourceDefinedCursor(true);
  return stream;
}

// Registers the CDC metadata columns (LSN as number, updated-at and deleted-at as string) in the
// "properties" object of the stream's JSON schema.
// Note: in place mutation.
private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
  final ObjectNode schemaProperties = (ObjectNode) ((ObjectNode) stream.getJsonSchema()).get("properties");

  // A single "string" node is shared by both timestamp columns.
  final JsonNode stringColumn = Jsons.jsonNode(ImmutableMap.of("type", "string"));
  final JsonNode numberColumn = Jsons.jsonNode(ImmutableMap.of("type", "number"));

  schemaProperties.set(CDC_LSN, numberColumn);
  schemaProperties.set(CDC_UPDATED_AT, stringColumn);
  schemaProperties.set(CDC_DELETED_AT, stringColumn);
  return stream;
}

// TODO This is a temporary override so that the Postgres source can take advantage of per-stream
// state
@Override
Expand All @@ -410,7 +364,7 @@ protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode con

/**
 * Selects the state type for the given config: {@link AirbyteStateType#GLOBAL} when the config is
 * in CDC mode, {@link AirbyteStateType#STREAM} otherwise.
 *
 * @param config the source configuration
 * @return the state type matching the replication mode
 */
@Override
protected AirbyteStateType getSupportedStateType(final JsonNode config) {
  // Single return through the shared PostgresUtils check; a second return after the first one
  // would be unreachable code.
  return PostgresUtils.isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

public static void main(final String[] args) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,25 @@
package io.airbyte.integrations.source.postgres;

import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Static helpers for reading replication-related settings out of a Postgres source config.
 */
public class PostgresUtils {

  private static final Logger LOGGER = LoggerFactory.getLogger(PostgresUtils.class);

  private static final String PGOUTPUT_PLUGIN = "pgoutput";

  /**
   * Reads the logical decoding plugin name from a {@code replication_method} node.
   *
   * @param field the node that may carry a {@code plugin} field
   * @return the text of {@code plugin} when it is present and not JSON null, otherwise
   *         {@code "pgoutput"}
   */
  public static String getPluginValue(final JsonNode field) {
    // hasNonNull rather than has: an explicit JSON null would otherwise be rendered by asText()
    // as the literal string "null" instead of falling back to the default plugin.
    return field.hasNonNull("plugin") ? field.get("plugin").asText() : PGOUTPUT_PLUGIN;
  }

  /**
   * Tells whether the config selects CDC: it must have a non-null {@code replication_method} that
   * in turn has non-null {@code replication_slot} and {@code publication} fields. The outcome is
   * logged at info level on every call.
   *
   * @param config the source configuration; must not be {@code null}
   * @return {@code true} when all three fields are present and non-null
   */
  public static boolean isCdc(final JsonNode config) {
    final boolean isCdc = config.hasNonNull("replication_method")
        && config.get("replication_method").hasNonNull("replication_slot")
        && config.get("replication_method").hasNonNull("publication");
    LOGGER.info("using CDC: {}", isCdc);
    return isCdc;
  }

}
Loading