From 366f75ba2abcfe99a655648c214a76a5ac7fce7c Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Tue, 4 Oct 2022 14:41:28 -0700 Subject: [PATCH] Revert "Remove data field in the state aggregator if possible (#17538)" This reverts commit f45275e78f5ec7dceb05fae0d0d02fffc22dcc10. --- .../SingleStateAggregator.java | 7 ---- .../StreamStateAggregator.java | 7 ---- .../state_aggregator/StateAggregatorTest.java | 33 +++++++------------ 3 files changed, 12 insertions(+), 35 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java index 0cfe422ea1f7..09106a08c1e2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java @@ -24,13 +24,6 @@ public State getAggregated() { if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) { return new State().withState(state.getData()); } else { - /** - * The destination emit a Legacy state in order to be retro-compatible with old platform. If we are - * running this code, we know that the platform has been upgraded and we can thus discard the legacy - * state. Keeping the legacy state is causing issue because of its size - * (https://github.com/airbytehq/oncall/issues/731) - */ - state.setData(null); return new State() .withState(Jsons.jsonNode(List.of(state))); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java index 4d3247b2549d..d55563efe0ec 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java @@ -17,13 +17,6 @@ class StreamStateAggregator implements StateAggregator { @Override public void ingest(final AirbyteStateMessage stateMessage) { - /** - * The destination emit a Legacy state in order to be retro-compatible with old platform. If we are - * running this code, we know that the platform has been upgraded and we can thus discard the legacy - * state. Keeping the legacy state is causing issue because of its size - * (https://github.com/airbytehq/oncall/issues/731) - */ - stateMessage.setData(null); aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java index 83761bf5b194..bd591be29b13 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java @@ -92,16 +92,13 @@ void testGlobalState() { final AirbyteStateMessage state1 = getGlobalMessage(1); final AirbyteStateMessage state2 = getGlobalMessage(2); - final AirbyteStateMessage state1NoData = getGlobalMessage(1).withData(null); - final AirbyteStateMessage state2NoData = getGlobalMessage(2).withData(null); - - stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class)); + stateAggregator.ingest(state1); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state1NoData)))); + .withState(Jsons.jsonNode(List.of(state1)))); - stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class)); + stateAggregator.ingest(state2); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state2NoData)))); + .withState(Jsons.jsonNode(List.of(state2)))); } @Test @@ -129,23 +126,19 @@ void testStreamStateWithFeatureFlagOn() { final AirbyteStateMessage state2 = getStreamMessage("b", 2); final AirbyteStateMessage state3 = getStreamMessage("b", 3); - final AirbyteStateMessage state1NoData = getStreamMessage("a", 1).withData(null); - final AirbyteStateMessage state2NoData = getStreamMessage("b", 2).withData(null); - final AirbyteStateMessage state3NoData = getStreamMessage("b", 3).withData(null); - stateAggregator = new DefaultStateAggregator(USE_STREAM_CAPABLE_STATE); - stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class)); + stateAggregator.ingest(state1); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state1NoData)))); + .withState(Jsons.jsonNode(List.of(state1)))); - stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class)); + stateAggregator.ingest(state2); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state2NoData, state1NoData)))); + .withState(Jsons.jsonNode(List.of(state2, state1)))); - stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state3), AirbyteStateMessage.class)); + stateAggregator.ingest(state3); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state3NoData, state1NoData)))); + .withState(Jsons.jsonNode(List.of(state3, state1)))); } private AirbyteStateMessage getNullMessage(final int stateValue) { @@ -165,8 +158,7 @@ private AirbyteStateMessage getGlobalMessage(final int stateValue) { .withStreamDescriptor( new StreamDescriptor() .withName("test")) - .withStreamState(Jsons.jsonNode(stateValue))))) - .withData(Jsons.jsonNode("HelloWorld")); + .withStreamState(Jsons.jsonNode(stateValue))))); } private AirbyteStateMessage getStreamMessage(final String streamName, final int stateValue) { @@ -176,8 +168,7 @@ private AirbyteStateMessage getStreamMessage(final String streamName, final int .withStreamDescriptor( new StreamDescriptor() .withName(streamName)) - .withStreamState(Jsons.jsonNode(stateValue))) - .withData(Jsons.jsonNode("Hello")); + .withStreamState(Jsons.jsonNode(stateValue))); } private AirbyteStateMessage getEmptyMessage(final AirbyteStateType stateType) {