From bd9e60538e79750d59e4f4018f07efa76ff19ce9 Mon Sep 17 00:00:00 2001 From: subodh Date: Fri, 17 Jun 2022 12:47:39 +0530 Subject: [PATCH] skip debezium engine startup in case no table is in INCREMENTAL mode --- .../source/postgres/PostgresSource.java | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index cb83f7324c69..cb75a0b2703c 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -34,6 +34,7 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.SyncMode; import java.sql.Connection; import java.sql.JDBCType; @@ -43,6 +44,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -227,16 +229,8 @@ public List> getIncrementalIterators( final Map>> tableNameToTable, final StateManager stateManager, final Instant emittedAt) { - /** - * If a customer sets up a postgres source with cdc parameters (replication_slot and publication) - * but selects all the tables in FULL_REFRESH mode then we would still end up going through this - * path. We do have a check in place for debezium to make sure only tales in INCREMENTAL mode are - * synced {@link DebeziumRecordPublisher#getTableWhitelist(ConfiguredAirbyteCatalog)} but we should - * have a check here as well to make sure that if no table is in INCREMENTAL mode then skip this - * part - */ final JsonNode sourceConfig = database.getSourceConfig(); - if (isCdc(sourceConfig)) { + if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { final AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, PostgresCdcTargetPosition.targetPosition(database), PostgresCdcProperties.getDebeziumProperties(sourceConfig), catalog, false); @@ -250,6 +244,12 @@ public List> getIncrementalIterators( } } + private static boolean shouldUseCDC(final ConfiguredAirbyteCatalog catalog) { + final Optional any = catalog.getStreams().stream().map(ConfiguredAirbyteStream::getSyncMode) + .filter(syncMode -> syncMode == SyncMode.INCREMENTAL).findAny(); + return any.isPresent(); + } + @VisibleForTesting static boolean isCdc(final JsonNode config) { final boolean isCdc = config.hasNonNull("replication_method")