Skip to content

Commit

Permalink
Handle empty state case in global state manager
Browse files Browse the repository at this point in the history
  • Loading branch information
jdpgrailsdev committed Jun 14, 2022
1 parent e1ae1f7 commit 649effc
Showing 1 changed file with 10 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.Collection;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -110,12 +111,19 @@ private static Supplier<Collection<AirbyteStreamState>> getStreamsSupplier(final
* we can look for streams in the "global" field of the message. Otherwise, the message is still
* storing state in the legacy "data" field.
*/
return () -> airbyteStateMessage.getStateType() == AirbyteStateType.GLOBAL ? airbyteStateMessage.getGlobal().getStreamStates()
: Jsons.object(airbyteStateMessage.getData(), DbState.class).getStreams().stream()
return () -> {
if (airbyteStateMessage.getStateType() == AirbyteStateType.GLOBAL) {
return airbyteStateMessage.getGlobal().getStreamStates();
} else if (airbyteStateMessage.getData() != null) {
return Jsons.object(airbyteStateMessage.getData(), DbState.class).getStreams().stream()
.map(s -> new AirbyteStreamState().withStreamState(Jsons.jsonNode(s))
.withStreamDescriptor(new StreamDescriptor().withNamespace(s.getStreamNamespace()).withName(s.getStreamName())))
.collect(
Collectors.toList());
} else {
return List.of();
}
};
}

}

0 comments on commit 649effc

Please sign in to comment.