From 1afede30410cccc9fe2ef198f192f9bd13e5df75 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 13 Sep 2022 14:31:38 -0700 Subject: [PATCH 1/3] Update ConfigRepository to read protocol version --- .../config/persistence/ConfigRepository.java | 5 +++ .../persistence/ConfigRepositoryTest.java | 37 +++++++++++++++++++ 2 files changed, 42 insertions(+) diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index d251f3a2e29d..bf9d45a3f4c9 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -23,6 +23,7 @@ import com.google.common.hash.Hashing; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.MoreBooleans; +import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.config.ActorCatalog; import io.airbyte.config.AirbyteConfig; import io.airbyte.config.ConfigSchema; @@ -206,6 +207,8 @@ public List listStandardSourceDefinitions(final boolea final List sourceDefinitions = new ArrayList<>(); for (final StandardSourceDefinition sourceDefinition : persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) { + sourceDefinition.withProtocolVersion(AirbyteProtocolVersion + .getWithDefault(sourceDefinition.getSpec() != null ? sourceDefinition.getSpec().getProtocolVersion() : null).serialize()); if (!MoreBooleans.isTruthy(sourceDefinition.getTombstone()) || includeTombstone) { sourceDefinitions.add(sourceDefinition); } @@ -306,6 +309,8 @@ public List listStandardDestinationDefinitions(fi for (final StandardDestinationDefinition destinationDefinition : persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) { + destinationDefinition.withProtocolVersion(AirbyteProtocolVersion + .getWithDefault(destinationDefinition.getSpec() != null ? destinationDefinition.getSpec().getProtocolVersion() : null).serialize()); if (!MoreBooleans.isTruthy(destinationDefinition.getTombstone()) || includeTombstone) { destinationDefinitions.add(destinationDefinition); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index f1b8ae80488b..70fae1c0c3f5 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -31,6 +31,7 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.StreamDescriptor; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; @@ -225,6 +226,42 @@ void testListStandardSourceDefinitionsHandlesTombstoneSourceDefinitions(final in assertEquals(allSourceDefinitions, returnedSourceDefinitionsWithTombstone); } + @Test + void testListDestinationDefinitionsWithVersion() throws JsonValidationException, IOException { + final List allSourceDefinitions = List.of( + new StandardDestinationDefinition(), + new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.0")), + // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. + new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), + new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification()) + ); + + when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) + .thenReturn(allSourceDefinitions); + + final List destinationDefinitions = configRepository.listStandardDestinationDefinitions(false); + final List protocolVersions = destinationDefinitions.stream().map(StandardDestinationDefinition::getProtocolVersion).toList(); + assertEquals(List.of("0.2.0", "0.3.0", "0.3.1", "0.2.0"), protocolVersions); + } + + @Test + void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOException { + final List allSourceDefinitions = List.of( + new StandardSourceDefinition(), + new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.0")), + // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. + new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), + new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification()) + ); + + when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) + .thenReturn(allSourceDefinitions); + + final List sourceDefinitions = configRepository.listStandardSourceDefinitions(false); + final List protocolVersions = sourceDefinitions.stream().map(StandardSourceDefinition::getProtocolVersion).toList(); + assertEquals(List.of("0.2.0", "0.3.0", "0.3.1", "0.2.0"), protocolVersions); + } + @Test void testDeleteSourceDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException { final StandardSourceDefinition sourceDefToDelete = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID()); From 28156682a47ddda6426d62fc482a29e0c170aaf3 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 13 Sep 2022 14:48:45 -0700 Subject: [PATCH 2/3] Format --- .../io/airbyte/config/persistence/ConfigRepositoryTest.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index 70fae1c0c3f5..265efd1f759a 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -233,8 +233,7 @@ void testListDestinationDefinitionsWithVersion() throws JsonValidationException, new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.0")), // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), - new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification()) - ); + new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification())); when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) .thenReturn(allSourceDefinitions); @@ -251,8 +250,7 @@ void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOEx new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.0")), // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), - new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification()) - ); + new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification())); when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) .thenReturn(allSourceDefinitions); From 957172a123532029061c3e99b8bfa8db45f20718 Mon Sep 17 00:00:00 2001 From: Jimmy Ma Date: Tue, 13 Sep 2022 14:55:56 -0700 Subject: [PATCH 3/3] PMD --- .../persistence/ConfigRepositoryTest.java | 23 ++++++++++++------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java index 265efd1f759a..b2fa1ea2ca37 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/ConfigRepositoryTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.when; import io.airbyte.commons.json.Jsons; +import io.airbyte.commons.version.AirbyteProtocolVersion; import io.airbyte.config.ConfigSchema; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; @@ -230,34 +231,40 @@ void testListStandardSourceDefinitionsHandlesTombstoneSourceDefinitions(final in void testListDestinationDefinitionsWithVersion() throws JsonValidationException, IOException { final List allSourceDefinitions = List.of( new StandardDestinationDefinition(), - new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.0")), + new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. - new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), - new StandardDestinationDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification())); + new StandardDestinationDefinition().withProtocolVersion("0.4.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.4.1")), + new StandardDestinationDefinition().withProtocolVersion("0.5.0").withSpec(new ConnectorSpecification())); when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class)) .thenReturn(allSourceDefinitions); final List destinationDefinitions = configRepository.listStandardDestinationDefinitions(false); final List protocolVersions = destinationDefinitions.stream().map(StandardDestinationDefinition::getProtocolVersion).toList(); - assertEquals(List.of("0.2.0", "0.3.0", "0.3.1", "0.2.0"), protocolVersions); + assertEquals( + List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.3.1", "0.4.1", + AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()), + protocolVersions); } @Test void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOException { final List allSourceDefinitions = List.of( new StandardSourceDefinition(), - new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.0")), + new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.6.0")), // We expect the protocol version to be in the ConnectorSpec, so we'll override regardless. - new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")), - new StandardSourceDefinition().withProtocolVersion("0.3.0").withSpec(new ConnectorSpecification())); + new StandardSourceDefinition().withProtocolVersion("0.7.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.7.1")), + new StandardSourceDefinition().withProtocolVersion("0.8.0").withSpec(new ConnectorSpecification())); when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class)) .thenReturn(allSourceDefinitions); final List sourceDefinitions = configRepository.listStandardSourceDefinitions(false); final List protocolVersions = sourceDefinitions.stream().map(StandardSourceDefinition::getProtocolVersion).toList(); - assertEquals(List.of("0.2.0", "0.3.0", "0.3.1", "0.2.0"), protocolVersions); + assertEquals( + List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.6.0", "0.7.1", + AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()), + protocolVersions); } @Test