Skip to content

Commit

Permalink
source-postgres : Remove FULL_REFRESH mode and include INCREMENTAL mo…
Browse files Browse the repository at this point in the history
…de for all streams when configured in xmin replication mode. (#27771)

* Remove full_refresh as an option for xmin syncs and always include incremental syncs

* Comments and test
  • Loading branch information
akashkulk authored Jun 28, 2023
1 parent 6fb53c6 commit e5949bc
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,4 +122,15 @@ public static Set<AirbyteStreamNameNamespacePair> getPublicizedTables(final Jdbc
return publicizedTables;
}

/**
* This method is used for xmin synsc in order to overwrite sync modes for cursor fields. For xmin, we want streams to only have incremental mode
* enabled.
*
* @param stream - airbyte stream
* @return will return list of sync modes
*/
public static AirbyteStream overrideSyncModesForXmin(final AirbyteStream stream) {
return stream.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,8 @@ public AirbyteCatalog discover(final JsonNode config) throws Exception {
// Xmin replication has a source-defined cursor (the xmin column). This is done to prevent the user
// from being able to pick their own cursor.
final List<AirbyteStream> streams = catalog.getStreams().stream()
// We want to make sure every stream can be synced in INCREMENTAL mode and never in FULL_REFRESH mode for xmin.
.map(PostgresCatalogHelper::overrideSyncModesForXmin)
.map(PostgresCatalogHelper::setIncrementalToSourceDefined)
.collect(toList());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class XminPostgresSourceTest {
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING),
Field.of("power", JsonSchemaType.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("id"))),
CatalogHelpers.createAirbyteStream(
Expand All @@ -81,15 +81,15 @@ class XminPostgresSourceTest {
Field.of("id", JsonSchemaType.NUMBER),
Field.of("name", JsonSchemaType.STRING),
Field.of("power", JsonSchemaType.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withSourceDefinedCursor(true),
CatalogHelpers.createAirbyteStream(
"names",
SCHEMA_NAME,
Field.of("first_name", JsonSchemaType.STRING),
Field.of("last_name", JsonSchemaType.STRING),
Field.of("power", JsonSchemaType.NUMBER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.INCREMENTAL))
.withSourceDefinedCursor(true)
.withSourceDefinedPrimaryKey(List.of(List.of("first_name"), List.of("last_name")))));

Expand Down

0 comments on commit e5949bc

Please sign in to comment.