Skip to content

Commit

Permalink
Common code to deserialize a state message in the new format (#13772)
Browse files Browse the repository at this point in the history
* Common code to deserialize a state message in the new format

* PR comments and type changed to typed

* Format

* Add StateType and StateWrapper objects to the model

* Use state wrapper instead of Either

* Switch to optional

* PR comments

* Support array legacy state

* format

Co-authored-by: Jimmy Ma <jimmy@airbyte.io>
  • Loading branch information
benmoriceau and gosusnp authored Jun 18, 2022
1 parent 74d16cc commit 5852989
Show file tree
Hide file tree
Showing 2 changed files with 185 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import java.util.List;
import java.util.Optional;

public class StateMessageHelper {

public static class AirbyteStateMessageListTypeReference extends TypeReference<List<AirbyteStateMessage>> {}

/**
* This a takes a json blob state and tries return either a legacy state in the format of a json
* object or a state message with the new format which is a list of airbyte state message.
*
* @param state - a blob representing the state
* @return An optional state wrapper, if there is no state an empty optional will be returned
*/
public static Optional<StateWrapper> getTypedState(final JsonNode state) {
if (state == null) {
return Optional.empty();
} else {
final List<AirbyteStateMessage> stateMessages;
try {
stateMessages = Jsons.object(state, new AirbyteStateMessageListTypeReference());
} catch (final IllegalArgumentException e) {
return Optional.of(getLegacyStateWrapper(state));
}
if (stateMessages.stream().anyMatch(streamMessage -> !streamMessage.getAdditionalProperties().isEmpty())) {
return Optional.of(getLegacyStateWrapper(state));
}
if (stateMessages.size() == 1 && stateMessages.get(0).getStateType() == AirbyteStateType.GLOBAL) {
return Optional.of(new StateWrapper()
.withStateType(StateType.GLOBAL)
.withGlobal(stateMessages.get(0)));
} else if (stateMessages.size() >= 1
&& stateMessages.stream().allMatch(stateMessage -> stateMessage.getStateType() == AirbyteStateType.STREAM)) {
return Optional.of(new StateWrapper()
.withStateType(StateType.STREAM)
.withStateMessages(stateMessages));
} else {
throw new IllegalStateException("Unexpected state blob");
}
}
}

private static StateWrapper getLegacyStateWrapper(final JsonNode state) {
return new StateWrapper()
.withStateType(StateType.LEGACY)
.withLegacyState(state);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.helpers;

import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateType;
import io.airbyte.config.StateWrapper;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteStreamState;
import io.airbyte.protocol.models.StreamDescriptor;
import java.util.Map;
import java.util.Optional;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

public class StateMessageHelperTest {

@Test
public void testEmpty() {
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(null);
Assertions.assertThat(stateWrapper).isEmpty();
}

@Test
public void testLegacy() {
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.emptyObject());
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY);
}

@Test
public void testLegacyInList() {
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(
Lists.newArrayList(
Map.of("Any", "value"))));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.LEGACY);
}

@Test
public void testGlobal() {
final AirbyteStateMessage stateMessage = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage)));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.GLOBAL);
Assertions.assertThat(stateWrapper.get().getGlobal()).isEqualTo(stateMessage);
}

@Test
public void testStream() {
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()));
final Optional<StateWrapper> stateWrapper = StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2)));
Assertions.assertThat(stateWrapper).isNotEmpty();
Assertions.assertThat(stateWrapper.get().getStateType()).isEqualTo(StateType.STREAM);
Assertions.assertThat(stateWrapper.get().getStateMessages()).containsExactlyInAnyOrder(stateMessage1, stateMessage2);
}

@Test
public void testInvalidMixedState() {
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.STREAM)
.withStream(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()));
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testDuplicatedGlobalState() {
final AirbyteStateMessage stateMessage1 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
final AirbyteStateMessage stateMessage2 = new AirbyteStateMessage()
.withStateType(AirbyteStateType.GLOBAL)
.withGlobal(
new AirbyteGlobalState()
.withSharedState(Jsons.emptyObject())
.withStreamStates(Lists.newArrayList(
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("a")).withStreamState(Jsons.emptyObject()),
new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName("b")).withStreamState(Jsons.emptyObject()))));
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList(stateMessage1, stateMessage2))))
.isInstanceOf(IllegalStateException.class);
}

@Test
public void testEmptyStateList() {
Assertions.assertThatThrownBy(() -> StateMessageHelper.getTypedState(Jsons.jsonNode(Lists.newArrayList())))
.isInstanceOf(IllegalStateException.class);
}

}

0 comments on commit 5852989

Please sign in to comment.