From 9fc71e8de6709086f5511fe32ded46d63043126e Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 3 Oct 2022 16:10:41 -0700 Subject: [PATCH 1/3] Remove data field in the state aggregator if possible --- .../workers/internal/state_aggregator/SingleStateAggregator.java | 1 + .../workers/internal/state_aggregator/StreamStateAggregator.java | 1 + 2 files changed, 2 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java index 09106a08c1e2..86582176dd77 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java @@ -24,6 +24,7 @@ public State getAggregated() { if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) { return new State().withState(state.getData()); } else { + state.setData(null); return new State() .withState(Jsons.jsonNode(List.of(state))); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java index d55563efe0ec..4ee2b608af69 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java @@ -17,6 +17,7 @@ class StreamStateAggregator implements StateAggregator { @Override public void ingest(final AirbyteStateMessage stateMessage) { + stateMessage.setData(null); aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage); } From a61f70955fd686a41c978b99454a04493d430d50 Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 3 Oct 2022 16:24:27 -0700 Subject: [PATCH 2/3] Add comments --- .../internal/state_aggregator/SingleStateAggregator.java | 5 +++++ .../internal/state_aggregator/StreamStateAggregator.java | 5 +++++ 2 files changed, 10 insertions(+) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java index 86582176dd77..671fabc0a1f4 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java @@ -24,6 +24,11 @@ public State getAggregated() { if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) { return new State().withState(state.getData()); } else { + /** + * The destination emit a Legacy state in order to be retro-compatible with old platform. + * If we are running this code, we know that the platform has been upgraded and we can thus discard the legacy state. + * Keeping the legacy state is causing issue because of its size (https://github.com/airbytehq/oncall/issues/731) + */ state.setData(null); return new State() .withState(Jsons.jsonNode(List.of(state))); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java index 4ee2b608af69..dc4145bfca58 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java @@ -17,6 +17,11 @@ class StreamStateAggregator implements StateAggregator { @Override public void ingest(final AirbyteStateMessage stateMessage) { + /** + * The destination emit a Legacy state in order to be retro-compatible with old platform. + * If we are running this code, we know that the platform has been upgraded and we can thus discard the legacy state. + * Keeping the legacy state is causing issue because of its size (https://github.com/airbytehq/oncall/issues/731) + */ stateMessage.setData(null); aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage); } From 8109754d4285be37ef5b8dfa51ea8245540f483f Mon Sep 17 00:00:00 2001 From: Benoit Moriceau Date: Mon, 3 Oct 2022 16:53:54 -0700 Subject: [PATCH 3/3] Add comment and update test --- .../SingleStateAggregator.java | 7 ++-- .../StreamStateAggregator.java | 7 ++-- .../state_aggregator/StateAggregatorTest.java | 33 ++++++++++++------- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java index 671fabc0a1f4..0cfe422ea1f7 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/SingleStateAggregator.java @@ -25,9 +25,10 @@ public State getAggregated() { return new State().withState(state.getData()); } else { /** - * The destination emit a Legacy state in order to be retro-compatible with old platform. - * If we are running this code, we know that the platform has been upgraded and we can thus discard the legacy state. - * Keeping the legacy state is causing issue because of its size (https://github.com/airbytehq/oncall/issues/731) + * The destination emit a Legacy state in order to be retro-compatible with old platform. If we are + * running this code, we know that the platform has been upgraded and we can thus discard the legacy + * state. Keeping the legacy state is causing issue because of its size + * (https://github.com/airbytehq/oncall/issues/731) */ state.setData(null); return new State() diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java index dc4145bfca58..4d3247b2549d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/internal/state_aggregator/StreamStateAggregator.java @@ -18,9 +18,10 @@ class StreamStateAggregator implements StateAggregator { @Override public void ingest(final AirbyteStateMessage stateMessage) { /** - * The destination emit a Legacy state in order to be retro-compatible with old platform. - * If we are running this code, we know that the platform has been upgraded and we can thus discard the legacy state. - * Keeping the legacy state is causing issue because of its size (https://github.com/airbytehq/oncall/issues/731) + * The destination emit a Legacy state in order to be retro-compatible with old platform. If we are + * running this code, we know that the platform has been upgraded and we can thus discard the legacy + * state. Keeping the legacy state is causing issue because of its size + * (https://github.com/airbytehq/oncall/issues/731) */ stateMessage.setData(null); aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java index bd591be29b13..83761bf5b194 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/internal/state_aggregator/StateAggregatorTest.java @@ -92,13 +92,16 @@ void testGlobalState() { final AirbyteStateMessage state1 = getGlobalMessage(1); final AirbyteStateMessage state2 = getGlobalMessage(2); - stateAggregator.ingest(state1); + final AirbyteStateMessage state1NoData = getGlobalMessage(1).withData(null); + final AirbyteStateMessage state2NoData = getGlobalMessage(2).withData(null); + + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state1)))); + .withState(Jsons.jsonNode(List.of(state1NoData)))); - stateAggregator.ingest(state2); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state2)))); + .withState(Jsons.jsonNode(List.of(state2NoData)))); } @Test @@ -126,19 +129,23 @@ void testStreamStateWithFeatureFlagOn() { final AirbyteStateMessage state2 = getStreamMessage("b", 2); final AirbyteStateMessage state3 = getStreamMessage("b", 3); + final AirbyteStateMessage state1NoData = getStreamMessage("a", 1).withData(null); + final AirbyteStateMessage state2NoData = getStreamMessage("b", 2).withData(null); + final AirbyteStateMessage state3NoData = getStreamMessage("b", 3).withData(null); + stateAggregator = new DefaultStateAggregator(USE_STREAM_CAPABLE_STATE); - stateAggregator.ingest(state1); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state1)))); + .withState(Jsons.jsonNode(List.of(state1NoData)))); - stateAggregator.ingest(state2); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state2, state1)))); + .withState(Jsons.jsonNode(List.of(state2NoData, state1NoData)))); - stateAggregator.ingest(state3); + stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state3), AirbyteStateMessage.class)); Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State() - .withState(Jsons.jsonNode(List.of(state3, state1)))); + .withState(Jsons.jsonNode(List.of(state3NoData, state1NoData)))); } private AirbyteStateMessage getNullMessage(final int stateValue) { @@ -158,7 +165,8 @@ private AirbyteStateMessage getGlobalMessage(final int stateValue) { .withStreamDescriptor( new StreamDescriptor() .withName("test")) - .withStreamState(Jsons.jsonNode(stateValue))))); + .withStreamState(Jsons.jsonNode(stateValue))))) + .withData(Jsons.jsonNode("HelloWorld")); } private AirbyteStateMessage getStreamMessage(final String streamName, final int stateValue) { @@ -168,7 +176,8 @@ private AirbyteStateMessage getStreamMessage(final String streamName, final int .withStreamDescriptor( new StreamDescriptor() .withName(streamName)) - .withStreamState(Jsons.jsonNode(stateValue))); + .withStreamState(Jsons.jsonNode(stateValue))) + .withData(Jsons.jsonNode("Hello")); } private AirbyteStateMessage getEmptyMessage(final AirbyteStateType stateType) {