From 58529892673def7633d4cf1ae34f686fc5e9bb04 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Fri, 17 Jun 2022 19:01:33 -0700 Subject: [PATCH] Common code to deserialize a state message in the new format (#13772) * Common code to deserialize a state message in the new format * PR comments and type changed to typed * Format * Add StateType and StateWrapper objects to the model * Use state wrapper instead of Either * Switch to optional * PR comments * Support array legacy state * format Co-authored-by: Jimmy Ma --- .../config/helpers/StateMessageHelper.java | 62 +++++++++ .../helpers/StateMessageHelperTest.java | 123 ++++++++++++++++++ 2 files changed, 185 insertions(+) create mode 100644 airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/StateMessageHelper.java create mode 100644 airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/StateMessageHelperTest.java diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/StateMessageHelper.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/StateMessageHelper.java new file mode 100644 index 000000000000..bc8180d28557 --- /dev/null +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/StateMessageHelper.java @@ -0,0 +1,62 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.helpers; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.JsonNode; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.StateType; +import io.airbyte.config.StateWrapper; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import java.util.List; +import java.util.Optional; + +public class StateMessageHelper { + + public static class AirbyteStateMessageListTypeReference extends TypeReference> {} + + /** + * This a takes a json blob state and tries return either a legacy state in the format of a json + * object or a state message with the new format which is a list of airbyte state message. + * + * @param state - a blob representing the state + * @return An optional state wrapper, if there is no state an empty optional will be returned + */ + public static Optional getTypedState(final JsonNode state) { + if (state == null) { + return Optional.empty(); + } else { + final List stateMessages; + try { + stateMessages = Jsons.object(state, new AirbyteStateMessageListTypeReference()); + } catch (final IllegalArgumentException e) { + return Optional.of(getLegacyStateWrapper(state)); + } + if (stateMessages.stream().anyMatch(streamMessage -> !streamMessage.getAdditionalProperties().isEmpty())) { + return Optional.of(getLegacyStateWrapper(state)); + } + if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) { + return Optional.of(new StateWrapper() + .withStateType(StateType.GLOBAL) + .withGlobal(stateMessages.get(0))); + } else if (stateMessages.size() >= 1 + && stateMessages.stream().allMatch(stateMessage -> stateMessage.getStateType() == AirbyteStateType.STREAM)) { + return Optional.of(new StateWrapper() + .withStateType(StateType.STREAM) + .withStateMessages(stateMessages)); + } else { + throw new IllegalStateException("Unexpected state blob"); + } + } + } + + private static StateWrapper getLegacyStateWrapper(final JsonNode state) { + return new StateWrapper() + .withStateType(StateType.LEGACY) + .withLegacyState(state); + } + +} diff --git a/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/StateMessageHelperTest.java b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/StateMessageHelperTest.java new file mode 100644 index 000000000000..0fa57cb4c9ff --- /dev/null +++ b/airbyte-config/config-models/src/test/java/io/airbyte/config/helpers/StateMessageHelperTest.java @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.helpers; + +import com.google.common.collect.Lists; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.StateType; +import io.airbyte.config.StateWrapper; +import io.airbyte.protocol.models.AirbyteGlobalState; +import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType; +import io.airbyte.protocol.models.AirbyteStreamState; +import io.airbyte.protocol.models.StreamDescriptor; +import java.util.Map; +import java.util.Optional; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class StateMessageHelperTest { + + @Test + public void testEmpty() { + final Optional stateWrapper = StateMessageHelper.getTypedState(null); + Assertions.assertThat(stateWrapper).isEmpty(); + } + + @Test + public void testLegacy() { + final Optional stateWrapper = StateMessageHelper.getTypedState(Jsons.emptyObject()); + Assertions.assertThat(stateWrapper).isNotEmpty(); + Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY); + } + + @Test + public void testLegacyInList() { + final Optional stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode( + Lists.newArrayList( + Map.of("Any", "value")))); + Assertions.assertThat(stateWrapper).isNotEmpty(); + Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY); + } + + @Test + public void testGlobal() { + final AirbyteStateMessage stateMessage = new AirbyteStateMessage() + .withStateType(AirbyteStateType.GLOBAL) + .withGlobal( + new AirbyteGlobalState() + .withSharedState(Jsons.emptyObject()) + .withStreamStates(Lists.newArrayList( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); + final Optional stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage))); + Assertions.assertThat(stateWrapper).isNotEmpty(); + Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.GLOBAL); + Assertions.assertThat(stateWrapper.get().getGlobal()).isEqualTo(stateMessage); + } + + @Test + public void testStream() { + final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() + .withStateType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject())); + final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() + .withStateType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())); + final Optional stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))); + Assertions.assertThat(stateWrapper).isNotEmpty(); + Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.STREAM); + Assertions.assertThat(stateWrapper.get().getStateMessages()).containsExactlyInAnyOrder(stateMessage1, stateMessage2); + } + + @Test + public void testInvalidMixedState() { + final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() + .withStateType(AirbyteStateType.STREAM) + .withStream( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject())); + final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() + .withStateType(AirbyteStateType.GLOBAL) + .withGlobal( + new AirbyteGlobalState() + .withSharedState(Jsons.emptyObject()) + .withStreamStates(Lists.newArrayList( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); + Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)))) + .isInstanceOf(IllegalStateException.class); + } + + @Test + public void testDuplicatedGlobalState() { + final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage() + .withStateType(AirbyteStateType.GLOBAL) + .withGlobal( + new AirbyteGlobalState() + .withSharedState(Jsons.emptyObject()) + .withStreamStates(Lists.newArrayList( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); + final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage() + .withStateType(AirbyteStateType.GLOBAL) + .withGlobal( + new AirbyteGlobalState() + .withSharedState(Jsons.emptyObject()) + .withStreamStates(Lists.newArrayList( + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()), + new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject())))); + Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)))) + .isInstanceOf(IllegalStateException.class); + } + + @Test + public void testEmptyStateList() { + Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList()))) + .isInstanceOf(IllegalStateException.class); + } + +}