Skip to content

Commit

Permalink
Store protocol version from spec (#16416)
Browse files Browse the repository at this point in the history
* Update protocol version from actor defs API operations

* Implement default airbyte protocol version support

* Add version parsing

* Add acceptance tests

* Fix Acceptance Tests

* format

* Make test package private
  • Loading branch information
gosusnp authored Sep 9, 2022
1 parent 80397ad commit 9ad847b
Show file tree
Hide file tree
Showing 8 changed files with 234 additions and 7 deletions.
6 changes: 6 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2509,6 +2509,9 @@ components:
format: uri
icon:
type: string
protocolVersion:
description: The Airbyte Protocol version supported by the connector
type: string
releaseStage:
$ref: "#/components/schemas/ReleaseStage"
releaseDate:
Expand Down Expand Up @@ -2869,6 +2872,9 @@ components:
format: uri
icon:
type: string
protocolVersion:
description: The Airbyte Protocol version supported by the connector
type: string
releaseStage:
$ref: "#/components/schemas/ReleaseStage"
releaseDate:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.commons.version;

public class AirbyteProtocolVersion {

public final static AirbyteVersion DEFAULT_AIRBYTE_PROTOCOL_VERSION = new AirbyteVersion("0.2.0");

public static AirbyteVersion getWithDefault(final String version) {
if (version == null || version.isEmpty() || version.isBlank()) {
return DEFAULT_AIRBYTE_PROTOCOL_VERSION;
} else {
return new AirbyteVersion(version);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -84,6 +86,7 @@ static DestinationDefinitionRead buildDestinationDefinitionRead(final StandardDe
.dockerImageTag(standardDestinationDefinition.getDockerImageTag())
.documentationUrl(new URI(standardDestinationDefinition.getDocumentationUrl()))
.icon(loadIcon(standardDestinationDefinition.getIcon()))
.protocolVersion(standardDestinationDefinition.getProtocolVersion())
.releaseStage(getReleaseStage(standardDestinationDefinition))
.releaseDate(getReleaseDate(standardDestinationDefinition))
.resourceRequirements(ApiPojoConverters.actorDefResourceReqsToApi(standardDestinationDefinition.getResourceRequirements()));
Expand Down Expand Up @@ -197,6 +200,8 @@ private StandardDestinationDefinition destinationDefinitionFromCreate(final Dest
destinationDefCreate.getDockerRepository(),
destinationDefCreate.getDockerImageTag());

final AirbyteVersion airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());

final UUID id = uuidSupplier.get();
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDestinationDefinitionId(id)
Expand All @@ -206,6 +211,7 @@ private StandardDestinationDefinition destinationDefinitionFromCreate(final Dest
.withName(destinationDefCreate.getName())
.withIcon(destinationDefCreate.getIcon())
.withSpec(spec)
.withProtocolVersion(airbyteProtocolVersion.serialize())
.withTombstone(false)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
.withResourceRequirements(ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefCreate.getResourceRequirements()));
Expand All @@ -228,6 +234,8 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
? ApiPojoConverters.actorDefResourceReqsToInternal(destinationDefinitionUpdate.getResourceRequirements())
: currentDestination.getResourceRequirements();

final AirbyteVersion airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());

final StandardDestinationDefinition newDestination = new StandardDestinationDefinition()
.withDestinationDefinitionId(currentDestination.getDestinationDefinitionId())
.withDockerImageTag(destinationDefinitionUpdate.getDockerImageTag())
Expand All @@ -236,6 +244,7 @@ public DestinationDefinitionRead updateDestinationDefinition(final DestinationDe
.withDocumentationUrl(currentDestination.getDocumentationUrl())
.withIcon(currentDestination.getIcon())
.withSpec(spec)
.withProtocolVersion(airbyteProtocolVersion.serialize())
.withTombstone(currentDestination.getTombstone())
.withPublic(currentDestination.getPublic())
.withCustom(currentDestination.getCustom())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.util.MoreLists;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -85,6 +87,7 @@ static SourceDefinitionRead buildSourceDefinitionRead(final StandardSourceDefini
.dockerImageTag(standardSourceDefinition.getDockerImageTag())
.documentationUrl(new URI(standardSourceDefinition.getDocumentationUrl()))
.icon(loadIcon(standardSourceDefinition.getIcon()))
.protocolVersion(standardSourceDefinition.getProtocolVersion())
.releaseStage(getReleaseStage(standardSourceDefinition))
.releaseDate(getReleaseDate(standardSourceDefinition))
.resourceRequirements(ApiPojoConverters.actorDefResourceReqsToApi(standardSourceDefinition.getResourceRequirements()));
Expand Down Expand Up @@ -201,6 +204,8 @@ private StandardSourceDefinition sourceDefinitionFromCreate(final SourceDefiniti
throws IOException {
final ConnectorSpecification spec = getSpecForImage(sourceDefinitionCreate.getDockerRepository(), sourceDefinitionCreate.getDockerImageTag());

final AirbyteVersion airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());

final UUID id = uuidSupplier.get();
return new StandardSourceDefinition()
.withSourceDefinitionId(id)
Expand All @@ -210,6 +215,7 @@ private StandardSourceDefinition sourceDefinitionFromCreate(final SourceDefiniti
.withName(sourceDefinitionCreate.getName())
.withIcon(sourceDefinitionCreate.getIcon())
.withSpec(spec)
.withProtocolVersion(airbyteProtocolVersion.serialize())
.withTombstone(false)
.withReleaseStage(StandardSourceDefinition.ReleaseStage.CUSTOM)
.withResourceRequirements(ApiPojoConverters.actorDefResourceReqsToInternal(sourceDefinitionCreate.getResourceRequirements()));
Expand All @@ -231,6 +237,8 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate
? ApiPojoConverters.actorDefResourceReqsToInternal(sourceDefinitionUpdate.getResourceRequirements())
: currentSourceDefinition.getResourceRequirements();

final AirbyteVersion airbyteProtocolVersion = AirbyteProtocolVersion.getWithDefault(spec.getProtocolVersion());

final StandardSourceDefinition newSource = new StandardSourceDefinition()
.withSourceDefinitionId(currentSourceDefinition.getSourceDefinitionId())
.withDockerImageTag(sourceDefinitionUpdate.getDockerImageTag())
Expand All @@ -239,6 +247,7 @@ public SourceDefinitionRead updateSourceDefinition(final SourceDefinitionUpdate
.withName(currentSourceDefinition.getName())
.withIcon(currentSourceDefinition.getIcon())
.withSpec(spec)
.withProtocolVersion(airbyteProtocolVersion.serialize())
.withTombstone(currentSourceDefinition.getTombstone())
.withPublic(currentSourceDefinition.getPublic())
.withCustom(currentSourceDefinition.getCustom())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
class DestinationDefinitionsHandlerTest {

private static final String TODAY_DATE_STRING = LocalDate.now().toString();
private static final String DEFAULT_PROTOCOL_VERSION = "0.2.0";

private ConfigRepository configRepository;
private StandardDestinationDefinition destinationDefinition;
Expand Down Expand Up @@ -100,6 +101,7 @@ private StandardDestinationDefinition generateDestinationDefinition() {
.withDocumentationUrl("https://hulu.com")
.withIcon("http.svg")
.withSpec(spec)
.withProtocolVersion("0.2.2")
.withTombstone(false)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.ALPHA)
.withReleaseDate(TODAY_DATE_STRING)
Expand All @@ -120,6 +122,7 @@ void testListDestinations() throws JsonValidationException, IOException, URISynt
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand All @@ -134,6 +137,7 @@ void testListDestinations() throws JsonValidationException, IOException, URISynt
.dockerImageTag(destination2.getDockerImageTag())
.documentationUrl(new URI(destination2.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destination2.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand Down Expand Up @@ -163,6 +167,7 @@ void testListDestinationDefinitionsForWorkspace() throws IOException, URISyntaxE
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand All @@ -177,6 +182,7 @@ void testListDestinationDefinitionsForWorkspace() throws IOException, URISyntaxE
.dockerImageTag(destination2.getDockerImageTag())
.documentationUrl(new URI(destination2.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destination2.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand Down Expand Up @@ -209,6 +215,7 @@ void testListPrivateDestinationDefinitions() throws IOException, URISyntaxExcept
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand All @@ -223,6 +230,7 @@ void testListPrivateDestinationDefinitions() throws IOException, URISyntaxExcept
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand Down Expand Up @@ -258,6 +266,7 @@ void testGetDestination() throws JsonValidationException, ConfigNotFoundExceptio
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand Down Expand Up @@ -303,6 +312,7 @@ void testGetDefinitionWithGrantForWorkspace() throws JsonValidationException, Co
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand Down Expand Up @@ -349,6 +359,7 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J
.documentationUrl(new URI(destination.getDocumentationUrl()))
.destinationDefinitionId(destination.getDestinationDefinitionId())
.icon(DestinationDefinitionsHandler.loadIcon(destination.getIcon()))
.protocolVersion(DEFAULT_PROTOCOL_VERSION)
.releaseStage(ReleaseStage.CUSTOM)
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
._default(new io.airbyte.api.model.generated.ResourceRequirements()
Expand All @@ -360,6 +371,7 @@ void testCreateDestinationDefinition() throws URISyntaxException, IOException, J
assertEquals(expectedRead, actualRead);
verify(schedulerSynchronousClient).createGetSpecJob(imageName);
verify(configRepository).writeStandardDestinationDefinition(destination
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
.withReleaseDate(null)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM));
}
Expand Down Expand Up @@ -397,6 +409,7 @@ void testCreateCustomDestinationDefinition() throws URISyntaxException, IOExcept
.documentationUrl(new URI(destination.getDocumentationUrl()))
.destinationDefinitionId(destination.getDestinationDefinitionId())
.icon(DestinationDefinitionsHandler.loadIcon(destination.getIcon()))
.protocolVersion(DEFAULT_PROTOCOL_VERSION)
.releaseStage(ReleaseStage.CUSTOM)
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
._default(new io.airbyte.api.model.generated.ResourceRequirements()
Expand All @@ -409,6 +422,7 @@ void testCreateCustomDestinationDefinition() throws URISyntaxException, IOExcept
verify(schedulerSynchronousClient).createGetSpecJob(imageName);
verify(configRepository).writeCustomDestinationDefinition(
destination
.withProtocolVersion(DEFAULT_PROTOCOL_VERSION)
.withReleaseDate(null)
.withReleaseStage(StandardDestinationDefinition.ReleaseStage.CUSTOM)
.withCustom(true),
Expand All @@ -424,17 +438,20 @@ void testUpdateDestination() throws ConfigNotFoundException, IOException, JsonVa
new DestinationDefinitionIdRequestBody().destinationDefinitionId(destinationDefinition.getDestinationDefinitionId()));
final String currentTag = currentDestination.getDockerImageTag();
final String newDockerImageTag = "averydifferenttag";
final String newProtocolVersion = "0.2.4";
assertNotEquals(newDockerImageTag, currentTag);
assertNotEquals(newProtocolVersion, currentDestination.getProtocolVersion());

final String newImageName = DockerUtils.getTaggedImageName(destinationDefinition.getDockerRepository(), newDockerImageTag);
final ConnectorSpecification newSpec = new ConnectorSpecification().withConnectionSpecification(
Jsons.jsonNode(ImmutableMap.of("foo2", "bar2")));
final ConnectorSpecification newSpec = new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo2", "bar2")))
.withProtocolVersion(newProtocolVersion);
when(schedulerSynchronousClient.createGetSpecJob(newImageName)).thenReturn(new SynchronousResponse<>(
newSpec,
SynchronousJobMetadata.mock(ConfigType.GET_SPEC)));

final StandardDestinationDefinition updatedDestination =
Jsons.clone(destinationDefinition).withDockerImageTag(newDockerImageTag).withSpec(newSpec);
Jsons.clone(destinationDefinition).withDockerImageTag(newDockerImageTag).withSpec(newSpec).withProtocolVersion(newProtocolVersion);

final DestinationDefinitionRead destinationRead = destinationDefinitionsHandler.updateDestinationDefinition(
new DestinationDefinitionUpdate().destinationDefinitionId(this.destinationDefinition.getDestinationDefinitionId())
Expand Down Expand Up @@ -479,6 +496,7 @@ void testGrantDestinationDefinitionToWorkspace() throws JsonValidationException,
.dockerImageTag(destinationDefinition.getDockerImageTag())
.documentationUrl(new URI(destinationDefinition.getDocumentationUrl()))
.icon(DestinationDefinitionsHandler.loadIcon(destinationDefinition.getIcon()))
.protocolVersion(destinationDefinition.getProtocolVersion())
.releaseStage(ReleaseStage.fromValue(destinationDefinition.getReleaseStage().value()))
.releaseDate(LocalDate.parse(destinationDefinition.getReleaseDate()))
.resourceRequirements(new io.airbyte.api.model.generated.ActorDefinitionResourceRequirements()
Expand Down
Loading

0 comments on commit 9ad847b

Please sign in to comment.