Skip to content

Commit

Permalink
Add global state manager
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 13, 2022
1 parent 4ca2b18 commit 1a4ad84
Show file tree
Hide file tree
Showing 6 changed files with 194 additions and 117 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.airbyte.integrations.source.relationaldb.state;

import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FIELD_FUNCTION;
import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FUNCTION;
import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.source.relationaldb.CdcStateManager;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Map;

/**
* Global implementation of the {@link StateManager} interface.
*
* This implementation generates a single, global state object for the state
* tracked by this manager.
*/
public class GlobalStateManager extends AbstractStateManager<AirbyteStateMessage, AirbyteStreamState> {

/**
* Constructs a new {@link GlobalStateManager} that is seeded with the provided
* {@link AirbyteStateMessage}.
*
* @param airbyteStateMessage The initial state represented as an {@link AirbyteStateMessage}.
* @param catalog The {@link ConfiguredAirbyteCatalog} for the connector associated with this state
* manager.
*/
public GlobalStateManager(final AirbyteStateMessage airbyteStateMessage, final ConfiguredAirbyteCatalog catalog) {
super(catalog,
() -> airbyteStateMessage.getStreams(),
CURSOR_FUNCTION,
CURSOR_FIELD_FUNCTION,
NAME_NAMESPACE_PAIR_FUNCTION);
}

@Override
public CdcStateManager getCdcStateManager() {
return null;
}

@Override
public AirbyteStateMessage toState() {
final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap = getPairToCursorInfoMap();
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage();
return airbyteStateMessage
.withStateType(AirbyteStateType.GLOBAL)
// Temporarily include legacy state for backwards compatibility with the platform
.withData(Jsons.jsonNode(StateGeneratorUtils.generateDbState(pairToCursorInfoMap)))
// TODO generate global state
.withGlobal(null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,7 @@ public AirbyteStateMessage toState() {
.withCdc(isCdc)
.withStreams(getPairToCursorInfoMap().entrySet().stream()
.sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity.
.map(e -> new DbStreamState()
.withStreamName(e.getKey().getName())
.withStreamNamespace(e.getKey().getNamespace())
.withCursorField(e.getValue().getCursorField() == null ? Collections.emptyList() : List.of(e.getValue().getCursorField()))
.withCursor(e.getValue().getCursor()))
.map(e -> StateGeneratorUtils.generateDbStreamState(e.getKey(), e.getValue()))
.collect(Collectors.toList()))
.withCdcState(getCdcStateManager().getCdcState());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,61 +4,29 @@

package io.airbyte.integrations.source.relationaldb.state;

import com.google.common.collect.Lists;
import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FIELD_FUNCTION;
import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.CURSOR_FUNCTION;
import static io.airbyte.integrations.source.relationaldb.state.StateGeneratorUtils.NAME_NAMESPACE_PAIR_FUNCTION;

import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.source.relationaldb.CdcStateManager;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Per-stream implementation of the {@link StateManager} interface.
*
* This implementation generates a state object for each stream detected in catalog/map of known
* streams to cursor information stored in this manager.
*/
public class PerStreamStateManager extends AbstractStateManager<AirbyteStateMessage, AirbyteStreamState> {

private static final Logger LOGGER = LoggerFactory.getLogger(PerStreamStateManager.class);

/**
* {@link Function} that extracts the cursor from the stream state.
*/
private static final Function<AirbyteStreamState, String> CURSOR_FUNCTION = stream -> {
final Optional<DbStreamState> dbStreamState = extractState(stream);
if (dbStreamState.isPresent()) {
return dbStreamState.get().getCursor();
} else {
return null;
}
};

/**
* {@link Function} that extracts the cursor field(s) from the stream state.
*/
private static final Function<AirbyteStreamState, List<String>> CURSOR_FIELD_FUNCTION = stream -> {
final Optional<DbStreamState> dbStreamState = extractState(stream);
if (dbStreamState.isPresent()) {
return dbStreamState.get().getCursorField();
} else {
return List.of();
}
};

/**
* {@link Function} that creates an {@link AirbyteStreamNameNamespacePair} from the stream state.
*/
private static final Function<AirbyteStreamState, AirbyteStreamNameNamespacePair> NAME_NAMESPACE_PAIR_FUNCTION =
s -> new AirbyteStreamNameNamespacePair(s.getName(), s.getNamespace());

/**
* Constructs a new {@link PerStreamStateManager} that is seeded with the provided
* {@link AirbyteStateMessage}.
Expand All @@ -84,74 +52,11 @@ public CdcStateManager getCdcStateManager() {
public AirbyteStateMessage toState() {
final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap = getPairToCursorInfoMap();
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage();
final List<AirbyteStreamState> airbyteStreamStates = generatePerStreamState(pairToCursorInfoMap);
final List<AirbyteStreamState> airbyteStreamStates = StateGeneratorUtils.generatePerStreamState(pairToCursorInfoMap);
return airbyteStateMessage
.withStateType(AirbyteStateType.PER_STREAM)
// Temporarily include legacy state for backwards compatibility with the platform
.withData(Jsons.jsonNode(generateDbState(pairToCursorInfoMap)))
.withData(Jsons.jsonNode(StateGeneratorUtils.generateDbState(pairToCursorInfoMap)))
.withStreams(airbyteStreamStates);
}

/**
* Generates the per-stream state for each stream.
*
* @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream
* @return The list of per-stream state.
*/
private List<AirbyteStreamState> generatePerStreamState(final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap) {
return pairToCursorInfoMap.entrySet().stream()
.filter(s -> s.getKey().getName() != null && s.getKey().getNamespace() != null)
.sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity.
.map(e -> new AirbyteStreamState()
.withName(e.getKey().getName())
.withNamespace(e.getKey().getNamespace())
.withState(Jsons.jsonNode(generateDbStreamState(e.getKey(), e.getValue()))))
.collect(Collectors.toList());
}

/**
* Generates the legacy global state for backwards compatibility.
*
* @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream
* @return The legacy {@link DbState}.
*/
private DbState generateDbState(final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap) {
return new DbState().withStreams(pairToCursorInfoMap.entrySet().stream()
.sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity.
.map(e -> generateDbStreamState(e.getKey(), e.getValue()))
.collect(Collectors.toList()));
}

/**
* Generates the {@link DbStreamState} for the given stream and cursor.
*
* @param airbyteStreamNameNamespacePair The stream.
* @param cursorInfo The current cursor.
* @return The {@link DbStreamState}.
*/
private DbStreamState generateDbStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, final CursorInfo cursorInfo) {
return new DbStreamState()
.withStreamName(airbyteStreamNameNamespacePair.getName())
.withStreamNamespace(airbyteStreamNameNamespacePair.getNamespace())
.withCursorField(cursorInfo.getCursorField() == null ? Collections.emptyList() : Lists.newArrayList(cursorInfo.getCursorField()))
.withCursor(cursorInfo.getCursor());
}

/**
* Extracts the actual state from the {@link AirbyteStreamState} object.
*
* @param state The {@link AirbyteStreamState} that contains the actual stream state as JSON.
* @return An {@link Optional} possibly containing the deserialized representation of the stream
* state or an empty {@link Optional} if the state is not present or could not be
* deserialized.
*/
private static Optional<DbStreamState> extractState(final AirbyteStreamState state) {
try {
return Optional.ofNullable(Jsons.object(state.getState(), DbStreamState.class));
} catch (final IllegalArgumentException e) {
LOGGER.error("Unable to extract state.", e);
return Optional.empty();
}
}

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package io.airbyte.integrations.source.relationaldb.state;

import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteStreamState;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Collection of utilities that facilitate the generation of state objects.
*/
public class StateGeneratorUtils {

private static final Logger LOGGER = LoggerFactory.getLogger(StateGeneratorUtils.class);

/**
* {@link Function} that extracts the cursor from the stream state.
*/
public static final Function<AirbyteStreamState, String> CURSOR_FUNCTION = stream -> {
final Optional<DbStreamState> dbStreamState = StateGeneratorUtils.extractState(stream);
if (dbStreamState.isPresent()) {
return dbStreamState.get().getCursor();
} else {
return null;
}
};

/**
* {@link Function} that extracts the cursor field(s) from the stream state.
*/
public static final Function<AirbyteStreamState, List<String>> CURSOR_FIELD_FUNCTION = stream -> {
final Optional<DbStreamState> dbStreamState = StateGeneratorUtils.extractState(stream);
if (dbStreamState.isPresent()) {
return dbStreamState.get().getCursorField();
} else {
return List.of();
}
};

/**
* {@link Function} that creates an {@link AirbyteStreamNameNamespacePair} from the stream state.
*/
public static final Function<AirbyteStreamState, AirbyteStreamNameNamespacePair> NAME_NAMESPACE_PAIR_FUNCTION =
s -> new AirbyteStreamNameNamespacePair(s.getName(), s.getNamespace());

private StateGeneratorUtils() {}

/**
* Generates the per-stream state for each stream.
*
* @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream
* @return The list of per-stream state.
*/
public static List<AirbyteStreamState> generatePerStreamState(final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap) {
return pairToCursorInfoMap.entrySet().stream()
.filter(s -> s.getKey().getName() != null && s.getKey().getNamespace() != null)
.sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity.
.map(e -> new AirbyteStreamState()
.withName(e.getKey().getName())
.withNamespace(e.getKey().getNamespace())
.withState(Jsons.jsonNode(generateDbStreamState(e.getKey(), e.getValue()))))
.collect(Collectors.toList());
}

/**
* Generates the legacy global state for backwards compatibility.
*
* @param pairToCursorInfoMap The map of stream name/namespace tuple to the current cursor information for that stream
* @return The legacy {@link DbState}.
*/
public static DbState generateDbState(final Map<AirbyteStreamNameNamespacePair, CursorInfo> pairToCursorInfoMap) {
return new DbState().withStreams(pairToCursorInfoMap.entrySet().stream()
.sorted(Entry.comparingByKey()) // sort by stream name then namespace for sanity.
.map(e -> generateDbStreamState(e.getKey(), e.getValue()))
.collect(Collectors.toList()));
}

/**
* Generates the {@link DbStreamState} for the given stream and cursor.
*
* @param airbyteStreamNameNamespacePair The stream.
* @param cursorInfo The current cursor.
* @return The {@link DbStreamState}.
*/
public static DbStreamState generateDbStreamState(final AirbyteStreamNameNamespacePair airbyteStreamNameNamespacePair, final CursorInfo cursorInfo) {
return new DbStreamState()
.withStreamName(airbyteStreamNameNamespacePair.getName())
.withStreamNamespace(airbyteStreamNameNamespacePair.getNamespace())
.withCursorField(cursorInfo.getCursorField() == null ? Collections.emptyList() : Lists.newArrayList(cursorInfo.getCursorField()))
.withCursor(cursorInfo.getCursor());
}

/**
* Extracts the actual state from the {@link AirbyteStreamState} object.
*
* @param state The {@link AirbyteStreamState} that contains the actual stream state as JSON.
* @return An {@link Optional} possibly containing the deserialized representation of the stream
* state or an empty {@link Optional} if the state is not present or could not be
* deserialized.
*/
public static Optional<DbStreamState> extractState(final AirbyteStreamState state) {
try {
return Optional.ofNullable(Jsons.object(state.getState(), DbStreamState.class));
} catch (final IllegalArgumentException e) {
LOGGER.error("Unable to extract state.", e);
return Optional.empty();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,7 @@ public static StateManager createStateManager(final Object state, final Configur
return new PerStreamStateManager(airbyteStateMessage, catalog);
} else {
LOGGER.info("Global state manager selected to manage state object with type {}.", state.getClass().getName());
// TODO create proper Global state manager
return null;
return new GlobalStateManager(airbyteStateMessage, catalog);
}
} else if (state instanceof DbState dbState) {
LOGGER.info("Legacy state manager selected to manage state object with type {}.", state.getClass().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ void testGlobalStateManagerCreation() {

final StateManager stateManager = StateManagerFactory.createStateManager(airbyteStateMessage, catalog, config);

// TODO replace with non-null assertion and type assertion once the Global state manager exists
Assertions.assertNull(stateManager);
Assertions.assertNotNull(stateManager);
Assertions.assertEquals(GlobalStateManager.class, stateManager.getClass());
}

@Test
Expand Down

0 comments on commit 1a4ad84

Please sign in to comment.