Skip to content

Commit

Permalink
🎉 Postgres source: sync data from beginning if lsn is no longer valid…
Browse files Browse the repository at this point in the history
… in cdc (#15077)

* work in progress

* cleanup

* add test

* introduce tests for state parsing util class

* enable test via feature flag

* review comments

* Bump versions

* auto-bump connector version [ci skip]

Co-authored-by: Liren Tu <tuliren.git@outlook.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 3, 2022
1 parent d57a6a6 commit f4b4863
Show file tree
Hide file tree
Showing 13 changed files with 499 additions and 111 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -762,7 +762,7 @@
- name: Postgres
sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.4.40
dockerImageTag: 0.4.41
documentationUrl: https://docs.airbyte.io/integrations/sources/postgres
icon: postgresql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7140,7 +7140,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-postgres:0.4.40"
- dockerImage: "airbyte/source-postgres:0.4.41"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/postgres"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.codehaus.plexus.util.StringUtils;

/**
 * Builds the {@link Properties} used to configure the Debezium engine from the connector-specific
 * properties, the source config, the configured catalog and the offset / schema-history storage
 * that were supplied at construction time.
 */
public class DebeziumPropertiesManager {

  private final JsonNode config;
  private final AirbyteFileOffsetBackingStore offsetManager;
  private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;

  private final Properties properties;
  private final ConfiguredAirbyteCatalog catalog;

  /**
   * @param properties connector-specific Debezium properties; copied into the result first, so any
   *        key that is also set by {@link #getDebeziumProperties()} is overridden
   * @param config source configuration; host, port, username and database are read from it, the
   *        password only if present
   * @param catalog configured catalog used to derive the table include list
   * @param offsetManager provides the path of the file Debezium stores its offsets in
   * @param schemaHistoryManager when present, provides the path of the database history file
   */
  public DebeziumPropertiesManager(final Properties properties,
                                   final JsonNode config,
                                   final ConfiguredAirbyteCatalog catalog,
                                   final AirbyteFileOffsetBackingStore offsetManager,
                                   final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
    this.properties = properties;
    this.config = config;
    this.catalog = catalog;
    this.offsetManager = offsetManager;
    this.schemaHistoryManager = schemaHistoryManager;
  }

  /**
   * Creates a fresh {@link Properties} instance on every call: the properties given to the
   * constructor are copied first, then the engine, naming, connection, decimal-handling and
   * table-selection settings are written on top of them.
   *
   * @return the complete set of properties for the Debezium engine
   */
  protected Properties getDebeziumProperties() {
    final Properties props = new Properties();
    props.putAll(properties);

    // debezium engine configuration
    // NOTE: "name" is intentionally not set here; it is set to the database name below.
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
    props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
    props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
    // default values from debezium CommonConnectorConfig
    props.setProperty("max.batch.size", "2048");
    props.setProperty("max.queue.size", "8192");

    // https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
    // https://debezium.io/documentation/reference/development/engine.html#_in_the_code
    // As mentioned in the documents above, debezium connector for MySQL needs to track the schema
    // changes. If we don't do this, we can't fetch records for the table.
    schemaHistoryManager.ifPresent(schemaHistory -> {
      props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
      props.setProperty("database.history.file.filename", schemaHistory.getPath().toString());
    });

    // https://debezium.io/documentation/reference/configuration/avro.html
    props.setProperty("key.converter.schemas.enable", "false");
    props.setProperty("value.converter.schemas.enable", "false");

    // debezium names: the engine name and the logical server name are both the database name
    final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
    props.setProperty("name", databaseName);
    props.setProperty("database.server.name", databaseName);

    // db connection configuration
    props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
    props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
    props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
    props.setProperty("database.dbname", databaseName);

    if (config.has(JdbcUtils.PASSWORD_KEY)) {
      props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
    }

    // By default "decimal.handling.mode=precise", which causes decimal values to be returned as
    // binary. The "double" mode may cause a loss of precision, so configure Debezium to emit them
    // as strings instead. For more details see:
    // https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
    // https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
    props.setProperty("decimal.handling.mode", "string");

    // table selection
    props.setProperty("table.include.list", getTableWhitelist(catalog));

    return props;
  }

  /**
   * Builds the value for Debezium's {@code table.include.list} property.
   *
   * @param catalog configured catalog whose streams are inspected
   * @return a comma-separated list of {@code namespace.name} entries, one for every stream whose
   *         sync mode is {@link SyncMode#INCREMENTAL}; commas inside an entry are escaped
   */
  public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
    return catalog.getStreams().stream()
        .filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
        .map(ConfiguredAirbyteStream::getStream)
        .map(stream -> stream.getNamespace() + "." + stream.getName())
        // debezium needs commas escaped to split properly
        .map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
        .collect(Collectors.joining(","));
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,7 @@
package io.airbyte.integrations.debezium.internals;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
Expand All @@ -23,42 +19,31 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.codehaus.plexus.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The purpose of this class is to intiliaze and spawn the debezium engine with the right properties
* to fetch records
* The purpose of this class is to initialize and spawn the debezium engine with the right
* properties to fetch records
*/
public class DebeziumRecordPublisher implements AutoCloseable {

private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumRecordPublisher.class);
private final ExecutorService executor;
private DebeziumEngine<ChangeEvent<String, String>> engine;

private final JsonNode config;
private final AirbyteFileOffsetBackingStore offsetManager;
private final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager;

private final AtomicBoolean hasClosed;
private final AtomicBoolean isClosing;
private final AtomicReference<Throwable> thrownError;
private final CountDownLatch engineLatch;
private final Properties properties;
private final ConfiguredAirbyteCatalog catalog;
private final DebeziumPropertiesManager debeziumPropertiesManager;

public DebeziumRecordPublisher(final Properties properties,
final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final AirbyteFileOffsetBackingStore offsetManager,
final Optional<AirbyteSchemaHistoryStorage> schemaHistoryManager) {
this.properties = properties;
this.config = config;
this.catalog = catalog;
this.offsetManager = offsetManager;
this.schemaHistoryManager = schemaHistoryManager;
this.debeziumPropertiesManager = new DebeziumPropertiesManager(properties, config, catalog, offsetManager,
schemaHistoryManager);
this.hasClosed = new AtomicBoolean(false);
this.isClosing = new AtomicBoolean(false);
this.thrownError = new AtomicReference<>();
Expand All @@ -68,7 +53,7 @@ public DebeziumRecordPublisher(final Properties properties,

public void start(final Queue<ChangeEvent<String, String>> queue) {
engine = DebeziumEngine.create(Json.class)
.using(getDebeziumProperties())
.using(debeziumPropertiesManager.getDebeziumProperties())
.using(new OffsetCommitPolicy.AlwaysCommitOffsetPolicy())
.notifying(e -> {
// debezium outputs a tombstone event that has a value of null. this is an artifact of how it
Expand Down Expand Up @@ -120,69 +105,4 @@ public void close() throws Exception {
}
}

protected Properties getDebeziumProperties() {
final Properties props = new Properties();
props.putAll(properties);

// debezium engine configuration
props.setProperty("name", "engine");
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
props.setProperty("offset.storage.file.filename", offsetManager.getOffsetFilePath().toString());
props.setProperty("offset.flush.interval.ms", "1000"); // todo: make this longer
// default values from debezium CommonConnectorConfig
props.setProperty("max.batch.size", "2048");
props.setProperty("max.queue.size", "8192");

if (schemaHistoryManager.isPresent()) {
// https://debezium.io/documentation/reference/1.9/operations/debezium-server.html#debezium-source-database-history-class
// https://debezium.io/documentation/reference/development/engine.html#_in_the_code
// As mentioned in the documents above, debezium connector for MySQL needs to track the schema
// changes. If we don't do this, we can't fetch records for the table.
props.setProperty("database.history", "io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename", schemaHistoryManager.get().getPath().toString());
}

// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

// debezium names
props.setProperty("name", config.get(JdbcUtils.DATABASE_KEY).asText());
props.setProperty("database.server.name", config.get(JdbcUtils.DATABASE_KEY).asText());

// db connection configuration
props.setProperty("database.hostname", config.get(JdbcUtils.HOST_KEY).asText());
props.setProperty("database.port", config.get(JdbcUtils.PORT_KEY).asText());
props.setProperty("database.user", config.get(JdbcUtils.USERNAME_KEY).asText());
props.setProperty("database.dbname", config.get(JdbcUtils.DATABASE_KEY).asText());

if (config.has(JdbcUtils.PASSWORD_KEY)) {
props.setProperty("database.password", config.get(JdbcUtils.PASSWORD_KEY).asText());
}

// By default "decimal.handing.mode=precise" which's caused returning this value as a binary.
// The "double" type may cause a loss of precision, so set Debezium's config to store it as a String
// explicitly in its Kafka messages for more details see:
// https://debezium.io/documentation/reference/1.9/connectors/postgresql.html#postgresql-decimal-types
// https://debezium.io/documentation/faq/#how_to_retrieve_decimal_field_from_binary_representation
props.setProperty("decimal.handling.mode", "string");

// table selection
final String tableWhitelist = getTableWhitelist(catalog);
props.setProperty("table.include.list", tableWhitelist);

return props;
}

@VisibleForTesting
public static String getTableWhitelist(final ConfiguredAirbyteCatalog catalog) {
return catalog.getStreams().stream()
.filter(s -> s.getSyncMode() == SyncMode.INCREMENTAL)
.map(ConfiguredAirbyteStream::getStream)
.map(stream -> stream.getNamespace() + "." + stream.getName())
// debezium needs commas escaped to split properly
.map(x -> StringUtils.escape(x, new char[] {','}, "\\,"))
.collect(Collectors.joining(","));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.debezium.internals;

import io.airbyte.commons.json.Jsons;
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.PostgresOffsetContext;
import io.debezium.connector.postgresql.PostgresOffsetContext.Loader;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;

/**
 * A {@link Loader} that keeps a copy of the raw offset map passed to {@link #load(Map)} so that it
 * can be inspected afterwards through {@link #getRawOffset()}.
 */
public class PostgresCustomLoader extends Loader {

  // Copy of the offset most recently passed to load(); null until load() has been called.
  private Map<String, ?> offset;

  public PostgresCustomLoader(final PostgresConnectorConfig connectorConfig) {
    super(connectorConfig);
  }

  /**
   * Stores a clone of {@code offset} and then delegates to the parent loader.
   *
   * @param offset raw offset map to load
   * @return the offset context produced by the parent {@link Loader}
   */
  @Override
  public PostgresOffsetContext load(final Map<String, ?> offset) {
    this.offset = Jsons.clone(offset);
    return super.load(offset);
  }

  /**
   * @return an unmodifiable view of the offset captured by the last {@link #load(Map)} call
   * @throws NullPointerException if no offset has been captured yet, i.e. {@link #load(Map)} was
   *         not called before this method
   */
  public Map<String, ?> getRawOffset() {
    // Fail with an explanatory message instead of the bare NPE Collections.unmodifiableMap(null)
    // would raise.
    Objects.requireNonNull(offset, "No offset available: load() must be called before getRawOffset()");
    return Collections.unmodifiableMap(offset);
  }

}
Loading

0 comments on commit f4b4863

Please sign in to comment.