Backfill specs into definitions (#7616)
lmossman authored Nov 6, 2021
1 parent 9791a14 commit 058c8f8
Showing 2 changed files with 282 additions and 0 deletions.
123 changes: 123 additions & 0 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -4,17 +4,24 @@

package io.airbyte.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClient;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.api.model.LogRead;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.init.YamlSeedConfigPersistence;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
@@ -27,17 +34,20 @@
import io.airbyte.db.instance.configs.ConfigsDatabaseMigrator;
import io.airbyte.db.instance.jobs.JobsDatabaseInstance;
import io.airbyte.db.instance.jobs.JobsDatabaseMigrator;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.BucketSpecCacheSchedulerClient;
import io.airbyte.scheduler.client.DefaultSchedulerJobClient;
import io.airbyte.scheduler.client.DefaultSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.client.SpecCachingSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InvalidInputExceptionMapper;
import io.airbyte.server.errors.InvalidJsonExceptionMapper;
@@ -238,6 +248,15 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
runFlywayMigration(configs, configDatabase, jobDatabase);
configPersistence.loadData(seed);

// todo (lmossman) - this will only exist temporarily to ensure all definitions contain specs. It
// will be removed after the faux major version bump
migrateAllDefinitionsToContainSpec(
configRepository,
cachingSchedulerClient,
trackingClient,
configs.getWorkerEnvironment(),
configs.getLogConfigs());

return apiFactory.create(
schedulerJobClient,
cachingSchedulerClient,
@@ -260,6 +279,110 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
}
}

/**
* Check that each connector definition in the database has a spec. If it doesn't, fetch the spec
* and add it. If it can't be added, track the failure in Segment. The goal is to try to end up in
* a state where all definitions in the db contain specs, and to understand what is stopping us
* from getting there.
*
* @param configRepository - access to the db
* @param schedulerClient - scheduler client so that specs can be fetched as needed
* @param trackingClient - tracking client, used to report spec backfill failures to Segment
* @param workerEnvironment - worker environment, used to read job logs when a spec fetch fails
* @param logConfigs - log configs, used to read job logs when a spec fetch fails
*/
@VisibleForTesting
static void migrateAllDefinitionsToContainSpec(final ConfigRepository configRepository,
final SynchronousSchedulerClient schedulerClient,
final TrackingClient trackingClient,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs)
throws JsonValidationException, IOException {
final JobConverter jobConverter = new JobConverter(workerEnvironment, logConfigs);
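// Attempt to backfill a spec for every source definition that is missing one. Each definition is
// handled in its own try/catch so that a single failing connector does not block the rest.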
for (final StandardSourceDefinition sourceDef : configRepository.listStandardSourceDefinitions()) {
try {
if (sourceDef.getSpec() == null) {
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Source Definition {} does not have a spec. Attempting to retrieve spec...",
sourceDef.getName());
final SynchronousResponse<ConnectorSpecification> getSpecJob = schedulerClient
.createGetSpecJob(sourceDef.getDockerRepository() + ":" + sourceDef.getDockerImageTag());
if (getSpecJob.isSuccess()) {
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Spec for Source Definition {} was successfully retrieved. Writing to the db...",
sourceDef.getName());
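// Clone before setting the spec so the definition returned by the listing is left untouched.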
final StandardSourceDefinition updatedDef = Jsons.clone(sourceDef).withSpec(getSpecJob.getOutput());
configRepository.writeStandardSourceDefinition(updatedDef);
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Spec for Source Definition {} was successfully written to the db record.",
sourceDef.getName());
} else {
final LogRead logRead = jobConverter.getLogRead(getSpecJob.getMetadata().getLogPath());
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Failed to retrieve spec for Source Definition {}. Logs: {}",
sourceDef.getName(),
logRead.toString());
throw new RuntimeException(String.format(
"Failed to retrieve spec for Source Definition %s. Logs: %s",
sourceDef.getName(),
logRead.toString()));
}
}
} catch (final Exception e) {
trackSpecBackfillFailure(trackingClient, configRepository, sourceDef.getDockerRepository(), sourceDef.getDockerImageTag(), e);
}
}

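// Repeat the same backfill for destination definitions.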
for (final StandardDestinationDefinition destDef : configRepository.listStandardDestinationDefinitions()) {
try {
if (destDef.getSpec() == null) {
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Destination Definition {} does not have a spec. Attempting to retrieve spec...",
destDef.getName());
final SynchronousResponse<ConnectorSpecification> getSpecJob = schedulerClient
.createGetSpecJob(destDef.getDockerRepository() + ":" + destDef.getDockerImageTag());
if (getSpecJob.isSuccess()) {
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Spec for Destination Definition {} was successfully retrieved. Writing to the db...",
destDef.getName());
final StandardDestinationDefinition updatedDef = Jsons.clone(destDef).withSpec(getSpecJob.getOutput());
configRepository.writeStandardDestinationDefinition(updatedDef);
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Spec for Destination Definition {} was successfully written to the db record.",
destDef.getName());
} else {
final LogRead logRead = jobConverter.getLogRead(getSpecJob.getMetadata().getLogPath());
LOGGER.info(
"migrateAllDefinitionsToContainSpec - Failed to retrieve spec for Destination Definition {}. Logs: {}",
destDef.getName(),
logRead.toString());
throw new RuntimeException(String.format(
"Failed to retrieve spec for Destination Definition %s. Logs: %s",
destDef.getName(),
logRead.toString()));
}
}
} catch (final Exception e) {
trackSpecBackfillFailure(trackingClient, configRepository, destDef.getDockerRepository(), destDef.getDockerImageTag(), e);
}
}
}

private static void trackSpecBackfillFailure(final TrackingClient trackingClient,
final ConfigRepository configRepository,
final String dockerRepo,
final String dockerImageTag,
final Exception exception)
throws JsonValidationException, IOException {
// There is guaranteed to be at least one workspace, because the getServer() function enforces that
final UUID workspaceId = configRepository.listStandardWorkspaces(true).get(0).getWorkspaceId();

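// Record enough metadata to identify which connector image failed the backfill.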
final ImmutableMap<String, Object> metadata = ImmutableMap.of(
"docker_image_name", dockerRepo,
"docker_image_tag", dockerImageTag,
"exception", exception);
trackingClient.track(workspaceId, "failed_spec_backfill", metadata);
}

@Deprecated
@SuppressWarnings({"DeprecatedIsStillUsed"})
private static Optional<AirbyteVersion> runFileMigration(final AirbyteVersion airbyteVersion,
159 changes: 159 additions & 0 deletions airbyte-server/src/test/java/io/airbyte/server/BackfillSpecTest.java
@@ -0,0 +1,159 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.server;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.airbyte.analytics.TrackingClient;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.client.SynchronousJobMetadata;
import io.airbyte.scheduler.client.SynchronousResponse;
import io.airbyte.scheduler.client.SynchronousSchedulerClient;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class BackfillSpecTest {

private static final String SOURCE_DOCKER_REPO = "docker-repo/source";
private static final String DEST_DOCKER_REPO = "docker-repo/destination";
private static final String DOCKER_IMAGE_TAG = "tag";
private static final String FAILED_SPEC_BACKFILL_ACTION = "failed_spec_backfill";
private static final StandardWorkspace WORKSPACE = new StandardWorkspace().withWorkspaceId(UUID.randomUUID());

private ConfigRepository configRepository;
private TrackingClient trackingClient;
private SynchronousSchedulerClient schedulerClient;

@BeforeEach
void setup() throws IOException, JsonValidationException {
configRepository = mock(ConfigRepository.class);
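// The backfill's failure tracking reads the first workspace, so the mock must return at least one.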
when(configRepository.listStandardWorkspaces(true)).thenReturn(List.of(WORKSPACE));

trackingClient = mock(TrackingClient.class);
schedulerClient = mock(SynchronousSchedulerClient.class);
}

@Test
public void testBackfillSpecSuccessful() throws JsonValidationException, IOException {
final StandardSourceDefinition sourceDef = new StandardSourceDefinition().withDockerRepository(SOURCE_DOCKER_REPO)
.withDockerImageTag(DOCKER_IMAGE_TAG);
final StandardDestinationDefinition destDef = new StandardDestinationDefinition().withDockerRepository(DEST_DOCKER_REPO)
.withDockerImageTag(DOCKER_IMAGE_TAG);

when(configRepository.listStandardSourceDefinitions()).thenReturn(List.of(sourceDef));
when(configRepository.listStandardDestinationDefinitions()).thenReturn(List.of(destDef));

final ConnectorSpecification sourceSpec = new ConnectorSpecification().withDocumentationUrl(URI.create("http://source.org"));
final ConnectorSpecification destSpec = new ConnectorSpecification().withDocumentationUrl(URI.create("http://dest.org"));

final SynchronousResponse<ConnectorSpecification> successfulSourceResponse = new SynchronousResponse<>(
sourceSpec,
mockJobMetadata(true));
final SynchronousResponse<ConnectorSpecification> successfulDestResponse = new SynchronousResponse<>(
destSpec,
mockJobMetadata(true));

when(schedulerClient.createGetSpecJob(SOURCE_DOCKER_REPO + ":" + DOCKER_IMAGE_TAG)).thenReturn(successfulSourceResponse);
when(schedulerClient.createGetSpecJob(DEST_DOCKER_REPO + ":" + DOCKER_IMAGE_TAG)).thenReturn(successfulDestResponse);

ServerApp.migrateAllDefinitionsToContainSpec(configRepository, schedulerClient, trackingClient, WorkerEnvironment.DOCKER, mock(LogConfigs.class));

final StandardSourceDefinition expectedSourceDef = Jsons.clone(sourceDef).withSpec(sourceSpec);
final StandardDestinationDefinition expectedDestDef = Jsons.clone(destDef).withSpec(destSpec);
verify(configRepository, times(1)).writeStandardSourceDefinition(expectedSourceDef);
verify(configRepository, times(1)).writeStandardDestinationDefinition(expectedDestDef);
}

@Test
public void testBackfillSpecFailure() throws JsonValidationException, IOException {
final StandardSourceDefinition sourceDef = new StandardSourceDefinition().withDockerRepository(SOURCE_DOCKER_REPO)
.withDockerImageTag(DOCKER_IMAGE_TAG);
final StandardDestinationDefinition destDef = new StandardDestinationDefinition().withDockerRepository(DEST_DOCKER_REPO)
.withDockerImageTag(DOCKER_IMAGE_TAG);

when(configRepository.listStandardSourceDefinitions()).thenReturn(List.of(sourceDef));
when(configRepository.listStandardDestinationDefinitions()).thenReturn(List.of(destDef));

final ConnectorSpecification sourceSpec = new ConnectorSpecification().withDocumentationUrl(URI.create("http://source.org"));
final ConnectorSpecification destSpec = new ConnectorSpecification().withDocumentationUrl(URI.create("http://dest.org"));

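// Mark both spec-fetch jobs as failed so the backfill tracks failures instead of writing specs.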
final SynchronousResponse<ConnectorSpecification> failureSourceResponse = new SynchronousResponse<>(
sourceSpec,
mockJobMetadata(false));
final SynchronousResponse<ConnectorSpecification> failureDestResponse = new SynchronousResponse<>(
destSpec,
mockJobMetadata(false));

when(schedulerClient.createGetSpecJob(SOURCE_DOCKER_REPO + ":" + DOCKER_IMAGE_TAG)).thenReturn(failureSourceResponse);
when(schedulerClient.createGetSpecJob(DEST_DOCKER_REPO + ":" + DOCKER_IMAGE_TAG)).thenReturn(failureDestResponse);

ServerApp.migrateAllDefinitionsToContainSpec(configRepository, schedulerClient, trackingClient, WorkerEnvironment.DOCKER, mock(LogConfigs.class));

verify(configRepository, never()).writeStandardSourceDefinition(any());
verify(configRepository, never()).writeStandardDestinationDefinition(any());

verify(trackingClient, times(2)).track(eq(WORKSPACE.getWorkspaceId()), eq(FAILED_SPEC_BACKFILL_ACTION), anyMap());
}

@Test
public void testSpecAlreadyExists() throws JsonValidationException, IOException {
final ConnectorSpecification sourceSpec = new ConnectorSpecification().withDocumentationUrl(URI.create("http://source.org"));
final ConnectorSpecification destSpec = new ConnectorSpecification().withDocumentationUrl(URI.create("http://dest.org"));
final StandardSourceDefinition sourceDef = new StandardSourceDefinition().withDockerRepository(SOURCE_DOCKER_REPO)
.withDockerImageTag(DOCKER_IMAGE_TAG).withSpec(sourceSpec);
final StandardDestinationDefinition destDef = new StandardDestinationDefinition().withDockerRepository(DEST_DOCKER_REPO)
.withDockerImageTag(DOCKER_IMAGE_TAG).withSpec(destSpec);

when(configRepository.listStandardSourceDefinitions()).thenReturn(List.of(sourceDef));
when(configRepository.listStandardDestinationDefinitions()).thenReturn(List.of(destDef));

ServerApp.migrateAllDefinitionsToContainSpec(
configRepository,
schedulerClient,
trackingClient,
WorkerEnvironment.DOCKER,
mock(LogConfigs.class));

verify(schedulerClient, never()).createGetSpecJob(any());
verify(configRepository, never()).writeStandardSourceDefinition(any());
verify(configRepository, never()).writeStandardDestinationDefinition(any());
}

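// Builds minimal job metadata; the succeeded flag is what the backfill checks via isSuccess().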
private SynchronousJobMetadata mockJobMetadata(final boolean succeeded) {
final long now = Instant.now().toEpochMilli();
return new SynchronousJobMetadata(
UUID.randomUUID(),
ConfigType.GET_SPEC,
UUID.randomUUID(),
now,
now,
succeeded,
Path.of("path", "to", "logs"));
}

}
