Skip to content

Commit

Permalink
Handle conversion between different state types
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 14, 2022
1 parent 0880059 commit 406698e
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -415,7 +416,7 @@ private static AirbyteStream addCdcMetadataColumns(final AirbyteStream stream) {
@Override
protected List<AirbyteStateMessage> deserializeState(final JsonNode stateJson, final JsonNode config) {
if (stateJson == null) {
if (isCdc(config)) {
if (supportedStateTypeSupplier(config).get() == AirbyteStateType.GLOBAL) {
final AirbyteGlobalState globalState = new AirbyteGlobalState()
.withSharedState(Jsons.jsonNode(new CdcState()))
.withStreamStates(List.of());
Expand All @@ -435,6 +436,11 @@ protected List<AirbyteStateMessage> deserializeState(final JsonNode stateJson, f
}
}

@Override
protected Supplier<AirbyteStateType> supportedStateTypeSupplier(final JsonNode config) {
return () -> isCdc(config) ? AirbyteStateType.GLOBAL : AirbyteStateType.STREAM;
}

public static void main(final String[] args) throws Exception {
final Source source = PostgresSource.sshWrappedSource();
LOGGER.info("starting source: {}", PostgresSource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ public AutoCloseableIterator<AirbyteMessage> read(final JsonNode config,
final ConfiguredAirbyteCatalog catalog,
final JsonNode state)
throws Exception {
final StateManager stateManager = StateManagerFactory.createStateManager(deserializeState(state, config), catalog, supportsGlobalState(config));
final StateManager stateManager =
StateManagerFactory.createStateManager(deserializeState(state, config), catalog, supportedStateTypeSupplier(config));
final Instant emittedAt = Instant.now();

final Database database = createDatabaseInternal(config);
Expand Down Expand Up @@ -535,14 +536,14 @@ protected List<AirbyteStateMessage> deserializeState(final JsonNode stateJson, f
}

/**
* Generates a {@link Supplier} that can be used to determine if the global state manager should be
* selected for use by this connector.
* Generates a {@link Supplier} that can be used to determine which state manager should be selected
* for use by this connector.
*
* @param config The connector configuration.
* @return A {@link Supplier}.
*/
protected Supplier<Boolean> supportsGlobalState(final JsonNode config) {
return () -> false;
protected Supplier<AirbyteStateType> supportedStateTypeSupplier(final JsonNode config) {
return () -> AirbyteStateType.LEGACY;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.integrations.source.relationaldb.models.DbStreamState;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.Collections;
Expand Down Expand Up @@ -157,4 +160,54 @@ public static boolean isValidStreamDescriptor(final StreamDescriptor streamDescr
}
}

/**
* Converts a {@link AirbyteStateType#LEGACY} state message into a {@link AirbyteStateType#GLOBAL}
* message.
*
* @param airbyteStateMessage A {@link AirbyteStateType#LEGACY} state message.
* @return A {@link AirbyteStateType#GLOBAL} state message.
*/
public static AirbyteStateMessage convertLegacyStateToGlobalState(final AirbyteStateMessage airbyteStateMessage) {
final DbState dbState = Jsons.object(airbyteStateMessage.getData(), DbState.class);
final AirbyteGlobalState globalState = new AirbyteGlobalState()
.withSharedState(Jsons.jsonNode(dbState.getCdcState()))
.withStreamStates(dbState.getStreams().stream()
.map(s -> new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withName(s.getStreamName()).withNamespace(s.getStreamNamespace()))
.withStreamState(Jsons.jsonNode(s)))
.collect(
Collectors.toList()));
return new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState);
}

/**
* Converts a {@link AirbyteStateType#GLOBAL} state message into a list of
* {@link AirbyteStateType#STREAM} messages.
*
* @param airbyteStateMessage A {@link AirbyteStateType#GLOBAL} state message.
* @return A list {@link AirbyteStateType#STREAM} state messages.
*/
public static List<AirbyteStateMessage> convertGlobalStateToStreamState(final AirbyteStateMessage airbyteStateMessage) {
return airbyteStateMessage.getGlobal().getStreamStates().stream()
.map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState().withStreamDescriptor(s.getStreamDescriptor()).withStreamState(s.getStreamState())))
.collect(Collectors.toList());
}

/**
* Converts a {@link AirbyteStateType#LEGACY} state message into a list of
* {@link AirbyteStateType#STREAM} messages.
*
* @param airbyteStateMessage A {@link AirbyteStateType#LEGACY} state message.
* @return A list {@link AirbyteStateType#STREAM} state messages.
*/
public static List<AirbyteStateMessage> convertLegacyStateToStreamState(final AirbyteStateMessage airbyteStateMessage) {
return Jsons.object(airbyteStateMessage.getData(), DbState.class).getStreams().stream()
.map(s -> new AirbyteStateMessage().withStateType(AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState()
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName()))
.withStreamState(Jsons.jsonNode(s))))
.collect(Collectors.toList());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.slf4j.Logger;
Expand All @@ -26,32 +28,99 @@ public class StateManagerFactory {
private StateManagerFactory() {}

/**
* Creates a {@link StateManager} based on the provided state object and catalog.
* Creates a {@link StateManager} based on the provided state object and catalog. This method will handle the
* conversion of the provided state to match the requested state manager based on the provided {@link AirbyteStateType}.
*
* @param state The deserialized state.
* @param catalog The {@link ConfiguredAirbyteCatalog} for the connector that will utilize the state
* manager.
* @param usesGlobalState {@link Supplier} that determines if global state is used by the connector.
* @param stateTypeSupplier {@link Supplier} that provides the {@link AirbyteStateType} that will be
* used to select the correct state manager.
* @return A newly created {@link StateManager} implementation based on the provided state.
*/
public static StateManager createStateManager(final List<AirbyteStateMessage> state,
final ConfiguredAirbyteCatalog catalog,
final Supplier<Boolean> usesGlobalState) {
final Supplier<AirbyteStateType> stateTypeSupplier) {
if (state != null && !state.isEmpty()) {
final AirbyteStateMessage airbyteStateMessage = state.get(0);
if (usesGlobalState.get()) {
LOGGER.info("Global state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType());
return new GlobalStateManager(airbyteStateMessage, catalog);
} else if (airbyteStateMessage.getData() != null && airbyteStateMessage.getStream() == null) {
LOGGER.info("Legacy state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType());
return new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog);
} else {
LOGGER.info("Stream state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType());
return new StreamStateManager(state, catalog);
switch (stateTypeSupplier.get()) {
case LEGACY:
LOGGER.info("Legacy state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType());
return new LegacyStateManager(Jsons.object(airbyteStateMessage.getData(), DbState.class), catalog);
case GLOBAL:
LOGGER.info("Global state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType());
return new GlobalStateManager(generateGlobalState(airbyteStateMessage), catalog);
case STREAM:
default:
LOGGER.info("Stream state manager selected to manage state object with type {}.", airbyteStateMessage.getStateType());
return new StreamStateManager(generateStreamState(state), catalog);
}
} else {
throw new IllegalArgumentException("Failed to create state manager due to empty state list.");
}
}

/**
* Handles the conversion between a different state type and the global state. This method handles
* the following transitions:
* <ul>
* <li>Stream -> Global (not supported, results in {@link IllegalArgumentException}</li>
* <li>Legacy -> Global (supported)</li>
* <li>Global -> Glboal (supported/no conversion required)</li>
* </ul>
*
* @param airbyteStateMessage The current state that is to be converted to global state.
* @return The converted state message.
* @throws IllegalArgumentException if unable to convert between the given state type and global.
*/
private static AirbyteStateMessage generateGlobalState(final AirbyteStateMessage airbyteStateMessage) {
AirbyteStateMessage globalStateMessage = airbyteStateMessage;

switch (airbyteStateMessage.getStateType()) {
case STREAM:
throw new IllegalArgumentException("Unable to convert connector state from per-stream to global. Please reset the connection to continue.");
case LEGACY:
globalStateMessage = StateGeneratorUtils.convertLegacyStateToGlobalState(airbyteStateMessage);
LOGGER.info("Legacy state converted to global state.", airbyteStateMessage.getStateType());
break;
case GLOBAL:
default:
break;
}

return globalStateMessage;
}

/**
* Handles the conversion between a different state type and the stream state. This method handles
* the following transitions:
* <ul>
* <li>Global -> Stream (supported/shared state discarded)</li>
* <li>Legacy -> Stream (supported)</li>
* <li>Stream -> Stream (supported/no conversion required)</li>
* </ul>
*
* @param states The list of current states.
* @return The converted state messages.
*/
private static List<AirbyteStateMessage> generateStreamState(final List<AirbyteStateMessage> states) {
final AirbyteStateMessage airbyteStateMessage = states.get(0);
final List<AirbyteStateMessage> streamStates = new ArrayList<>();
switch (airbyteStateMessage.getStateType()) {
case GLOBAL:
streamStates.addAll(StateGeneratorUtils.convertGlobalStateToStreamState(airbyteStateMessage));
LOGGER.info("Global state converted to stream state.", airbyteStateMessage.getStateType());
break;
case LEGACY:
streamStates.addAll(StateGeneratorUtils.convertLegacyStateToStreamState(airbyteStateMessage));
break;
case STREAM:
default:
streamStates.addAll(states);
break;
}

return streamStates;
}

}
Loading

0 comments on commit 406698e

Please sign in to comment.