Skip to content

Commit

Permalink
Parse docker image spec with specs list
Browse files Browse the repository at this point in the history
  • Loading branch information
tuliren committed Jun 8, 2022
1 parent 932125e commit 8fe41dd
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
import io.airbyte.commons.yaml.Yamls;
import io.airbyte.config.DockerImageSpec;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.Spec;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
Expand All @@ -27,21 +29,17 @@
import org.slf4j.LoggerFactory;

/**
* This script is responsible for ensuring that up-to-date {@link ConnectorSpecification}s for every
* connector definition in the seed are stored in a corresponding resource file, for the purpose of
* seeding the specs into the config database on server startup. See
* ./airbyte-config/specs/readme.md for more details on how this class is run and how it fits into
* the project.
* This script is responsible for ensuring that up-to-date {@link ConnectorSpecification}s for every connector definition in the seed are stored in a
* corresponding resource file, for the purpose of seeding the specs into the config database on server startup. See ./airbyte-config/specs/readme.md
* for more details on how this class is run and how it fits into the project.
* <p>
* Specs are stored in a separate file from the definitions in an effort to keep the definitions
* yaml files human-readable and easily-editable, as specs can be rather large.
* Specs are stored in a separate file from the definitions in an effort to keep the definitions yaml files human-readable and easily-editable, as
* specs can be rather large.
* <p>
* Specs are fetched from the GCS spec cache bucket, so if any specs are missing from the bucket
* then this will fail. Note that this script only pulls specs from the bucket cache; it never
* pushes specs to the bucket. Since this script runs at build time, the decision was to depend on
* the bucket cache rather than running a docker container to fetch the spec during the build which
* could be slow and unwieldy. If there is a failure, check the bucket cache and figure out how to
* get the correct spec in there.
* Specs are fetched from the GCS spec cache bucket, so if any specs are missing from the bucket then this will fail. Note that this script only pulls
* specs from the bucket cache; it never pushes specs to the bucket. Since this script runs at build time, the decision was to depend on the bucket
* cache rather than running a docker container to fetch the spec during the build which could be slow and unwieldy. If there is a failure, check the
* bucket cache and figure out how to get the correct spec in there.
*/
@SuppressWarnings("PMD.SignatureDeclareThrowsException")
public class SeedConnectorSpecGenerator {
Expand All @@ -50,6 +48,8 @@ public class SeedConnectorSpecGenerator {
private static final String DOCKER_IMAGE_TAG_FIELD = "dockerImageTag";
private static final String DOCKER_IMAGE_FIELD = "dockerImage";
private static final String SPEC_FIELD = "spec";
private static final String SPECS_FIELD = "specs";
private static final String SPEC_TYPE_FIELD = "spec_type";
private static final String SPEC_BUCKET_NAME = new EnvConfigs().getSpecCacheBucket();

private static final Logger LOGGER = LoggerFactory.getLogger(SeedConnectorSpecGenerator.class);
Expand Down Expand Up @@ -95,19 +95,32 @@ private JsonNode yamlToJson(final Path root, final String fileName) {
return Yamls.deserialize(yamlString);
}

final List<Spec> getSpecVariants(final JsonNode specs) {
if (specs == null) {
return Collections.emptyList();
}
return MoreIterators.toList(specs.elements())
.stream()
.map(json -> new Spec()
.withSpecType(Jsons.object(json.get(SPEC_TYPE_FIELD), Spec.SpecType.class))
.withSpec(Jsons.object(json.get(SPEC_FIELD), ConnectorSpecification.class)))
.toList();
}

@VisibleForTesting
final List<DockerImageSpec> fetchUpdatedSeedSpecs(final JsonNode seedDefinitions, final JsonNode currentSeedSpecs) {
final List<String> seedDefinitionsDockerImages = MoreIterators.toList(seedDefinitions.elements())
.stream()
.map(json -> String.format("%s:%s", json.get(DOCKER_REPOSITORY_FIELD).asText(), json.get(DOCKER_IMAGE_TAG_FIELD).asText()))
.collect(Collectors.toList());
.toList();

final Map<String, DockerImageSpec> currentSeedImageToSpec = MoreIterators.toList(currentSeedSpecs.elements())
.stream()
.collect(Collectors.toMap(
json -> json.get(DOCKER_IMAGE_FIELD).asText(),
json -> new DockerImageSpec().withDockerImage(json.get(DOCKER_IMAGE_FIELD).asText())
.withSpec(Jsons.object(json.get(SPEC_FIELD), ConnectorSpecification.class))));
.withSpec(Jsons.object(json.get(SPEC_FIELD), ConnectorSpecification.class))
.withSpecs(getSpecVariants(json.get(SPECS_FIELD)))));

return seedDefinitionsDockerImages
.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.DockerImageSpec;
import io.airbyte.config.Spec;
import io.airbyte.config.Spec.SpecType;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.Arrays;
Expand Down Expand Up @@ -138,8 +140,20 @@ void testNoFetchIsPerformedIfAllSpecsUpToDate() {
.withDockerImageTag(DOCKER_TAG1)
.withName(CONNECTOR_NAME1)
.withDocumentationUrl(DOCUMENTATION_URL);
final ConnectorSpecification spec = new ConnectorSpecification().withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar")));
final DockerImageSpec dockerImageSpec = new DockerImageSpec().withDockerImage(DOCKER_REPOSITORY1 + ":" + DOCKER_TAG1).withSpec(spec);
final ConnectorSpecification defaultSpec = new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar")));
final Spec cloudSpec = new Spec()
.withSpecType(SpecType.CLOUD)
.withSpec(new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar", "mode", "cloud"))));
final Spec ossSpec = new Spec()
.withSpecType(SpecType.CLOUD)
.withSpec(new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar", "mode", "oss"))));

final DockerImageSpec dockerImageSpec = new DockerImageSpec()
.withDockerImage(DOCKER_REPOSITORY1 + ":" + DOCKER_TAG1).withSpec(defaultSpec)
.withSpecs(List.of(cloudSpec, ossSpec));

final JsonNode seedDefinitions = Jsons.jsonNode(List.of(sourceDefinition));
final JsonNode seedSpecs = Jsons.jsonNode(List.of(dockerImageSpec));
Expand All @@ -151,4 +165,19 @@ void testNoFetchIsPerformedIfAllSpecsUpToDate() {
verify(bucketSpecFetcherMock, never()).attemptFetch(any());
}

@Test
void testGetSpecVariants() {
final Spec cloudSpec = new Spec()
.withSpecType(SpecType.CLOUD)
.withSpec(new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar", "mode", "cloud"))));
final Spec ossSpec = new Spec()
.withSpecType(SpecType.CLOUD)
.withSpec(new ConnectorSpecification()
.withConnectionSpecification(Jsons.jsonNode(ImmutableMap.of("foo", "bar", "mode", "oss"))));

assertEquals(List.of(cloudSpec, ossSpec),
seedConnectorSpecGenerator.getSpecVariants(Jsons.jsonNode(List.of(cloudSpec, ossSpec))));
}

}

0 comments on commit 8fe41dd

Please sign in to comment.