Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update ConfigRepository to read protocol version #16670

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.hash.Hashing;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.MoreBooleans;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.config.ActorCatalog;
import io.airbyte.config.AirbyteConfig;
import io.airbyte.config.ConfigSchema;
Expand Down Expand Up @@ -206,6 +207,8 @@ public List<StandardSourceDefinition> listStandardSourceDefinitions(final boolea
final List<StandardSourceDefinition> sourceDefinitions = new ArrayList<>();
for (final StandardSourceDefinition sourceDefinition : persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION,
StandardSourceDefinition.class)) {
sourceDefinition.withProtocolVersion(AirbyteProtocolVersion
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this be change to a common method or something like:

return persistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION,
StandardSourceDefinition.class)
.map(extract the protocol version)
.filter(filter the tombstone)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit, we should use set since we don't use the returned value of withProtocolVersion

.getWithDefault(sourceDefinition.getSpec() != null ? sourceDefinition.getSpec().getProtocolVersion() : null).serialize());
if (!MoreBooleans.isTruthy(sourceDefinition.getTombstone()) || includeTombstone) {
sourceDefinitions.add(sourceDefinition);
}
Expand Down Expand Up @@ -306,6 +309,8 @@ public List<StandardDestinationDefinition> listStandardDestinationDefinitions(fi

for (final StandardDestinationDefinition destinationDefinition : persistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION,
StandardDestinationDefinition.class)) {
destinationDefinition.withProtocolVersion(AirbyteProtocolVersion
.getWithDefault(destinationDefinition.getSpec() != null ? destinationDefinition.getSpec().getProtocolVersion() : null).serialize());
if (!MoreBooleans.isTruthy(destinationDefinition.getTombstone()) || includeTombstone) {
destinationDefinitions.add(destinationDefinition);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import static org.mockito.Mockito.when;

import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.version.AirbyteProtocolVersion;
import io.airbyte.config.ConfigSchema;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
Expand All @@ -31,6 +32,7 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.StreamDescriptor;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
Expand Down Expand Up @@ -225,6 +227,46 @@ void testListStandardSourceDefinitionsHandlesTombstoneSourceDefinitions(final in
assertEquals(allSourceDefinitions, returnedSourceDefinitionsWithTombstone);
}

@Test
void testListDestinationDefinitionsWithVersion() throws JsonValidationException, IOException {
final List<StandardDestinationDefinition> allSourceDefinitions = List.of(
new StandardDestinationDefinition(),
new StandardDestinationDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.3.1")),
// We expect the protocol version to be in the ConnectorSpec, so we'll override regardless.
new StandardDestinationDefinition().withProtocolVersion("0.4.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.4.1")),
new StandardDestinationDefinition().withProtocolVersion("0.5.0").withSpec(new ConnectorSpecification()));

when(configPersistence.listConfigs(ConfigSchema.STANDARD_DESTINATION_DEFINITION, StandardDestinationDefinition.class))
.thenReturn(allSourceDefinitions);

final List<StandardDestinationDefinition> destinationDefinitions = configRepository.listStandardDestinationDefinitions(false);
final List<String> protocolVersions = destinationDefinitions.stream().map(StandardDestinationDefinition::getProtocolVersion).toList();
assertEquals(
List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.3.1", "0.4.1",
AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()),
protocolVersions);
}

@Test
void testListSourceDefinitionsWithVersion() throws JsonValidationException, IOException {
final List<StandardSourceDefinition> allSourceDefinitions = List.of(
new StandardSourceDefinition(),
new StandardSourceDefinition().withSpec(new ConnectorSpecification().withProtocolVersion("0.6.0")),
// We expect the protocol version to be in the ConnectorSpec, so we'll override regardless.
new StandardSourceDefinition().withProtocolVersion("0.7.0").withSpec(new ConnectorSpecification().withProtocolVersion("0.7.1")),
new StandardSourceDefinition().withProtocolVersion("0.8.0").withSpec(new ConnectorSpecification()));

when(configPersistence.listConfigs(ConfigSchema.STANDARD_SOURCE_DEFINITION, StandardSourceDefinition.class))
.thenReturn(allSourceDefinitions);

final List<StandardSourceDefinition> sourceDefinitions = configRepository.listStandardSourceDefinitions(false);
final List<String> protocolVersions = sourceDefinitions.stream().map(StandardSourceDefinition::getProtocolVersion).toList();
assertEquals(
List.of(AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize(), "0.6.0", "0.7.1",
AirbyteProtocolVersion.DEFAULT_AIRBYTE_PROTOCOL_VERSION.serialize()),
protocolVersions);
}

@Test
void testDeleteSourceDefinitionAndAssociations() throws JsonValidationException, IOException, ConfigNotFoundException {
final StandardSourceDefinition sourceDefToDelete = new StandardSourceDefinition().withSourceDefinitionId(UUID.randomUUID());
Expand Down