diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteProtocolVersionRange.java b/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteProtocolVersionRange.java new file mode 100644 index 000000000000..8c7571541079 --- /dev/null +++ b/airbyte-commons/src/main/java/io/airbyte/commons/version/AirbyteProtocolVersionRange.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.version; + +public record AirbyteProtocolVersionRange(Version min, Version max) { + + public boolean isSupported(final Version v) { + final Integer major = getMajor(v); + return getMajor(min) <= major && major <= getMajor(max); + } + + private Integer getMajor(final Version v) { + return Integer.valueOf(v.getMajorVersion()); + } + +} diff --git a/airbyte-commons/src/test/java/io/airbyte/commons/version/AirbyteProtocolVersionRangeTest.java b/airbyte-commons/src/test/java/io/airbyte/commons/version/AirbyteProtocolVersionRangeTest.java new file mode 100644 index 000000000000..758760ab170e --- /dev/null +++ b/airbyte-commons/src/test/java/io/airbyte/commons/version/AirbyteProtocolVersionRangeTest.java @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.commons.version; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +class AirbyteProtocolVersionRangeTest { + + @Test + void checkRanges() { + final AirbyteProtocolVersionRange range = new AirbyteProtocolVersionRange(new Version("1.2.3"), new Version("4.3.2")); + assertTrue(range.isSupported(new Version("2.0.0"))); + assertTrue(range.isSupported(new Version("1.2.3"))); + assertTrue(range.isSupported(new Version("4.3.2"))); + + // We should only be requiring major to be within range + assertTrue(range.isSupported(new Version("1.0.0"))); + assertTrue(range.isSupported(new Version("4.4.0"))); + + assertFalse(range.isSupported(new Version("0.2.3"))); + assertFalse(range.isSupported(new Version("5.0.0"))); + } + + @Test + void checkRangeWithOnlyOneMajor() { + final AirbyteProtocolVersionRange range = new AirbyteProtocolVersionRange(new Version("2.0.0"), new Version("2.1.2")); + + assertTrue(range.isSupported(new Version("2.0.0"))); + assertTrue(range.isSupported(new Version("2.5.0"))); + + assertFalse(range.isSupported(new Version("1.0.0"))); + assertFalse(range.isSupported(new Version("3.0.0"))); + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/errors/UnsupportedProtocolVersionException.java b/airbyte-server/src/main/java/io/airbyte/server/errors/UnsupportedProtocolVersionException.java new file mode 100644 index 000000000000..5b83844a54c1 --- /dev/null +++ b/airbyte-server/src/main/java/io/airbyte/server/errors/UnsupportedProtocolVersionException.java @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.server.errors; + +import io.airbyte.commons.version.Version; + +public class UnsupportedProtocolVersionException extends KnownException { + + public UnsupportedProtocolVersionException(final Version current, final Version minSupported, final Version maxSupported) { + this(current.serialize(), minSupported, maxSupported); + } + + public UnsupportedProtocolVersionException(final String current, final Version minSupported, final Version maxSupported) { + super(String.format("Airbyte Protocol Version %s is not supported. (Must be within [%s:%s])", + current, minSupported.serialize(), maxSupported.serialize())); + } + + @Override + public int getHttpCode() { + return 400; + } + +} diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java index 32fab92ad6d7..156b478f0663 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/DestinationDefinitionsHandler.java @@ -24,8 +24,11 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreLists; import io.airbyte.commons.version.AirbyteProtocolVersion; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorDefinitionResourceRequirements; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.StandardDestinationDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -34,6 +37,7 @@ import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.IdNotFoundKnownException; import io.airbyte.server.errors.InternalServerKnownException; +import io.airbyte.server.errors.UnsupportedProtocolVersionException; import io.airbyte.server.scheduler.SynchronousResponse; import io.airbyte.server.scheduler.SynchronousSchedulerClient; import io.airbyte.server.services.AirbyteGithubStore; @@ -56,6 +60,7 @@ public class DestinationDefinitionsHandler { private final SynchronousSchedulerClient schedulerSynchronousClient; private final AirbyteGithubStore githubStore; private final DestinationHandler destinationHandler; + private final AirbyteProtocolVersionRange protocolVersionRange; public DestinationDefinitionsHandler(final ConfigRepository configRepository, final SynchronousSchedulerClient schedulerSynchronousClient, @@ -74,6 +79,10 @@ public DestinationDefinitionsHandler(final ConfigRepository configRepository, this.schedulerSynchronousClient = schedulerSynchronousClient; this.githubStore = githubStore; this.destinationHandler = destinationHandler; + + // TODO inject protocol min and max once this handler is being converted to micronaut + final Configs configs = new EnvConfigs(); + protocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax()); } @VisibleForTesting @@ -179,6 +188,10 @@ public DestinationDefinitionRead createPrivateDestinationDefinition(final Destin final StandardDestinationDefinition destinationDefinition = destinationDefinitionFromCreate(destinationDefCreate) .withPublic(false) .withCustom(false); + if (!protocolVersionRange.isSupported(new Version(destinationDefinition.getProtocolVersion()))) { + throw new UnsupportedProtocolVersionException(destinationDefinition.getProtocolVersion(), protocolVersionRange.min(), + protocolVersionRange.max()); + } configRepository.writeStandardDestinationDefinition(destinationDefinition); return buildDestinationDefinitionRead(destinationDefinition); @@ -190,6 +203,10 @@ public DestinationDefinitionRead createCustomDestinationDefinition(final CustomD customDestinationDefinitionCreate.getDestinationDefinition()) .withPublic(false) .withCustom(true); + if (!protocolVersionRange.isSupported(new Version(destinationDefinition.getProtocolVersion()))) { + throw new UnsupportedProtocolVersionException(destinationDefinition.getProtocolVersion(), protocolVersionRange.min(), + protocolVersionRange.max()); + } configRepository.writeCustomDestinationDefinition(destinationDefinition, customDestinationDefinitionCreate.getWorkspaceId()); return buildDestinationDefinitionRead(destinationDefinition); @@ -235,6 +252,9 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe : currentDestination.getResourceRequirements(); final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion()); + if (!protocolVersionRange.isSupported(airbyteProtocolVersion)) { + throw new UnsupportedProtocolVersionException(airbyteProtocolVersion, protocolVersionRange.min(), protocolVersionRange.max()); + } final StandardDestinationDefinition newDestination = new StandardDestinationDefinition() .withDestinationDefinitionId(currentDestination.getDestinationDefinitionId()) diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java index 2b092bf52542..b508be1ed641 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SourceDefinitionsHandler.java @@ -25,8 +25,11 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.commons.util.MoreLists; import io.airbyte.commons.version.AirbyteProtocolVersion; +import io.airbyte.commons.version.AirbyteProtocolVersionRange; import io.airbyte.commons.version.Version; import io.airbyte.config.ActorDefinitionResourceRequirements; +import io.airbyte.config.Configs; +import io.airbyte.config.EnvConfigs; import io.airbyte.config.StandardSourceDefinition; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; @@ -35,6 +38,7 @@ import io.airbyte.server.converters.SpecFetcher; import io.airbyte.server.errors.IdNotFoundKnownException; import io.airbyte.server.errors.InternalServerKnownException; +import io.airbyte.server.errors.UnsupportedProtocolVersionException; import io.airbyte.server.scheduler.SynchronousResponse; import io.airbyte.server.scheduler.SynchronousSchedulerClient; import io.airbyte.server.services.AirbyteGithubStore; @@ -57,6 +61,7 @@ public class SourceDefinitionsHandler { private final AirbyteGithubStore githubStore; private final SynchronousSchedulerClient schedulerSynchronousClient; private final SourceHandler sourceHandler; + private final AirbyteProtocolVersionRange protocolVersionRange; public SourceDefinitionsHandler(final ConfigRepository configRepository, final SynchronousSchedulerClient schedulerSynchronousClient, @@ -74,6 +79,10 @@ public SourceDefinitionsHandler(final ConfigRepository configRepository, this.schedulerSynchronousClient = schedulerSynchronousClient; this.githubStore = githubStore; this.sourceHandler = sourceHandler; + + // TODO inject protocol min and max once this handler is being converted to micronaut + final Configs configs = new EnvConfigs(); + protocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax()); } @VisibleForTesting @@ -185,6 +194,9 @@ public SourceDefinitionRead createPrivateSourceDefinition(final SourceDefinition final StandardSourceDefinition sourceDefinition = sourceDefinitionFromCreate(sourceDefinitionCreate) .withPublic(false) .withCustom(false); + if (!protocolVersionRange.isSupported(new Version(sourceDefinition.getProtocolVersion()))) { + throw new UnsupportedProtocolVersionException(sourceDefinition.getProtocolVersion(), protocolVersionRange.min(), protocolVersionRange.max()); + } configRepository.writeStandardSourceDefinition(sourceDefinition); return buildSourceDefinitionRead(sourceDefinition); @@ -195,6 +207,9 @@ public SourceDefinitionRead createCustomSourceDefinition(final CustomSourceDefin final StandardSourceDefinition sourceDefinition = sourceDefinitionFromCreate(customSourceDefinitionCreate.getSourceDefinition()) .withPublic(false) .withCustom(true); + if (!protocolVersionRange.isSupported(new Version(sourceDefinition.getProtocolVersion()))) { + throw new UnsupportedProtocolVersionException(sourceDefinition.getProtocolVersion(), protocolVersionRange.min(), protocolVersionRange.max()); + } configRepository.writeCustomSourceDefinition(sourceDefinition, customSourceDefinitionCreate.getWorkspaceId()); return buildSourceDefinitionRead(sourceDefinition); @@ -238,6 +253,9 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate : currentSourceDefinition.getResourceRequirements(); final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion()); + if (!protocolVersionRange.isSupported(airbyteProtocolVersion)) { + throw new UnsupportedProtocolVersionException(airbyteProtocolVersion, protocolVersionRange.min(), protocolVersionRange.max()); + } final StandardSourceDefinition newSource = new StandardSourceDefinition() .withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId()) diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java index 2432cd6f5973..afbb40d1f8c5 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/DestinationDefinitionsHandlerTest.java @@ -9,6 +9,7 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -38,6 +39,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.server.errors.IdNotFoundKnownException; +import io.airbyte.server.errors.UnsupportedProtocolVersionException; import io.airbyte.server.scheduler.SynchronousJobMetadata; import io.airbyte.server.scheduler.SynchronousResponse; import io.airbyte.server.scheduler.SynchronousSchedulerClient; @@ -376,6 +378,39 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)); } + @Test + @DisplayName("createDestinationDefinition should not create a destinationDefinition with unsupported protocol version") + void testCreateDestinationDefinitionShouldCheckProtocolVersion() throws URISyntaxException, IOException, JsonValidationException { + final String invalidProtocolVersion = "121.5.6"; + final StandardDestinationDefinition destination = generateDestinationDefinition(); + destination.getSpec().setProtocolVersion(invalidProtocolVersion); + final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag()); + + when(uuidSupplier.get()).thenReturn(destination.getDestinationDefinitionId()); + when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>( + destination.getSpec(), + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final DestinationDefinitionCreate create = new DestinationDefinitionCreate() + .name(destination.getName()) + .dockerRepository(destination.getDockerRepository()) + .dockerImageTag(destination.getDockerImageTag()) + .documentationUrl(new URI(destination.getDocumentationUrl())) + .icon(destination.getIcon()) + .resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.generated.ResourceRequirements() + .cpuRequest(destination.getResourceRequirements().getDefault().getCpuRequest())) + .jobSpecific(Collections.emptyList())); + + assertThrows(UnsupportedProtocolVersionException.class, () -> destinationDefinitionsHandler.createPrivateDestinationDefinition(create)); + + verify(schedulerSynchronousClient).createGetSpecJob(imageName); + verify(configRepository, never()).writeStandardDestinationDefinition(destination + .withProtocolVersion(DEFAULT_PROTOCOL_VERSION) + .withReleaseDate(null) + .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)); + } + @Test @DisplayName("createCustomDestinationDefinition should correctly create a destinationDefinition") void testCreateCustomDestinationDefinition() throws URISyntaxException, IOException, JsonValidationException { @@ -429,6 +464,46 @@ void testCreateCustomDestinationDefinition() throws URISyntaxException, IOExcept workspaceId); } + @Test + @DisplayName("createCustomDestinationDefinition should not create a destinationDefinition with unsupported protocol range") + void testCreateCustomDestinationDefinitionWithInvalidProtocol() throws URISyntaxException, IOException, JsonValidationException { + final String invalidProtocol = "122.1.22"; + final StandardDestinationDefinition destination = generateDestinationDefinition(); + destination.getSpec().setProtocolVersion(invalidProtocol); + final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag()); + + when(uuidSupplier.get()).thenReturn(destination.getDestinationDefinitionId()); + when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>( + destination.getSpec(), + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final DestinationDefinitionCreate create = new DestinationDefinitionCreate() + .name(destination.getName()) + .dockerRepository(destination.getDockerRepository()) + .dockerImageTag(destination.getDockerImageTag()) + .documentationUrl(new URI(destination.getDocumentationUrl())) + .icon(destination.getIcon()) + .resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.generated.ResourceRequirements() + .cpuRequest(destination.getResourceRequirements().getDefault().getCpuRequest())) + .jobSpecific(Collections.emptyList())); + + final CustomDestinationDefinitionCreate customCreate = new CustomDestinationDefinitionCreate() + .destinationDefinition(create) + .workspaceId(workspaceId); + + assertThrows(UnsupportedProtocolVersionException.class, () -> destinationDefinitionsHandler.createCustomDestinationDefinition(customCreate)); + + verify(schedulerSynchronousClient).createGetSpecJob(imageName); + verify(configRepository, never()).writeCustomDestinationDefinition( + destination + .withProtocolVersion(invalidProtocol) + .withReleaseDate(null) + .withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM) + .withCustom(true), + workspaceId); + } + @Test @DisplayName("updateDestinationDefinition should correctly update a destinationDefinition") void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonValidationException { @@ -462,6 +537,38 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa verify(configRepository).writeStandardDestinationDefinition(updatedDestination); } + @Test + @DisplayName("updateDestinationDefinition should not update a destinationDefinition if protocol version is out of range") + void testOutOfProtocolRangeUpdateDestination() throws ConfigNotFoundException, IOException, JsonValidationException { + when(configRepository.getStandardDestinationDefinition(destinationDefinition.getDestinationDefinitionId())).thenReturn(destinationDefinition); + final DestinationDefinitionRead currentDestination = destinationDefinitionsHandler + .getDestinationDefinition( + new DestinationDefinitionIdRequestBody().destinationDefinitionId(destinationDefinition.getDestinationDefinitionId())); + final String currentTag = currentDestination.getDockerImageTag(); + final String newDockerImageTag = "averydifferenttagforprotocolversion"; + final String newProtocolVersion = "120.2.4"; + assertNotEquals(newDockerImageTag, currentTag); + assertNotEquals(newProtocolVersion, currentDestination.getProtocolVersion()); + + final String newImageName = DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), newDockerImageTag); + final ConnectorSpecification newSpec = new ConnectorSpecification() + .withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo2", "bar2"))) + .withProtocolVersion(newProtocolVersion); + when(schedulerSynchronousClient.createGetSpecJob(newImageName)).thenReturn(new SynchronousResponse<>( + newSpec, + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final StandardDestinationDefinition updatedDestination = + Jsons.clone(destinationDefinition).withDockerImageTag(newDockerImageTag).withSpec(newSpec).withProtocolVersion(newProtocolVersion); + + assertThrows(UnsupportedProtocolVersionException.class, () -> destinationDefinitionsHandler.updateDestinationDefinition( + new DestinationDefinitionUpdate().destinationDefinitionId(this.destinationDefinition.getDestinationDefinitionId()) + .dockerImageTag(newDockerImageTag))); + + verify(schedulerSynchronousClient).createGetSpecJob(newImageName); + verify(configRepository, never()).writeStandardDestinationDefinition(updatedDestination); + } + @Test @DisplayName("deleteDestinationDefinition should correctly delete a sourceDefinition") void testDeleteDestinationDefinition() throws ConfigNotFoundException, IOException, JsonValidationException { diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java index 1f4167817c92..07acc870461a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SourceDefinitionsHandlerTest.java @@ -10,6 +10,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -39,6 +40,7 @@ import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.server.errors.IdNotFoundKnownException; +import io.airbyte.server.errors.UnsupportedProtocolVersionException; import io.airbyte.server.scheduler.SynchronousJobMetadata; import io.airbyte.server.scheduler.SynchronousResponse; import io.airbyte.server.scheduler.SynchronousSchedulerClient; @@ -365,6 +367,41 @@ void testCreateSourceDefinition() throws URISyntaxException, IOException, JsonVa .withProtocolVersion(DEFAULT_PROTOCOL_VERSION)); } + @Test + @DisplayName("createSourceDefinition should not create a sourceDefinition with an unsupported protocol version") + void testCreateSourceDefinitionWithInvalidProtocol() throws URISyntaxException, IOException, JsonValidationException { + final String invalidProtocol = "131.1.2"; + final StandardSourceDefinition sourceDefinition = generateSourceDefinition(); + sourceDefinition.getSpec().setProtocolVersion(invalidProtocol); + final String imageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag()); + + when(uuidSupplier.get()).thenReturn(sourceDefinition.getSourceDefinitionId()); + when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>( + sourceDefinition.getSpec(), + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final SourceDefinitionCreate create = new SourceDefinitionCreate() + .name(sourceDefinition.getName()) + .dockerRepository(sourceDefinition.getDockerRepository()) + .dockerImageTag(sourceDefinition.getDockerImageTag()) + .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) + .icon(sourceDefinition.getIcon()) + .resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.generated.ResourceRequirements() + .cpuRequest(sourceDefinition.getResourceRequirements().getDefault().getCpuRequest())) + .jobSpecific(Collections.emptyList())); + + assertThrows(UnsupportedProtocolVersionException.class, () -> sourceDefinitionsHandler.createPrivateSourceDefinition(create)); + + verify(schedulerSynchronousClient).createGetSpecJob(imageName); + verify(configRepository, never()) + .writeStandardSourceDefinition( + sourceDefinition + .withReleaseDate(null) + .withReleaseStage(StandardSourceDefinition.ReleaseStage.CUSTOM) + .withProtocolVersion(DEFAULT_PROTOCOL_VERSION)); + } + @Test @DisplayName("createCustomSourceDefinition should correctly create a sourceDefinition") void testCreateCustomSourceDefinition() throws URISyntaxException, IOException, JsonValidationException { @@ -418,6 +455,46 @@ void testCreateCustomSourceDefinition() throws URISyntaxException, IOException, workspaceId); } + @Test + @DisplayName("createCustomSourceDefinition should not create a sourceDefinition with unspported protocol version") + void testCreateCustomSourceDefinitionWithInvalidProtocol() throws URISyntaxException, IOException, JsonValidationException { + final String invalidVersion = "130.0.0"; + final StandardSourceDefinition sourceDefinition = generateSourceDefinition(); + sourceDefinition.getSpec().setProtocolVersion(invalidVersion); + final String imageName = DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag()); + + when(uuidSupplier.get()).thenReturn(sourceDefinition.getSourceDefinitionId()); + when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>( + sourceDefinition.getSpec(), + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final SourceDefinitionCreate create = new SourceDefinitionCreate() + .name(sourceDefinition.getName()) + .dockerRepository(sourceDefinition.getDockerRepository()) + .dockerImageTag(sourceDefinition.getDockerImageTag()) + .documentationUrl(new URI(sourceDefinition.getDocumentationUrl())) + .icon(sourceDefinition.getIcon()) + .resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements() + ._default(new io.airbyte.api.model.generated.ResourceRequirements() + .cpuRequest(sourceDefinition.getResourceRequirements().getDefault().getCpuRequest())) + .jobSpecific(Collections.emptyList())); + + final CustomSourceDefinitionCreate customCreate = new CustomSourceDefinitionCreate() + .sourceDefinition(create) + .workspaceId(workspaceId); + + assertThrows(UnsupportedProtocolVersionException.class, () -> sourceDefinitionsHandler.createCustomSourceDefinition(customCreate)); + + verify(schedulerSynchronousClient).createGetSpecJob(imageName); + verify(configRepository, never()).writeCustomSourceDefinition( + sourceDefinition + .withReleaseDate(null) + .withReleaseStage(StandardSourceDefinition.ReleaseStage.CUSTOM) + .withProtocolVersion(invalidVersion) + .withCustom(true), + workspaceId); + } + @Test @DisplayName("updateSourceDefinition should correctly update a sourceDefinition") void testUpdateSourceDefinition() throws ConfigNotFoundException, IOException, JsonValidationException, URISyntaxException { @@ -449,6 +526,36 @@ void testUpdateSourceDefinition() throws ConfigNotFoundException, IOException, J verify(configRepository).writeStandardSourceDefinition(updatedSource); } + @Test + @DisplayName("updateSourceDefinition should not update a sourceDefinition with an invalid protocol version") + void testUpdateSourceDefinitionWithInvalidProtocol() throws ConfigNotFoundException, IOException, JsonValidationException, URISyntaxException { + when(configRepository.getStandardSourceDefinition(sourceDefinition.getSourceDefinitionId())).thenReturn(sourceDefinition); + final String newDockerImageTag = "averydifferenttag"; + final String newProtocolVersion = "132.2.1"; + final SourceDefinitionRead sourceDefinition = sourceDefinitionsHandler + .getSourceDefinition(new SourceDefinitionIdRequestBody().sourceDefinitionId(this.sourceDefinition.getSourceDefinitionId())); + final String currentTag = sourceDefinition.getDockerImageTag(); + assertNotEquals(newDockerImageTag, currentTag); + + final String newImageName = DockerUtils.getTaggedImageName(this.sourceDefinition.getDockerRepository(), newDockerImageTag); + final ConnectorSpecification newSpec = new ConnectorSpecification() + .withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo2", "bar2"))) + .withProtocolVersion(newProtocolVersion); + when(schedulerSynchronousClient.createGetSpecJob(newImageName)).thenReturn(new SynchronousResponse<>( + newSpec, + SynchronousJobMetadata.mock(ConfigType.GET_SPEC))); + + final StandardSourceDefinition updatedSource = Jsons.clone(this.sourceDefinition) + .withDockerImageTag(newDockerImageTag).withSpec(newSpec).withProtocolVersion(newProtocolVersion); + + assertThrows(UnsupportedProtocolVersionException.class, () -> sourceDefinitionsHandler + .updateSourceDefinition( + new SourceDefinitionUpdate().sourceDefinitionId(this.sourceDefinition.getSourceDefinitionId()).dockerImageTag(newDockerImageTag))); + + verify(schedulerSynchronousClient).createGetSpecJob(newImageName); + verify(configRepository, never()).writeStandardSourceDefinition(updatedSource); + } + @Test @DisplayName("deleteSourceDefinition should correctly delete a sourceDefinition") void testDeleteSourceDefinition() throws ConfigNotFoundException, IOException, JsonValidationException {