From 6dbef7436392a4643cbc1478deda639a3f05d622 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Fri, 24 Jun 2022 17:15:33 -0700 Subject: [PATCH] Fix StatePersistence Legacy read/write StatePersistence will wrap/unwrap legacy state on write/read to ensure compatibility with the old behavior/data. --- .../config/persistence/StatePersistence.java | 8 ++++-- .../persistence/StatePersistenceTest.java | 25 ++++++++++++++++++- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/StatePersistence.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/StatePersistence.java index a23d1f0c4e0f..e21becfdf1d9 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/StatePersistence.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/StatePersistence.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.enums.Enums; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.State; import io.airbyte.config.StateType; import io.airbyte.config.StateWrapper; import io.airbyte.db.Database; @@ -158,7 +159,9 @@ static void writeStateToDb(final DSLContext ctx, isNullOrEquals(STATE.NAMESPACE, namespace)) .fetch().isNotEmpty(); - final JSONB jsonbState = JSONB.valueOf(Jsons.serialize(state)); + // NOTE: the legacy code was storing a State object instead of just the State data field. We kept + // the same behavior for consistency. + final JSONB jsonbState = JSONB.valueOf(Jsons.serialize(stateType != StateType.LEGACY ? state : new State().withState(state))); final OffsetDateTime now = OffsetDateTime.now(); if (!hasState) { @@ -292,9 +295,10 @@ record -> new AirbyteStateMessage() * Build a StateWrapper for Legacy state */ private static StateWrapper buildLegacyState(final List records) { + final State legacyState = Jsons.convertValue(records.get(0).state, State.class); return new StateWrapper() .withStateType(StateType.LEGACY) - .withLegacyState(records.get(0).state); + .withLegacyState(legacyState.getState()); } /** diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java index 0c4e70dcf522..2b67fb8c6321 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java @@ -15,6 +15,7 @@ import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardWorkspace; +import io.airbyte.config.State; import io.airbyte.config.StateType; import io.airbyte.config.StateWrapper; import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor; @@ -45,6 +46,7 @@ public class StatePersistenceTest extends BaseDatabaseConfigPersistenceTest { + private ConfigRepository configRepository; private StatePersistence statePersistence; private UUID connectionId; @@ -489,6 +491,27 @@ public void testEnumsConversion() { io.airbyte.config.StateType.class)); } + @Test + public void testStatePersistenceLegacyReadConsistency() throws IOException { + final JsonNode jsonState = Jsons.deserialize("{\"my\": \"state\"}"); + final State state = new State().withState(jsonState); + configRepository.updateConnectionState(connectionId, state); + + final StateWrapper readStateWrapper = statePersistence.getCurrentState(connectionId).orElseThrow(); + Assertions.assertEquals(StateType.LEGACY, readStateWrapper.getStateType()); + Assertions.assertEquals(state.getState(), readStateWrapper.getLegacyState()); + } + + @Test + public void testStatePersistenceLegacyWriteConsistency() throws IOException { + final JsonNode jsonState = Jsons.deserialize("{\"my\": \"state\"}"); + final StateWrapper stateWrapper = new StateWrapper().withStateType(StateType.LEGACY).withLegacyState(jsonState); + statePersistence.updateOrCreateState(connectionId, stateWrapper); + + final State readState = configRepository.getConnectionState(connectionId).orElseThrow(); + Assertions.assertEquals(readState.getState(), stateWrapper.getLegacyState()); + } + @BeforeEach public void beforeEach() throws DatabaseInitializationException, IOException, JsonValidationException { dataSource = DatabaseConnectionHelper.createDataSource(container); @@ -510,7 +533,7 @@ public void afterEach() { } private void setupTestData() throws JsonValidationException, IOException { - ConfigRepository configRepository = new ConfigRepository( + configRepository = new ConfigRepository( new DatabaseConfigPersistence(database, mock(JsonSecretsProcessor.class)), database);