diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index d251f3a2e29d..bf9d45a3f4c9 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -23,6 +23,7 @@ import com.google.common.hash.Hashing; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.config.ActorCatalog; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; @@ -206,6 +207,8 @@ public List listStandardSourceDefinitions(final boolea final List sourceDefinitions = new ArrayList<>(); for (final StandardSourceDefinition sourceDefinition : persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) { + sourceDefinition.withProtocolVersion(AirbyteProtocolVersion + .getWithDefault(sourceDefinition.getSpec() != null ? sourceDefinition.getSpec().getProtocolVersion() : null).serialize()); if (!MoreBooleans.isTruthy(sourceDefinition.getTombstone()) || includeTombstone) { sourceDefinitions.add(sourceDefinition); } @@ -306,6 +309,8 @@ public List listStandardDestinationDefinitions(fi for (final StandardDestinationDefinition destinationDefinition : persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) { + destinationDefinition.withProtocolVersion(AirbyteProtocolVersion + .getWithDefault(destinationDefinition.getSpec() != null ? destinationDefinition.getSpec().getProtocolVersion() : null).serialize()); if (!MoreBooleans.isTruthy(destinationDefinition.getTombstone()) || includeTombstone) { destinationDefinitions.add(destinationDefinition); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index f1b8ae80488b..b2fa1ea2ca37 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; @@ -31,6 +32,7 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; @@ -225,6 +227,46 @@ void testListStandardSourceDefinitionsHandlesTombstoneSourceDefinitions(final in assertEquals(allSourceDefinitions, returnedSourceDefinitionsWithTombstone); } + @Test + void testListDestinationDefinitionsWithVersion() throws JsonValidationException, IOException { + final List allSourceDefinitions = List.of( + new StandardDestinationDefinition(), + new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), + // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. + new StandardDestinationDefinition().withProtocolVersion("0.4.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.4.1")), + new StandardDestinationDefinition().withProtocolVersion("0.5.0").withSpec(new ConnectorSpecification())); + + when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) + .thenReturn(allSourceDefinitions); + + final List destinationDefinitions = configRepository.listStandardDestinationDefinitions(false); + final List protocolVersions = destinationDefinitions.stream().map(StandardDestinationDefinition::getProtocolVersion).toList(); + assertEquals( + List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.3.1", "0.4.1", + AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()), + protocolVersions); + } + + @Test + void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOException { + final List allSourceDefinitions = List.of( + new StandardSourceDefinition(), + new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.6.0")), + // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. + new StandardSourceDefinition().withProtocolVersion("0.7.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.7.1")), + new StandardSourceDefinition().withProtocolVersion("0.8.0").withSpec(new ConnectorSpecification())); + + when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(allSourceDefinitions); + + final List sourceDefinitions = configRepository.listStandardSourceDefinitions(false); + final List protocolVersions = sourceDefinitions.stream().map(StandardSourceDefinition::getProtocolVersion).toList(); + assertEquals( + List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.6.0", "0.7.1", + AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()), + protocolVersions); + } + @Test void testDeleteSourceDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException { final StandardSourceDefinition sourceDefToDelete = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID());