Skip to content

Commit

Permalink
Handle invalid conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 15, 2022
1 parent 1c2a65b commit 566baee
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private static AirbyteStateMessage generateGlobalState(final AirbyteStateMessage

switch (airbyteStateMessage.getStateType()) {
case STREAM:
throw new IllegalArgumentException("Unable to convert connector state from per-stream to global. Please reset the connection to continue.");
throw new IllegalArgumentException("Unable to convert connector state from stream to global. Please reset the connection to continue.");
case LEGACY:
globalStateMessage = StateGeneratorUtils.convertLegacyStateToGlobalState(airbyteStateMessage);
LOGGER.info("Legacy state converted to global state.", airbyteStateMessage.getStateType());
Expand All @@ -102,15 +102,14 @@ private static AirbyteStateMessage generateGlobalState(final AirbyteStateMessage
*
* @param states The list of current states.
* @return The converted state messages.
* @throws IllegalArgumentException if unable to convert between the given state type and stream.
*/
private static List<AirbyteStateMessage> generateStreamState(final List<AirbyteStateMessage> states) {
final AirbyteStateMessage airbyteStateMessage = states.get(0);
final List<AirbyteStateMessage> streamStates = new ArrayList<>();
switch (airbyteStateMessage.getStateType()) {
case GLOBAL:
streamStates.addAll(StateGeneratorUtils.convertGlobalStateToStreamState(airbyteStateMessage));
LOGGER.info("Global state converted to stream state.", airbyteStateMessage.getStateType());
break;
throw new IllegalArgumentException("Unable to convert connector state from global to stream. Please reset the connection to continue.");
case LEGACY:
streamStates.addAll(StateGeneratorUtils.convertLegacyStateToStreamState(airbyteStateMessage));
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,7 @@ void testStreamStateManagerCreationFromGlobal() {
.withStreamState(Jsons.jsonNode(new DbStreamState()))));
final AirbyteStateMessage airbyteStateMessage = new AirbyteStateMessage().withStateType(AirbyteStateType.GLOBAL).withGlobal(globalState);

final StateManager stateManager = StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, STREAM_STATE_TYPE);

Assertions.assertNotNull(stateManager);
Assertions.assertEquals(StreamStateManager.class, stateManager.getClass());
Assertions.assertThrows(IllegalArgumentException.class, () -> StateManagerFactory.createStateManager(List.of(airbyteStateMessage), catalog, STREAM_STATE_TYPE));
}

@Test
Expand Down

0 comments on commit 566baee

Please sign in to comment.