diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java index 09106a08c1e2..0cfe422ea1f7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java @@ -24,6 +24,13 @@ public State getAggregated() { if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) { return new State().withState(state.getData()); } else { + /** + * The destination emit a Legacy state in order to be retro-compatible with old platform. If we are + * running this code, we know that the platform has been upgraded and we can thus discard the legacy + * state. Keeping the legacy state is causing issue because of its size + * (https://github.com/airbytehq/oncall/issues/731) + */ + state.setData(null); return new State() .withState(Jsons.jsonNode(List.of(state))); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java index d55563efe0ec..4d3247b2549d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java @@ -17,6 +17,13 @@ class StreamStateAggregator implements StateAggregator { @Override public void ingest(final AirbyteStateMessage stateMessage) { + /** + * The destination emit a Legacy state in order to be retro-compatible with old platform. If we are + * running this code, we know that the platform has been upgraded and we can thus discard the legacy + * state. Keeping the legacy state is causing issue because of its size + * (https://github.com/airbytehq/oncall/issues/731) + */ + stateMessage.setData(null); aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java index bd591be29b13..83761bf5b194 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java @@ -92,13 +92,16 @@ void testGlobalState() { final AirbyteStateMessage state1 = getGlobalMessage(1); final AirbyteStateMessage state2 = getGlobalMessage(2); - stateAggregator.ingest(state1); + final AirbyteStateMessage state1NoData = getGlobalMessage(1).withData(null); + final AirbyteStateMessage state2NoData = getGlobalMessage(2).withData(null); + + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state1)))); + .withState(Jsons.jsonNode(List.of(state1NoData)))); - stateAggregator.ingest(state2); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state2)))); + .withState(Jsons.jsonNode(List.of(state2NoData)))); } @Test @@ -126,19 +129,23 @@ void testStreamStateWithFeatureFlagOn() { final AirbyteStateMessage state2 = getStreamMessage("b", 2); final AirbyteStateMessage state3 = getStreamMessage("b", 3); + final AirbyteStateMessage state1NoData = getStreamMessage("a", 1).withData(null); + final AirbyteStateMessage state2NoData = getStreamMessage("b", 2).withData(null); + final AirbyteStateMessage state3NoData = getStreamMessage("b", 3).withData(null); + stateAggregator = new DefaultStateAggregator(USE_STREAM_CAPABLE_STATE); - stateAggregator.ingest(state1); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state1)))); + .withState(Jsons.jsonNode(List.of(state1NoData)))); - stateAggregator.ingest(state2); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state2, state1)))); + .withState(Jsons.jsonNode(List.of(state2NoData, state1NoData)))); - stateAggregator.ingest(state3); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state3), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state3, state1)))); + .withState(Jsons.jsonNode(List.of(state3NoData, state1NoData)))); } private AirbyteStateMessage getNullMessage(final int stateValue) { @@ -158,7 +165,8 @@ private AirbyteStateMessage getGlobalMessage(final int stateValue) { .withStreamDescriptor( new StreamDescriptor() .withName("test")) - .withStreamState(Jsons.jsonNode(stateValue))))); + .withStreamState(Jsons.jsonNode(stateValue))))) + .withData(Jsons.jsonNode("HelloWorld")); } private AirbyteStateMessage getStreamMessage(final String streamName, final int stateValue) { @@ -168,7 +176,8 @@ private AirbyteStateMessage getStreamMessage(final String streamName, final int .withStreamDescriptor( new StreamDescriptor() .withName(streamName)) - .withStreamState(Jsons.jsonNode(stateValue))); + .withStreamState(Jsons.jsonNode(stateValue))) + .withData(Jsons.jsonNode("Hello")); } private AirbyteStateMessage getEmptyMessage(final AirbyteStateType stateType) {