From 6e7e9ed71ea4f7e0e7e0507250027a74800df988 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Wed, 14 Sep 2022 14:25:25 -0700 Subject: [PATCH] Update ConfigRepository to read protocol version (#16670) When reading actor definitions, we need to copy the protocol version from the connector spec into the top level protocol version value of the standard definitions for it to be stored in the database. --- .../config/persistence/ConfigRepository.java | 5 +++ .../persistence/ConfigRepositoryTest.java | 42 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index d251f3a2e29d..bf9d45a3f4c9 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -23,6 +23,7 @@ import com.google.common.hash.Hashing; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.config.ActorCatalog; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; @@ -206,6 +207,8 @@ public List listStandardSourceDefinitions(final boolea final List sourceDefinitions = new ArrayList<>(); for (final StandardSourceDefinition sourceDefinition : persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) { + sourceDefinition.withProtocolVersion(AirbyteProtocolVersion + .getWithDefault(sourceDefinition.getSpec() != null ? sourceDefinition.getSpec().getProtocolVersion() : null).serialize()); if (!MoreBooleans.isTruthy(sourceDefinition.getTombstone()) || includeTombstone) { sourceDefinitions.add(sourceDefinition); } @@ -306,6 +309,8 @@ public List listStandardDestinationDefinitions(fi for (final StandardDestinationDefinition destinationDefinition : persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) { + destinationDefinition.withProtocolVersion(AirbyteProtocolVersion + .getWithDefault(destinationDefinition.getSpec() != null ? destinationDefinition.getSpec().getProtocolVersion() : null).serialize()); if (!MoreBooleans.isTruthy(destinationDefinition.getTombstone()) || includeTombstone) { destinationDefinitions.add(destinationDefinition); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index f1b8ae80488b..b2fa1ea2ca37 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; @@ -31,6 +32,7 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; @@ -225,6 +227,46 @@ void testListStandardSourceDefinitionsHandlesTombstoneSourceDefinitions(final in assertEquals(allSourceDefinitions, returnedSourceDefinitionsWithTombstone); } + @Test + void testListDestinationDefinitionsWithVersion() throws JsonValidationException, IOException { + final List allSourceDefinitions = List.of( + new StandardDestinationDefinition(), + new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), + // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. + new StandardDestinationDefinition().withProtocolVersion("0.4.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.4.1")), + new StandardDestinationDefinition().withProtocolVersion("0.5.0").withSpec(new ConnectorSpecification())); + + when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) + .thenReturn(allSourceDefinitions); + + final List destinationDefinitions = configRepository.listStandardDestinationDefinitions(false); + final List protocolVersions = destinationDefinitions.stream().map(StandardDestinationDefinition::getProtocolVersion).toList(); + assertEquals( + List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.3.1", "0.4.1", + AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()), + protocolVersions); + } + + @Test + void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOException { + final List allSourceDefinitions = List.of( + new StandardSourceDefinition(), + new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.6.0")), + // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. + new StandardSourceDefinition().withProtocolVersion("0.7.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.7.1")), + new StandardSourceDefinition().withProtocolVersion("0.8.0").withSpec(new ConnectorSpecification())); + + when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(allSourceDefinitions); + + final List sourceDefinitions = configRepository.listStandardSourceDefinitions(false); + final List protocolVersions = sourceDefinitions.stream().map(StandardSourceDefinition::getProtocolVersion).toList(); + assertEquals( + List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.6.0", "0.7.1", + AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()), + protocolVersions); + } + @Test void testDeleteSourceDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException { final StandardSourceDefinition sourceDefToDelete = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID());