From 7a3d2d7a3304746d3f0e6c35e3e03238f49bdd8a Mon Sep 17 00:00:00 2001 From: jdpgrailsdev Date: Fri, 10 Jun 2022 15:38:13 -0400 Subject: [PATCH] Add global state manager --- .../state/GlobalStateManager.java | 57 +++++++++ .../state/LegacyStateManager.java | 6 +- .../state/PerStreamStateManager.java | 121 ++---------------- .../state/StateGeneratorUtils.java | 120 +++++++++++++++++ .../state/StateManagerFactory.java | 3 +- .../state/StateManagerFactoryTest.java | 4 +- 6 files changed, 194 insertions(+), 117 deletions(-) create mode 100644 airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java create mode 100644 airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java new file mode 100644 index 000000000000..686565998f64 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/GlobalStateManager.java @@ -0,0 +1,57 @@ +package io.airbyte.integrations.source.relationaldb.state; + +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FIELD_FUNCTION; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FUNCTION; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION; + +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.source.relationaldb.CdcStateManager; +import io.airbyte.integrations.source.relationaldb.CursorInfo; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStreamState; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import java.util.Map; + +/** + * Global implementation of the {@link StateManager} interface. + * + * This implementation generates a single, global state object for the state + * tracked by this manager. + */ +public class GlobalStateManager extends AbstractStateManager { + + /** + * Constructs a new {@link GlobalStateManager} that is seeded with the provided + * {@link AirbyteStateMessage}. + * + * @param airbyteStateMessage The initial state represented as an {@link AirbyteStateMessage}. + * @param catalog The {@link ConfiguredAirbyteCatalog} for the connector associated with this state + * manager. + */ + public GlobalStateManager(final AirbyteStateMessage airbyteStateMessage, final ConfiguredAirbyteCatalog catalog) { + super(catalog, + () -> airbyteStateMessage.getStreams(), + CURSOR_FUNCTION, + CURSOR_FIELD_FUNCTION, + NAME_NAMESPACE_PAIR_FUNCTION); + } + + @Override + public CdcStateManager getCdcStateManager() { + return null; + } + + @Override + public AirbyteStateMessage toState() { + final Map pairToCursorInfoMap = getPairToCursorInfoMap(); + final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage(); + return airbyteStateMessage + .withStateType(AirbyteStateType.GLOBAL) + // Temporarily include legacy state for backwards compatibility with the platform + .withData(Jsons.jsonNode(StateGeneratorUtils.generateDbState(pairToCursorInfoMap))) + // TODO generate global state + .withGlobal(null); + } +} diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java index 2613ebe4bdcd..70d418eff0f6 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/LegacyStateManager.java @@ -95,11 +95,7 @@ public AirbyteStateMessage toState() { .withCdc(isCdc) .withStreams(getPairToCursorInfoMap().entrySet().stream() .sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity. - .map(e -> new DbStreamState() - .withStreamName(e.getKey().getName()) - .withStreamNamespace(e.getKey().getNamespace()) - .withCursorField(e.getValue().getCursorField() == null ? Collections.emptyList() : List.of(e.getValue().getCursorField())) - .withCursor(e.getValue().getCursor())) + .map(e -> StateGeneratorUtils.generateDbStreamState(e.getKey(), e.getValue())) .collect(Collectors.toList())) .withCdcState(getCdcStateManager().getCdcState()); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/PerStreamStateManager.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/PerStreamStateManager.java index c0146aa79155..9e8f31555243 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/PerStreamStateManager.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/PerStreamStateManager.java @@ -4,61 +4,29 @@ package io.airbyte.integrations.source.relationaldb.state; -import com.google.common.collect.Lists; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FIELD_FUNCTION; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FUNCTION; +import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION; + import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.source.relationaldb.CdcStateManager; import io.airbyte.integrations.source.relationaldb.CursorInfo; -import io.airbyte.integrations.source.relationaldb.models.DbState; -import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.AirbyteStateMessage; import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; import io.airbyte.protocol.models.AirbyteStreamState; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; -import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; -import java.util.function.Function; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +/** + * Per-stream implementation of the {@link StateManager} interface. + * + * This implementation generates a state object for each stream detected in catalog/map of known + * streams to cursor information stored in this manager. + */ public class PerStreamStateManager extends AbstractStateManager { - private static final Logger LOGGER = LoggerFactory.getLogger(PerStreamStateManager.class); - - /** - * {@link Function} that extracts the cursor from the stream state. - */ - private static final Function CURSOR_FUNCTION = stream -> { - final Optional dbStreamState = extractState(stream); - if (dbStreamState.isPresent()) { - return dbStreamState.get().getCursor(); - } else { - return null; - } - }; - - /** - * {@link Function} that extracts the cursor field(s) from the stream state. - */ - private static final Function> CURSOR_FIELD_FUNCTION = stream -> { - final Optional dbStreamState = extractState(stream); - if (dbStreamState.isPresent()) { - return dbStreamState.get().getCursorField(); - } else { - return List.of(); - } - }; - - /** - * {@link Function} that creates an {@link AirbyteStreamNameNamespacePair} from the stream state. - */ - private static final Function NAME_NAMESPACE_PAIR_FUNCTION = - s -> new AirbyteStreamNameNamespacePair(s.getName(), s.getNamespace()); - /** * Constructs a new {@link PerStreamStateManager} that is seeded with the provided * {@link AirbyteStateMessage}. @@ -84,74 +52,11 @@ public CdcStateManager getCdcStateManager() { public AirbyteStateMessage toState() { final Map pairToCursorInfoMap = getPairToCursorInfoMap(); final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage(); - final List airbyteStreamStates = generatePerStreamState(pairToCursorInfoMap); + final List airbyteStreamStates = StateGeneratorUtils.generatePerStreamState(pairToCursorInfoMap); return airbyteStateMessage .withStateType(AirbyteStateType.PER_STREAM) // Temporarily include legacy state for backwards compatibility with the platform - .withData(Jsons.jsonNode(generateDbState(pairToCursorInfoMap))) + .withData(Jsons.jsonNode(StateGeneratorUtils.generateDbState(pairToCursorInfoMap))) .withStreams(airbyteStreamStates); } - - /** - * Generates the per-stream state for each stream. - * - * @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream - * @return The list of per-stream state. - */ - private List generatePerStreamState(final Map pairToCursorInfoMap) { - return pairToCursorInfoMap.entrySet().stream() - .filter(s -> s.getKey().getName() != null && s.getKey().getNamespace() != null) - .sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity. - .map(e -> new AirbyteStreamState() - .withName(e.getKey().getName()) - .withNamespace(e.getKey().getNamespace()) - .withState(Jsons.jsonNode(generateDbStreamState(e.getKey(), e.getValue())))) - .collect(Collectors.toList()); - } - - /** - * Generates the legacy global state for backwards compatibility. - * - * @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream - * @return The legacy {@link DbState}. - */ - private DbState generateDbState(final Map pairToCursorInfoMap) { - return new DbState().withStreams(pairToCursorInfoMap.entrySet().stream() - .sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity. - .map(e -> generateDbStreamState(e.getKey(), e.getValue())) - .collect(Collectors.toList())); - } - - /** - * Generates the {@link DbStreamState} for the given stream and cursor. - * - * @param airbyteStreamNameNamespacePair The stream. - * @param cursorInfo The current cursor. - * @return The {@link DbStreamState}. - */ - private DbStreamState generateDbStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, final CursorInfo cursorInfo) { - return new DbStreamState() - .withStreamName(airbyteStreamNameNamespacePair.getName()) - .withStreamNamespace(airbyteStreamNameNamespacePair.getNamespace()) - .withCursorField(cursorInfo.getCursorField() == null ? Collections.emptyList() : Lists.newArrayList(cursorInfo.getCursorField())) - .withCursor(cursorInfo.getCursor()); - } - - /** - * Extracts the actual state from the {@link AirbyteStreamState} object. - * - * @param state The {@link AirbyteStreamState} that contains the actual stream state as JSON. - * @return An {@link Optional} possibly containing the deserialized representation of the stream - * state or an empty {@link Optional} if the state is not present or could not be - * deserialized. - */ - private static Optional extractState(final AirbyteStreamState state) { - try { - return Optional.ofNullable(Jsons.object(state.getState(), DbStreamState.class)); - } catch (final IllegalArgumentException e) { - LOGGER.error("Unable to extract state.", e); - return Optional.empty(); - } - } - -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java new file mode 100644 index 000000000000..f74b0e6d4846 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateGeneratorUtils.java @@ -0,0 +1,120 @@ +package io.airbyte.integrations.source.relationaldb.state; + +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; +import io.airbyte.integrations.source.relationaldb.CursorInfo; +import io.airbyte.integrations.source.relationaldb.models.DbState; +import io.airbyte.integrations.source.relationaldb.models.DbStreamState; +import io.airbyte.protocol.models.AirbyteStreamState; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Collection of utilities that facilitate the generation of state objects. + */ +public class StateGeneratorUtils { + + private static final Logger LOGGER = LoggerFactory.getLogger(StateGeneratorUtils.class); + + /** + * {@link Function} that extracts the cursor from the stream state. + */ + public static final Function CURSOR_FUNCTION = stream -> { + final Optional dbStreamState = StateGeneratorUtils.extractState(stream); + if (dbStreamState.isPresent()) { + return dbStreamState.get().getCursor(); + } else { + return null; + } + }; + + /** + * {@link Function} that extracts the cursor field(s) from the stream state. + */ + public static final Function> CURSOR_FIELD_FUNCTION = stream -> { + final Optional dbStreamState = StateGeneratorUtils.extractState(stream); + if (dbStreamState.isPresent()) { + return dbStreamState.get().getCursorField(); + } else { + return List.of(); + } + }; + + /** + * {@link Function} that creates an {@link AirbyteStreamNameNamespacePair} from the stream state. + */ + public static final Function NAME_NAMESPACE_PAIR_FUNCTION = + s -> new AirbyteStreamNameNamespacePair(s.getName(), s.getNamespace()); + + private StateGeneratorUtils() {} + + /** + * Generates the per-stream state for each stream. + * + * @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream + * @return The list of per-stream state. + */ + public static List generatePerStreamState(final Map pairToCursorInfoMap) { + return pairToCursorInfoMap.entrySet().stream() + .filter(s -> s.getKey().getName() != null && s.getKey().getNamespace() != null) + .sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity. + .map(e -> new AirbyteStreamState() + .withName(e.getKey().getName()) + .withNamespace(e.getKey().getNamespace()) + .withState(Jsons.jsonNode(generateDbStreamState(e.getKey(), e.getValue())))) + .collect(Collectors.toList()); + } + + /** + * Generates the legacy global state for backwards compatibility. + * + * @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream + * @return The legacy {@link DbState}. + */ + public static DbState generateDbState(final Map pairToCursorInfoMap) { + return new DbState().withStreams(pairToCursorInfoMap.entrySet().stream() + .sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity. + .map(e -> generateDbStreamState(e.getKey(), e.getValue())) + .collect(Collectors.toList())); + } + + /** + * Generates the {@link DbStreamState} for the given stream and cursor. + * + * @param airbyteStreamNameNamespacePair The stream. + * @param cursorInfo The current cursor. + * @return The {@link DbStreamState}. + */ + public static DbStreamState generateDbStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, final CursorInfo cursorInfo) { + return new DbStreamState() + .withStreamName(airbyteStreamNameNamespacePair.getName()) + .withStreamNamespace(airbyteStreamNameNamespacePair.getNamespace()) + .withCursorField(cursorInfo.getCursorField() == null ? Collections.emptyList() : Lists.newArrayList(cursorInfo.getCursorField())) + .withCursor(cursorInfo.getCursor()); + } + + /** + * Extracts the actual state from the {@link AirbyteStreamState} object. + * + * @param state The {@link AirbyteStreamState} that contains the actual stream state as JSON. + * @return An {@link Optional} possibly containing the deserialized representation of the stream + * state or an empty {@link Optional} if the state is not present or could not be + * deserialized. + */ + public static Optional extractState(final AirbyteStreamState state) { + try { + return Optional.ofNullable(Jsons.object(state.getState(), DbStreamState.class)); + } catch (final IllegalArgumentException e) { + LOGGER.error("Unable to extract state.", e); + return Optional.empty(); + } + } +} diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java index d39d6daca992..1c92cc6abd71 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactory.java @@ -49,8 +49,7 @@ public static StateManager createStateManager(final Object state, final Configur return new PerStreamStateManager(airbyteStateMessage, catalog); } else { LOGGER.info("Global state manager selected to manage state object with type {}.", state.getClass().getName()); - // TODO create proper Global state manager - return null; + return new GlobalStateManager(airbyteStateMessage, catalog); } } else if (state instanceof DbState dbState) { LOGGER.info("Legacy state manager selected to manage state object with type {}.", state.getClass().getName()); diff --git a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java index 95519bed3398..4df2964deb1e 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java +++ b/airbyte-integrations/connectors/source-relational-db/src/test/java/io/airbyte/integrations/source/relationaldb/state/StateManagerFactoryTest.java @@ -76,8 +76,8 @@ void testGlobalStateManagerCreation() { final StateManager stateManager = StateManagerFactory.createStateManager(airbyteStateMessage, catalog, config); - // TODO replace with non-null assertion and type assertion once the Global state manager exists - Assertions.assertNull(stateManager); + Assertions.assertNotNull(stateManager); + Assertions.assertEquals(GlobalStateManager.class, stateManager.getClass()); } @Test