diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 645d1c2a797f..8919f526e6bc 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -415,7 +416,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) { @Override protected List deserializeState(final JsonNode stateJson, final JsonNode config) { if (stateJson == null) { - if (isCdc(config)) { + if (supportedStateTypeSupplier(config).get() == AirbyteStateType.GLOBAL) { final AirbyteGlobalState globalState = new AirbyteGlobalState() .withSharedState(Jsons.jsonNode(new CdcState())) .withStreamStates(List.of()); @@ -435,6 +436,11 @@ protected List deserializeState(final JsonNode stateJson, f } } + @Override + protected Supplier supportedStateTypeSupplier(final JsonNode config) { + return () -> isCdc(config) ? 
AirbyteStateType.GLOBAL : AirbyteStateType.STREAM; + } + public static void main(final String[] args) throws Exception { final Source source = PostgresSource.sshWrappedSource(); LOGGER.info("starting source: {}", PostgresSource.class); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index ebffc59ba18a..fca7d885aa95 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -109,7 +109,8 @@ public AutoCloseableIterator read(final JsonNode config, final ConfiguredAirbyteCatalog catalog, final JsonNode state) throws Exception { - final StateManager stateManager = StateManagerFactory.createStateManager(deserializeState(state, config), catalog, supportsGlobalState(config)); + final StateManager stateManager = + StateManagerFactory.createStateManager(deserializeState(state, config), catalog, supportedStateTypeSupplier(config)); final Instant emittedAt = Instant.now(); final Database database = createDatabaseInternal(config); @@ -535,14 +536,14 @@ protected List deserializeState(final JsonNode stateJson, f } /** - * Generates a {@link Supplier} that can be used to determine if the global state manager should be - * selected for use by this connector. + * Generates a {@link Supplier} that can be used to determine which state manager should be selected + * for use by this connector. * * @param config The connector configuration. * @return A {@link Supplier}. 
*/ - protected Supplier supportsGlobalState(final JsonNode config) { - return () -> false; + protected Supplier supportedStateTypeSupplier(final JsonNode config) { + return () -> AirbyteStateType.LEGACY; } } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java index ecfa8c412732..479239983512 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java @@ -10,6 +10,9 @@ import io.airbyte.integrations.source.relationaldb.CursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.protocol.models.AirbyteGlobalState; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.protocol.models.StreamDescriptor; import java.util.Collections; @@ -157,4 +160,54 @@ public static boolean isValidStreamDescriptor(final StreamDescriptor streamDescr } } + /** + * Converts a {@link AirbyteStateType#LEGACY} state message into a {@link AirbyteStateType#GLOBAL} + * message. + * + * @param airbyteStateMessage A {@link AirbyteStateType#LEGACY} state message. + * @return A {@link AirbyteStateType#GLOBAL} state message. 
+ */ + public static AirbyteStateMessage convertLegacyStateToGlobalState(final AirbyteStateMessage airbyteStateMessage) { + final DbState dbState = Jsons.object(airbyteStateMessage.getData(), DbState.class); + final AirbyteGlobalState globalState = new AirbyteGlobalState() + .withSharedState(Jsons.jsonNode(dbState.getCdcState())) + .withStreamStates(dbState.getStreams().stream() + .map(s -> new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withName(s.getStreamName()).withNamespace(s.getStreamNamespace())) + .withStreamState(Jsons.jsonNode(s))) + .collect( + Collectors.toList())); + return new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState); + } + + /** + * Converts a {@link AirbyteStateType#GLOBAL} state message into a list of + * {@link AirbyteStateType#STREAM} messages. + * + * @param airbyteStateMessage A {@link AirbyteStateType#GLOBAL} state message. + * @return A list {@link AirbyteStateType#STREAM} state messages. + */ + public static List convertGlobalStateToStreamState(final AirbyteStateMessage airbyteStateMessage) { + return airbyteStateMessage.getGlobal().getStreamStates().stream() + .map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState().withStreamDescriptor(s.getStreamDescriptor()).withStreamState(s.getStreamState()))) + .collect(Collectors.toList()); + } + + /** + * Converts a {@link AirbyteStateType#LEGACY} state message into a list of + * {@link AirbyteStateType#STREAM} messages. + * + * @param airbyteStateMessage A {@link AirbyteStateType#LEGACY} state message. + * @return A list {@link AirbyteStateType#STREAM} state messages. 
+ */ + public static List convertLegacyStateToStreamState(final AirbyteStateMessage airbyteStateMessage) { + return Jsons.object(airbyteStateMessage.getData(), DbState.class).getStreams().stream() + .map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState() + .withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())) + .withStreamState(Jsons.jsonNode(s)))) + .collect(Collectors.toList()); + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java index 7abe7639c272..eda12425fc7b 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java @@ -7,7 +7,9 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.ArrayList; import java.util.List; import java.util.function.Supplier; import org.slf4j.Logger; @@ -26,32 +28,99 @@ public class StateManagerFactory { private StateManagerFactory() {} /** - * Creates a {@link StateManager} based on the provided state object and catalog. + * Creates a {@link StateManager} based on the provided state object and catalog. This method will handle the + * conversion of the provided state to match the requested state manager based on the provided {@link AirbyteStateType}. 
* * @param state The deserialized state. * @param catalog The {@link ConfiguredAirbyteCatalog} for the connector that will utilize the state * manager. - * @param usesGlobalState {@link Supplier} that determines if global state is used by the connector. + * @param stateTypeSupplier {@link Supplier} that provides the {@link AirbyteStateType} that will be + * used to select the correct state manager. * @return A newly created {@link StateManager} implementation based on the provided state. */ public static StateManager createStateManager(final List state, final ConfiguredAirbyteCatalog catalog, - final Supplier usesGlobalState) { + final Supplier stateTypeSupplier) { if (state != null && !state.isEmpty()) { final AirbyteStateMessage airbyteStateMessage = state.get(0); - if (usesGlobalState.get()) { - LOGGER.info("Global state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType()); - return new GlobalStateManager(airbyteStateMessage, catalog); - } else if (airbyteStateMessage.getData() != null && airbyteStateMessage.getStream() == null) { - LOGGER.info("Legacy state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType()); - return new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog); - } else { - LOGGER.info("Stream state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType()); - return new StreamStateManager(state, catalog); + switch (stateTypeSupplier.get()) { + case LEGACY: + LOGGER.info("Legacy state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType()); + return new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog); + case GLOBAL: + LOGGER.info("Global state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType()); + return new GlobalStateManager(generateGlobalState(airbyteStateMessage), catalog); + case 
STREAM: + default: + LOGGER.info("Stream state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType()); + return new StreamStateManager(generateStreamState(state), catalog); } } else { throw new IllegalArgumentException("Failed to create state manager due to empty state list."); } } + /** + * Handles the conversion between a different state type and the global state. This method handles + * the following transitions: + *
    + *
  • Stream -> Global (not supported, results in {@link IllegalArgumentException})
  • + *
  • Legacy -> Global (supported)
  • + *
  • Global -> Global (supported/no conversion required)
  • + *
+ * + * @param airbyteStateMessage The current state that is to be converted to global state. + * @return The converted state message. + * @throws IllegalArgumentException if unable to convert between the given state type and global. + */ + private static AirbyteStateMessage generateGlobalState(final AirbyteStateMessage airbyteStateMessage) { + AirbyteStateMessage globalStateMessage = airbyteStateMessage; + + switch (airbyteStateMessage.getStateType()) { + case STREAM: + throw new IllegalArgumentException("Unable to convert connector state from per-stream to global. Please reset the connection to continue."); + case LEGACY: + globalStateMessage = StateGeneratorUtils.convertLegacyStateToGlobalState(airbyteStateMessage); + LOGGER.info("Legacy state converted to global state.", airbyteStateMessage.getStateType()); + break; + case GLOBAL: + default: + break; + } + + return globalStateMessage; + } + + /** + * Handles the conversion between a different state type and the stream state. This method handles + * the following transitions: + *
    + *
  • Global -> Stream (supported/shared state discarded)
  • + *
  • Legacy -> Stream (supported)
  • + *
  • Stream -> Stream (supported/no conversion required)
  • + *
+ * + * @param states The list of current states. + * @return The converted state messages. + */ + private static List generateStreamState(final List states) { + final AirbyteStateMessage airbyteStateMessage = states.get(0); + final List streamStates = new ArrayList<>(); + switch (airbyteStateMessage.getStateType()) { + case GLOBAL: + streamStates.addAll(StateGeneratorUtils.convertGlobalStateToStreamState(airbyteStateMessage)); + LOGGER.info("Global state converted to stream state.", airbyteStateMessage.getStateType()); + break; + case LEGACY: + streamStates.addAll(StateGeneratorUtils.convertLegacyStateToStreamState(airbyteStateMessage)); + break; + case STREAM: + default: + streamStates.addAll(states); + break; + } + + return streamStates; + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java index abb9c23db3e5..ff9274f48d7a 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java @@ -29,24 +29,39 @@ public class StateManagerFactoryTest { private static final String NAMESPACE = "namespace"; private static final String NAME = "name"; - private static final String REPLICATION_SLOT = "replication_slot"; - private static final String PUBLICATION = "publication"; - private static final String REPLICATION_METHOD = "replication_method"; - private static final Supplier GLOBAL_STATE = () -> true; + private static final Supplier GLOBAL_STATE_TYPE = () -> AirbyteStateType.GLOBAL; - private static final Supplier NO_GLOBAL_STATE = () -> false; + private static final 
Supplier LEGACY_STATE_TYPE = () -> AirbyteStateType.LEGACY; + + private static final Supplier STREAM_STATE_TYPE = () -> AirbyteStateType.STREAM; @Test void testNullOrEmptyState() { final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); Assertions.assertThrows(IllegalArgumentException.class, () -> { - StateManagerFactory.createStateManager(null, catalog, NO_GLOBAL_STATE); + StateManagerFactory.createStateManager(null, catalog, GLOBAL_STATE_TYPE); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + StateManagerFactory.createStateManager(List.of(), catalog, GLOBAL_STATE_TYPE); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + StateManagerFactory.createStateManager(null, catalog, LEGACY_STATE_TYPE); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + StateManagerFactory.createStateManager(List.of(), catalog, LEGACY_STATE_TYPE); + }); + + Assertions.assertThrows(IllegalArgumentException.class, () -> { + StateManagerFactory.createStateManager(null, catalog, STREAM_STATE_TYPE); }); Assertions.assertThrows(IllegalArgumentException.class, () -> { - StateManagerFactory.createStateManager(List.of(), catalog, NO_GLOBAL_STATE); + StateManagerFactory.createStateManager(List.of(), catalog, STREAM_STATE_TYPE); }); } @@ -56,7 +71,7 @@ void testLegacyStateManagerCreationFromAirbyteStateMessage() { final AirbyteStateMessage airbyteStateMessage = mock(AirbyteStateMessage.class); when(airbyteStateMessage.getData()).thenReturn(Jsons.jsonNode(new DbState())); - final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, NO_GLOBAL_STATE); + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, LEGACY_STATE_TYPE); Assertions.assertNotNull(stateManager); Assertions.assertEquals(LegacyStateManager.class, stateManager.getClass()); @@ -71,12 +86,39 @@ void testGlobalStateManagerCreation() 
{ .withStreamState(Jsons.jsonNode(new DbStreamState())))); final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState); - final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, GLOBAL_STATE); + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, GLOBAL_STATE_TYPE); Assertions.assertNotNull(stateManager); Assertions.assertEquals(GlobalStateManager.class, stateManager.getClass()); } + @Test + void testGlobalStateManagerCreationFromLegacyState() { + final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); + final CdcState cdcState = new CdcState(); + final DbState dbState = new DbState() + .withCdcState(cdcState) + .withStreams(List.of(new DbStreamState().withStreamName(NAME).withStreamNamespace(NAMESPACE))); + final AirbyteStateMessage airbyteStateMessage = + new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(dbState)); + + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, GLOBAL_STATE_TYPE); + + Assertions.assertNotNull(stateManager); + Assertions.assertEquals(GlobalStateManager.class, stateManager.getClass()); + } + + @Test + void testGlobalStateManagerCreationFromStreamState() { + final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); + final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM) + .withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(NAME).withNamespace( + NAMESPACE)).withStreamState(Jsons.jsonNode(new DbStreamState()))); + + Assertions.assertThrows(IllegalArgumentException.class, + () -> StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, GLOBAL_STATE_TYPE)); + } + @Test void 
testGlobalStateManagerCreationWithLegacyDataPresent() { final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); @@ -87,7 +129,7 @@ void testGlobalStateManagerCreationWithLegacyDataPresent() { final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState).withData(Jsons.jsonNode(new DbState())); - final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, GLOBAL_STATE); + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, GLOBAL_STATE_TYPE); Assertions.assertNotNull(stateManager); Assertions.assertEquals(GlobalStateManager.class, stateManager.getClass()); @@ -100,7 +142,38 @@ void testStreamStateManagerCreation() { .withStream(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(NAME).withNamespace( NAMESPACE)).withStreamState(Jsons.jsonNode(new DbStreamState()))); - final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, NO_GLOBAL_STATE); + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, STREAM_STATE_TYPE); + + Assertions.assertNotNull(stateManager); + Assertions.assertEquals(StreamStateManager.class, stateManager.getClass()); + } + + @Test + void testStreamStateManagerCreationFromLegacy() { + final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); + final CdcState cdcState = new CdcState(); + final DbState dbState = new DbState() + .withCdcState(cdcState) + .withStreams(List.of(new DbStreamState().withStreamName(NAME).withStreamNamespace(NAMESPACE))); + final AirbyteStateMessage airbyteStateMessage = + new AirbyteStateMessage().withStateType(AirbyteStateType.LEGACY).withData(Jsons.jsonNode(dbState)); + + final StateManager stateManager = 
StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, STREAM_STATE_TYPE); + + Assertions.assertNotNull(stateManager); + Assertions.assertEquals(StreamStateManager.class, stateManager.getClass()); + } + + @Test + void testStreamStateManagerCreationFromGlobal() { + final ConfiguredAirbyteCatalog catalog = mock(ConfiguredAirbyteCatalog.class); + final AirbyteGlobalState globalState = + new AirbyteGlobalState().withSharedState(Jsons.jsonNode(new DbState().withCdcState(new CdcState().withState(Jsons.jsonNode(new DbState()))))) + .withStreamStates(List.of(new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withNamespace(NAMESPACE).withName(NAME)) + .withStreamState(Jsons.jsonNode(new DbStreamState())))); + final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState); + + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, STREAM_STATE_TYPE); Assertions.assertNotNull(stateManager); Assertions.assertEquals(StreamStateManager.class, stateManager.getClass()); @@ -114,7 +187,7 @@ void testStreamStateManagerCreationWithLegacyDataPresent() { NAMESPACE)).withStreamState(Jsons.jsonNode(new DbStreamState()))) .withData(Jsons.jsonNode(new DbState())); - final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, NO_GLOBAL_STATE); + final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, STREAM_STATE_TYPE); Assertions.assertNotNull(stateManager); Assertions.assertEquals(StreamStateManager.class, stateManager.getClass());