Skip to content

Commit

Permalink
Revert "Remove data field in the state aggregator if possible (airbyt…
Browse files Browse the repository at this point in the history
…ehq#17538)" (airbytehq#17583)

This reverts commit f45275e.
  • Loading branch information
benmoriceau authored Oct 4, 2022
1 parent 759a27e commit 7180ac3
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ public State getAggregated() {
if (state.getType() == null || state.getType() == AirbyteStateType.LEGACY) {
return new State().withState(state.getData());
} else {
/**
* The destination emit a Legacy state in order to be retro-compatible with old platform. If we are
* running this code, we know that the platform has been upgraded and we can thus discard the legacy
* state. Keeping the legacy state is causing issue because of its size
* (https://github.com/airbytehq/oncall/issues/731)
*/
state.setData(null);
return new State()
.withState(Jsons.jsonNode(List.of(state)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,6 @@ class StreamStateAggregator implements StateAggregator {

@Override
public void ingest(final AirbyteStateMessage stateMessage) {
/**
* The destination emit a Legacy state in order to be retro-compatible with old platform. If we are
* running this code, we know that the platform has been upgraded and we can thus discard the legacy
* state. Keeping the legacy state is causing issue because of its size
* (https://github.com/airbytehq/oncall/issues/731)
*/
stateMessage.setData(null);
aggregatedState.put(stateMessage.getStream().getStreamDescriptor(), stateMessage);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,13 @@ void testGlobalState() {
final AirbyteStateMessage state1 = getGlobalMessage(1);
final AirbyteStateMessage state2 = getGlobalMessage(2);

final AirbyteStateMessage state1NoData = getGlobalMessage(1).withData(null);
final AirbyteStateMessage state2NoData = getGlobalMessage(2).withData(null);

stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class));
stateAggregator.ingest(state1);
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state1NoData))));
.withState(Jsons.jsonNode(List.of(state1))));

stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class));
stateAggregator.ingest(state2);
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state2NoData))));
.withState(Jsons.jsonNode(List.of(state2))));
}

@Test
Expand Down Expand Up @@ -129,23 +126,19 @@ void testStreamStateWithFeatureFlagOn() {
final AirbyteStateMessage state2 = getStreamMessage("b", 2);
final AirbyteStateMessage state3 = getStreamMessage("b", 3);

final AirbyteStateMessage state1NoData = getStreamMessage("a", 1).withData(null);
final AirbyteStateMessage state2NoData = getStreamMessage("b", 2).withData(null);
final AirbyteStateMessage state3NoData = getStreamMessage("b", 3).withData(null);

stateAggregator = new DefaultStateAggregator(USE_STREAM_CAPABLE_STATE);

stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state1), AirbyteStateMessage.class));
stateAggregator.ingest(state1);
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state1NoData))));
.withState(Jsons.jsonNode(List.of(state1))));

stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state2), AirbyteStateMessage.class));
stateAggregator.ingest(state2);
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state2NoData, state1NoData))));
.withState(Jsons.jsonNode(List.of(state2, state1))));

stateAggregator.ingest(Jsons.object(Jsons.jsonNode(state3), AirbyteStateMessage.class));
stateAggregator.ingest(state3);
Assertions.assertThat(stateAggregator.getAggregated()).isEqualTo(new State()
.withState(Jsons.jsonNode(List.of(state3NoData, state1NoData))));
.withState(Jsons.jsonNode(List.of(state3, state1))));
}

private AirbyteStateMessage getNullMessage(final int stateValue) {
Expand All @@ -165,8 +158,7 @@ private AirbyteStateMessage getGlobalMessage(final int stateValue) {
.withStreamDescriptor(
new StreamDescriptor()
.withName("test"))
.withStreamState(Jsons.jsonNode(stateValue)))))
.withData(Jsons.jsonNode("HelloWorld"));
.withStreamState(Jsons.jsonNode(stateValue)))));
}

private AirbyteStateMessage getStreamMessage(final String streamName, final int stateValue) {
Expand All @@ -176,8 +168,7 @@ private AirbyteStateMessage getStreamMessage(final String streamName, final int
.withStreamDescriptor(
new StreamDescriptor()
.withName(streamName))
.withStreamState(Jsons.jsonNode(stateValue)))
.withData(Jsons.jsonNode("Hello"));
.withStreamState(Jsons.jsonNode(stateValue)));
}

private AirbyteStateMessage getEmptyMessage(final AirbyteStateType stateType) {
Expand Down

0 comments on commit 7180ac3

Please sign in to comment.