Skip to content

Commit

Permalink
Validate protocol version on connector update (#18639)
Browse files Browse the repository at this point in the history
* Add helper class to check protocol version range

* Check ProtocolVersion when modifying a destination definition

* Check ProtocolVersion when modifying a source definition

* Format

* Add UnsupportedProtocolVersion exception

* Rewrite AirbyteProtocolVersionRange as a record

* Format

* Rename exception
  • Loading branch information
gosusnp authored Nov 2, 2022
1 parent 0937b4c commit 35ceb67
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.version;

public record AirbyteProtocolVersionRange(Version min, Version max) {

public boolean isSupported(final Version v) {
final Integer major = getMajor(v);
return getMajor(min) <= major && major <= getMajor(max);
}

private Integer getMajor(final Version v) {
return Integer.valueOf(v.getMajorVersion());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.version;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

import org.junit.jupiter.api.Test;

class AirbyteProtocolVersionRangeTest {

@Test
void checkRanges() {
final AirbyteProtocolVersionRange range = new AirbyteProtocolVersionRange(new Version("1.2.3"), new Version("4.3.2"));
assertTrue(range.isSupported(new Version("2.0.0")));
assertTrue(range.isSupported(new Version("1.2.3")));
assertTrue(range.isSupported(new Version("4.3.2")));

// We should only be requiring major to be within range
assertTrue(range.isSupported(new Version("1.0.0")));
assertTrue(range.isSupported(new Version("4.4.0")));

assertFalse(range.isSupported(new Version("0.2.3")));
assertFalse(range.isSupported(new Version("5.0.0")));
}

@Test
void checkRangeWithOnlyOneMajor() {
final AirbyteProtocolVersionRange range = new AirbyteProtocolVersionRange(new Version("2.0.0"), new Version("2.1.2"));

assertTrue(range.isSupported(new Version("2.0.0")));
assertTrue(range.isSupported(new Version("2.5.0")));

assertFalse(range.isSupported(new Version("1.0.0")));
assertFalse(range.isSupported(new Version("3.0.0")));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server.errors;

import io.airbyte.commons.version.Version;

public class UnsupportedProtocolVersionException extends KnownException {

public UnsupportedProtocolVersionException(final Version current, final Version minSupported, final Version maxSupported) {
this(current.serialize(), minSupported, maxSupported);
}

public UnsupportedProtocolVersionException(final String current, final Version minSupported, final Version maxSupported) {
super(String.format("Airbyte Protocol Version %s is not supported. (Must be within [%s:%s])",
current, minSupported.serialize(), maxSupported.serialize()));
}

@Override
public int getHttpCode() {
return 400;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand All @@ -34,6 +37,7 @@
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.server.errors.UnsupportedProtocolVersionException;
import io.airbyte.server.scheduler.SynchronousResponse;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import io.airbyte.server.services.AirbyteGithubStore;
Expand All @@ -56,6 +60,7 @@ public class DestinationDefinitionsHandler {
private final SynchronousSchedulerClient schedulerSynchronousClient;
private final AirbyteGithubStore githubStore;
private final DestinationHandler destinationHandler;
private final AirbyteProtocolVersionRange protocolVersionRange;

public DestinationDefinitionsHandler(final ConfigRepository configRepository,
final SynchronousSchedulerClient schedulerSynchronousClient,
Expand All @@ -74,6 +79,10 @@ public DestinationDefinitionsHandler(final ConfigRepository configRepository,
this.schedulerSynchronousClient = schedulerSynchronousClient;
this.githubStore = githubStore;
this.destinationHandler = destinationHandler;

// TODO inject protocol min and max once this handler is being converted to micronaut
final Configs configs = new EnvConfigs();
protocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
}

@VisibleForTesting
Expand Down Expand Up @@ -179,6 +188,10 @@ public DestinationDefinitionRead createPrivateDestinationDefinition(final Destin
final StandardDestinationDefinition destinationDefinition = destinationDefinitionFromCreate(destinationDefCreate)
.withPublic(false)
.withCustom(false);
if (!protocolVersionRange.isSupported(new Version(destinationDefinition.getProtocolVersion()))) {
throw new UnsupportedProtocolVersionException(destinationDefinition.getProtocolVersion(), protocolVersionRange.min(),
protocolVersionRange.max());
}
configRepository.writeStandardDestinationDefinition(destinationDefinition);

return buildDestinationDefinitionRead(destinationDefinition);
Expand All @@ -190,6 +203,10 @@ public DestinationDefinitionRead createCustomDestinationDefinition(final CustomD
customDestinationDefinitionCreate.getDestinationDefinition())
.withPublic(false)
.withCustom(true);
if (!protocolVersionRange.isSupported(new Version(destinationDefinition.getProtocolVersion()))) {
throw new UnsupportedProtocolVersionException(destinationDefinition.getProtocolVersion(), protocolVersionRange.min(),
protocolVersionRange.max());
}
configRepository.writeCustomDestinationDefinition(destinationDefinition, customDestinationDefinitionCreate.getWorkspaceId());

return buildDestinationDefinitionRead(destinationDefinition);
Expand Down Expand Up @@ -235,6 +252,9 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
: currentDestination.getResourceRequirements();

final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());
if (!protocolVersionRange.isSupported(airbyteProtocolVersion)) {
throw new UnsupportedProtocolVersionException(airbyteProtocolVersion, protocolVersionRange.min(), protocolVersionRange.max());
}

final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteProtocolVersionRange;
import io.airbyte.commons.version.Version;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.Configs;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
Expand All @@ -35,6 +38,7 @@
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.server.errors.UnsupportedProtocolVersionException;
import io.airbyte.server.scheduler.SynchronousResponse;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
import io.airbyte.server.services.AirbyteGithubStore;
Expand All @@ -57,6 +61,7 @@ public class SourceDefinitionsHandler {
private final AirbyteGithubStore githubStore;
private final SynchronousSchedulerClient schedulerSynchronousClient;
private final SourceHandler sourceHandler;
private final AirbyteProtocolVersionRange protocolVersionRange;

public SourceDefinitionsHandler(final ConfigRepository configRepository,
final SynchronousSchedulerClient schedulerSynchronousClient,
Expand All @@ -74,6 +79,10 @@ public SourceDefinitionsHandler(final ConfigRepository configRepository,
this.schedulerSynchronousClient = schedulerSynchronousClient;
this.githubStore = githubStore;
this.sourceHandler = sourceHandler;

// TODO inject protocol min and max once this handler is being converted to micronaut
final Configs configs = new EnvConfigs();
protocolVersionRange = new AirbyteProtocolVersionRange(configs.getAirbyteProtocolVersionMin(), configs.getAirbyteProtocolVersionMax());
}

@VisibleForTesting
Expand Down Expand Up @@ -185,6 +194,9 @@ public SourceDefinitionRead createPrivateSourceDefinition(final SourceDefinition
final StandardSourceDefinition sourceDefinition = sourceDefinitionFromCreate(sourceDefinitionCreate)
.withPublic(false)
.withCustom(false);
if (!protocolVersionRange.isSupported(new Version(sourceDefinition.getProtocolVersion()))) {
throw new UnsupportedProtocolVersionException(sourceDefinition.getProtocolVersion(), protocolVersionRange.min(), protocolVersionRange.max());
}
configRepository.writeStandardSourceDefinition(sourceDefinition);

return buildSourceDefinitionRead(sourceDefinition);
Expand All @@ -195,6 +207,9 @@ public SourceDefinitionRead createCustomSourceDefinition(final CustomSourceDefin
final StandardSourceDefinition sourceDefinition = sourceDefinitionFromCreate(customSourceDefinitionCreate.getSourceDefinition())
.withPublic(false)
.withCustom(true);
if (!protocolVersionRange.isSupported(new Version(sourceDefinition.getProtocolVersion()))) {
throw new UnsupportedProtocolVersionException(sourceDefinition.getProtocolVersion(), protocolVersionRange.min(), protocolVersionRange.max());
}
configRepository.writeCustomSourceDefinition(sourceDefinition, customSourceDefinitionCreate.getWorkspaceId());

return buildSourceDefinitionRead(sourceDefinition);
Expand Down Expand Up @@ -238,6 +253,9 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate
: currentSourceDefinition.getResourceRequirements();

final Version airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());
if (!protocolVersionRange.isSupported(airbyteProtocolVersion)) {
throw new UnsupportedProtocolVersionException(airbyteProtocolVersion, protocolVersionRange.min(), protocolVersionRange.max());
}

final StandardSourceDefinition newSource = new StandardSourceDefinition()
.withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -38,6 +39,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.errors.UnsupportedProtocolVersionException;
import io.airbyte.server.scheduler.SynchronousJobMetadata;
import io.airbyte.server.scheduler.SynchronousResponse;
import io.airbyte.server.scheduler.SynchronousSchedulerClient;
Expand Down Expand Up @@ -376,6 +378,39 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM));
}

@Test
@DisplayName("createDestinationDefinition should not create a destinationDefinition with unsupported protocol version")
void testCreateDestinationDefinitionShouldCheckProtocolVersion() throws URISyntaxException, IOException, JsonValidationException {
final String invalidProtocolVersion = "121.5.6";
final StandardDestinationDefinition destination = generateDestinationDefinition();
destination.getSpec().setProtocolVersion(invalidProtocolVersion);
final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag());

when(uuidSupplier.get()).thenReturn(destination.getDestinationDefinitionId());
when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>(
destination.getSpec(),
SynchronousJobMetadata.mock(ConfigType.GET_SPEC)));

final DestinationDefinitionCreate create = new DestinationDefinitionCreate()
.name(destination.getName())
.dockerRepository(destination.getDockerRepository())
.dockerImageTag(destination.getDockerImageTag())
.documentationUrl(new URI(destination.getDocumentationUrl()))
.icon(destination.getIcon())
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
._default(new io.airbyte.api.model.generated.ResourceRequirements()
.cpuRequest(destination.getResourceRequirements().getDefault().getCpuRequest()))
.jobSpecific(Collections.emptyList()));

assertThrows(UnsupportedProtocolVersionException.class, () -> destinationDefinitionsHandler.createPrivateDestinationDefinition(create));

verify(schedulerSynchronousClient).createGetSpecJob(imageName);
verify(configRepository, never()).writeStandardDestinationDefinition(destination
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
.withReleaseDate(null)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM));
}

@Test
@DisplayName("createCustomDestinationDefinition should correctly create a destinationDefinition")
void testCreateCustomDestinationDefinition() throws URISyntaxException, IOException, JsonValidationException {
Expand Down Expand Up @@ -429,6 +464,46 @@ void testCreateCustomDestinationDefinition() throws URISyntaxException, IOExcept
workspaceId);
}

@Test
@DisplayName("createCustomDestinationDefinition should not create a destinationDefinition with unsupported protocol range")
void testCreateCustomDestinationDefinitionWithInvalidProtocol() throws URISyntaxException, IOException, JsonValidationException {
final String invalidProtocol = "122.1.22";
final StandardDestinationDefinition destination = generateDestinationDefinition();
destination.getSpec().setProtocolVersion(invalidProtocol);
final String imageName = DockerUtils.getTaggedImageName(destination.getDockerRepository(), destination.getDockerImageTag());

when(uuidSupplier.get()).thenReturn(destination.getDestinationDefinitionId());
when(schedulerSynchronousClient.createGetSpecJob(imageName)).thenReturn(new SynchronousResponse<>(
destination.getSpec(),
SynchronousJobMetadata.mock(ConfigType.GET_SPEC)));

final DestinationDefinitionCreate create = new DestinationDefinitionCreate()
.name(destination.getName())
.dockerRepository(destination.getDockerRepository())
.dockerImageTag(destination.getDockerImageTag())
.documentationUrl(new URI(destination.getDocumentationUrl()))
.icon(destination.getIcon())
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
._default(new io.airbyte.api.model.generated.ResourceRequirements()
.cpuRequest(destination.getResourceRequirements().getDefault().getCpuRequest()))
.jobSpecific(Collections.emptyList()));

final CustomDestinationDefinitionCreate customCreate = new CustomDestinationDefinitionCreate()
.destinationDefinition(create)
.workspaceId(workspaceId);

assertThrows(UnsupportedProtocolVersionException.class, () -> destinationDefinitionsHandler.createCustomDestinationDefinition(customCreate));

verify(schedulerSynchronousClient).createGetSpecJob(imageName);
verify(configRepository, never()).writeCustomDestinationDefinition(
destination
.withProtocolVersion(invalidProtocol)
.withReleaseDate(null)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
.withCustom(true),
workspaceId);
}

@Test
@DisplayName("updateDestinationDefinition should correctly update a destinationDefinition")
void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonValidationException {
Expand Down Expand Up @@ -462,6 +537,38 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa
verify(configRepository).writeStandardDestinationDefinition(updatedDestination);
}

@Test
@DisplayName("updateDestinationDefinition should not update a destinationDefinition if protocol version is out of range")
void testOutOfProtocolRangeUpdateDestination() throws ConfigNotFoundException, IOException, JsonValidationException {
when(configRepository.getStandardDestinationDefinition(destinationDefinition.getDestinationDefinitionId())).thenReturn(destinationDefinition);
final DestinationDefinitionRead currentDestination = destinationDefinitionsHandler
.getDestinationDefinition(
new DestinationDefinitionIdRequestBody().destinationDefinitionId(destinationDefinition.getDestinationDefinitionId()));
final String currentTag = currentDestination.getDockerImageTag();
final String newDockerImageTag = "averydifferenttagforprotocolversion";
final String newProtocolVersion = "120.2.4";
assertNotEquals(newDockerImageTag, currentTag);
assertNotEquals(newProtocolVersion, currentDestination.getProtocolVersion());

final String newImageName = DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), newDockerImageTag);
final ConnectorSpecification newSpec = new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo2", "bar2")))
.withProtocolVersion(newProtocolVersion);
when(schedulerSynchronousClient.createGetSpecJob(newImageName)).thenReturn(new SynchronousResponse<>(
newSpec,
SynchronousJobMetadata.mock(ConfigType.GET_SPEC)));

final StandardDestinationDefinition updatedDestination =
Jsons.clone(destinationDefinition).withDockerImageTag(newDockerImageTag).withSpec(newSpec).withProtocolVersion(newProtocolVersion);

assertThrows(UnsupportedProtocolVersionException.class, () -> destinationDefinitionsHandler.updateDestinationDefinition(
new DestinationDefinitionUpdate().destinationDefinitionId(this.destinationDefinition.getDestinationDefinitionId())
.dockerImageTag(newDockerImageTag)));

verify(schedulerSynchronousClient).createGetSpecJob(newImageName);
verify(configRepository, never()).writeStandardDestinationDefinition(updatedDestination);
}

@Test
@DisplayName("deleteDestinationDefinition should correctly delete a sourceDefinition")
void testDeleteDestinationDefinition() throws ConfigNotFoundException, IOException, JsonValidationException {
Expand Down
Loading

0 comments on commit 35ceb67

Please sign in to comment.