Skip to content

Commit

Permalink
Update ConfigRepository to read protocol version (airbytehq#16670)
Browse files Browse the repository at this point in the history
When reading actor definitions, we need to copy the protocol version from the connector spec into the top level protocol version value of the standard definitions for it to be stored in the database.
  • Loading branch information
gosusnp authored and robbinhan committed Sep 29, 2022
1 parent c831c78 commit 6e7e9ed
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.hash.Hashing;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
Expand Down Expand Up @@ -206,6 +207,8 @@ public List<StandardSourceDefinition> listStandardSourceDefinitions(final boolea
final List<StandardSourceDefinition> sourceDefinitions = new ArrayList<>();
for (final StandardSourceDefinition sourceDefinition : persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION,
StandardSourceDefinition.class)) {
sourceDefinition.withProtocolVersion(AirbyteProtocolVersion
.getWithDefault(sourceDefinition.getSpec() != null ? sourceDefinition.getSpec().getProtocolVersion() : null).serialize());
if (!MoreBooleans.isTruthy(sourceDefinition.getTombstone()) || includeTombstone) {
sourceDefinitions.add(sourceDefinition);
}
Expand Down Expand Up @@ -306,6 +309,8 @@ public List<StandardDestinationDefinition> listStandardDestinationDefinitions(fi

for (final StandardDestinationDefinition destinationDefinition : persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION,
StandardDestinationDefinition.class)) {
destinationDefinition.withProtocolVersion(AirbyteProtocolVersion
.getWithDefault(destinationDefinition.getSpec() != null ? destinationDefinition.getSpec().getProtocolVersion() : null).serialize());
if (!MoreBooleans.isTruthy(destinationDefinition.getTombstone()) || includeTombstone) {
destinationDefinitions.add(destinationDefinition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand All @@ -31,6 +32,7 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -225,6 +227,46 @@ void testListStandardSourceDefinitionsHandlesTombstoneSourceDefinitions(final in
assertEquals(allSourceDefinitions, returnedSourceDefinitionsWithTombstone);
}

@Test
void testListDestinationDefinitionsWithVersion() throws JsonValidationException, IOException {
final List<StandardDestinationDefinition> allSourceDefinitions = List.of(
new StandardDestinationDefinition(),
new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")),
// We expect the protocol version to be in the ConnectorSpec, so we'll override regardless.
new StandardDestinationDefinition().withProtocolVersion("0.4.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.4.1")),
new StandardDestinationDefinition().withProtocolVersion("0.5.0").withSpec(new ConnectorSpecification()));

when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
.thenReturn(allSourceDefinitions);

final List<StandardDestinationDefinition> destinationDefinitions = configRepository.listStandardDestinationDefinitions(false);
final List<String> protocolVersions = destinationDefinitions.stream().map(StandardDestinationDefinition::getProtocolVersion).toList();
assertEquals(
List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.3.1", "0.4.1",
AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()),
protocolVersions);
}

@Test
void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOException {
final List<StandardSourceDefinition> allSourceDefinitions = List.of(
new StandardSourceDefinition(),
new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.6.0")),
// We expect the protocol version to be in the ConnectorSpec, so we'll override regardless.
new StandardSourceDefinition().withProtocolVersion("0.7.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.7.1")),
new StandardSourceDefinition().withProtocolVersion("0.8.0").withSpec(new ConnectorSpecification()));

when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(allSourceDefinitions);

final List<StandardSourceDefinition> sourceDefinitions = configRepository.listStandardSourceDefinitions(false);
final List<String> protocolVersions = sourceDefinitions.stream().map(StandardSourceDefinition::getProtocolVersion).toList();
assertEquals(
List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.6.0", "0.7.1",
AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()),
protocolVersions);
}

@Test
void testDeleteSourceDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardSourceDefinition sourceDefToDelete = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID());
Expand Down

0 comments on commit 6e7e9ed

Please sign in to comment.