use spec when persisting source configs #6036

Merged · 7 commits · Sep 14, 2021
1 change: 1 addition & 0 deletions airbyte-config/persistence/build.gradle
@@ -4,6 +4,7 @@ dependencies {
implementation project(':airbyte-db:lib')
implementation project(':airbyte-db:jooq')
implementation project(':airbyte-config:models')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-config:init')
implementation project(':airbyte-json-validation')

@@ -37,6 +37,8 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardWorkspace;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.IOException;
import java.util.ArrayList;
@@ -168,7 +170,12 @@ public SourceConnection getSourceConnection(final UUID sourceId) throws JsonVali
return persistence.getConfig(ConfigSchema.SOURCE_CONNECTION, sourceId.toString(), SourceConnection.class);
}

public void writeSourceConnection(final SourceConnection source) throws JsonValidationException, IOException {
public void writeSourceConnection(final SourceConnection source, final ConnectorSpecification connectorSpecification)
throws JsonValidationException, IOException {
// actual validation is only for sanity checking
final JsonSchemaValidator validator = new JsonSchemaValidator();
validator.ensure(connectorSpecification.getConnectionSpecification(), source.getConfiguration());

persistence.writeConfig(ConfigSchema.SOURCE_CONNECTION, source.getSourceId().toString(), source);
}
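
For a sense of what the new signature enforces, here is a minimal sketch of the sanity check that writeSourceConnection now performs before persisting. It assumes the usual generated withX setters on ConnectorSpecification and SourceConnection and the Jsons helper from airbyte-commons; those setter names are illustrative and not confirmed by this diff.

import io.airbyte.commons.json.Jsons;
import io.airbyte.config.SourceConnection;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.util.UUID;

public class WriteSourceConnectionValidationSketch {

  public static void main(final String[] args) {
    // A minimal connector spec: the connection config must be an object with a required "host" string.
    final ConnectorSpecification spec = new ConnectorSpecification()
        .withConnectionSpecification(Jsons.deserialize(
            "{\"type\":\"object\",\"required\":[\"host\"],\"properties\":{\"host\":{\"type\":\"string\"}}}"));

    // A source config that violates that spec (no "host" field).
    final SourceConnection source = new SourceConnection()
        .withSourceId(UUID.randomUUID())
        .withConfiguration(Jsons.deserialize("{\"port\":5432}"));

    try {
      // This mirrors the check writeSourceConnection now runs before calling persistence.writeConfig.
      new JsonSchemaValidator().ensure(spec.getConnectionSpecification(), source.getConfiguration());
    } catch (final JsonValidationException e) {
      // The invalid config is rejected before it ever reaches the config store.
      System.out.println("rejected: " + e.getMessage());
    }
  }
}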

@@ -27,6 +27,10 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.airbyte.analytics.Deployment;
import io.airbyte.analytics.TrackingClientSingleton;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.ApiClient;
import io.airbyte.api.client.invoker.ApiException;
import io.airbyte.api.client.model.HealthCheckRead;
import io.airbyte.commons.concurrency.GracefulShutdownHandler;
import io.airbyte.commons.version.AirbyteVersion;
import io.airbyte.config.Configs;
@@ -46,7 +50,6 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.workers.WorkerApp;
import io.airbyte.workers.temporal.TemporalClient;
import java.io.IOException;
import java.nio.file.Path;
@@ -153,6 +156,25 @@ private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifi
}
}

public static void waitForServer(Configs configs) throws InterruptedException {
final AirbyteApiClient apiClient = new AirbyteApiClient(
new ApiClient().setScheme("http")
.setHost(configs.getAirbyteApiHost())
.setPort(configs.getAirbyteApiPort())
.setBasePath("/api"));

boolean isHealthy = false;
while (!isHealthy) {
try {
HealthCheckRead healthCheck = apiClient.getHealthApi().getHealthCheck();
isHealthy = healthCheck.getDb();
} catch (ApiException e) {
LOGGER.info("Waiting for server to become available...");
Thread.sleep(2000);
}
}
}

public static void main(String[] args) throws IOException, InterruptedException {

final Configs configs = new EnvConfigs();
@@ -166,7 +188,7 @@ public static void main(String[] args) throws IOException, InterruptedException
LOGGER.info("temporalHost = " + temporalHost);

// Wait for the server to initialize the database and run migration
WorkerApp.waitForServer(configs);
waitForServer(configs);

LOGGER.info("Creating Job DB connection pool...");
final Database jobDatabase = new JobsDatabaseInstance(
@@ -43,6 +43,7 @@
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.FileSystemConfigPersistence;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.validation.json.JsonValidationException;
@@ -96,6 +97,7 @@ class WorkspaceHelperTest {
ConfigRepository configRepository;
JobPersistence jobPersistence;
WorkspaceHelper workspaceHelper;
ConnectorSpecification emptyConnectorSpec;

@BeforeEach
public void setup() throws IOException {
@@ -105,6 +107,9 @@ public void setup() throws IOException {
jobPersistence = mock(JobPersistence.class);

workspaceHelper = new WorkspaceHelper(configRepository, jobPersistence);

emptyConnectorSpec = mock(ConnectorSpecification.class);
when(emptyConnectorSpec.getConnectionSpecification()).thenReturn(Jsons.emptyObject());
}

@Test
@@ -130,13 +135,13 @@ public void testMissingObjectsProperException() {
@Test
public void testSource() throws IOException, JsonValidationException {
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);

final UUID retrievedWorkspace = workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(SOURCE_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspace);

// check that caching is working
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(UUID.randomUUID()));
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(UUID.randomUUID()), emptyConnectorSpec);
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(SOURCE_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
}
@@ -158,7 +163,7 @@ public void testDestination() throws IOException, JsonValidationException {
@Test
public void testConnection() throws IOException, JsonValidationException {
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);

@@ -175,7 +180,7 @@

// check that caching is working
final UUID newWorkspace = UUID.randomUUID();
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace));
configRepository.writeSourceConnection(Jsons.clone(SOURCE).withWorkspaceId(newWorkspace), emptyConnectorSpec);
configRepository.writeDestinationConnection(Jsons.clone(DEST).withWorkspaceId(newWorkspace));
final UUID retrievedWorkspaceAfterUpdate = workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(DEST_ID);
assertEquals(WORKSPACE_ID, retrievedWorkspaceAfterUpdate);
@@ -198,7 +203,7 @@ public void testOperation() throws IOException, JsonValidationException {
@Test
public void testConnectionAndJobs() throws IOException, JsonValidationException {
configRepository.writeStandardSource(SOURCE_DEF);
configRepository.writeSourceConnection(SOURCE);
configRepository.writeSourceConnection(SOURCE, emptyConnectorSpec);
configRepository.writeStandardDestinationDefinition(DEST_DEF);
configRepository.writeDestinationConnection(DEST);
configRepository.writeStandardSync(CONNECTION);
@@ -49,10 +49,13 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.db.instance.jobs.JobsDatabaseSchema;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.scheduler.persistence.DefaultJobPersistence;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.IdNotFoundKnownException;
import io.airbyte.server.handlers.SourceHandler;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
@@ -90,23 +93,29 @@ public class ConfigDumpImporter {

private final ConfigRepository configRepository;
private final WorkspaceHelper workspaceHelper;
private final SpecFetcher specFetcher;
private final JsonSchemaValidator jsonSchemaValidator;
private final JobPersistence jobPersistence;
private final Path stagedResourceRoot;

public ConfigDumpImporter(ConfigRepository configRepository, JobPersistence jobPersistence, WorkspaceHelper workspaceHelper) {
this(configRepository, jobPersistence, workspaceHelper, new JsonSchemaValidator());
public ConfigDumpImporter(ConfigRepository configRepository,
JobPersistence jobPersistence,
WorkspaceHelper workspaceHelper,
SpecFetcher specFetcher) {
this(configRepository, jobPersistence, workspaceHelper, new JsonSchemaValidator(), specFetcher);
}

@VisibleForTesting
public ConfigDumpImporter(ConfigRepository configRepository,
JobPersistence jobPersistence,
WorkspaceHelper workspaceHelper,
JsonSchemaValidator jsonSchemaValidator) {
JsonSchemaValidator jsonSchemaValidator,
SpecFetcher specFetcher) {
this.jsonSchemaValidator = jsonSchemaValidator;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.workspaceHelper = workspaceHelper;
this.specFetcher = specFetcher;
try {
this.stagedResourceRoot = Path.of(TMP_AIRBYTE_STAGED_RESOURCES);
if (stagedResourceRoot.toFile().exists()) {
@@ -405,15 +414,19 @@ private <T> void importConfigsIntoWorkspace(Path sourceRoot, UUID workspaceId, b
return sourceConnection;
},
(sourceConnection) -> {
final ConnectorSpecification spec;
// make sure connector definition exists
try {
if (configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()) == null) {
final StandardSourceDefinition sourceDefinition =
configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId());
if (sourceDefinition == null) {
return;
}
spec = SourceHandler.getSpecFromSourceDefinitionId(specFetcher, sourceDefinition);
} catch (ConfigNotFoundException e) {
return;
}
configRepository.writeSourceConnection(sourceConnection);
configRepository.writeSourceConnection(sourceConnection, spec);
}));
case STANDARD_DESTINATION_DEFINITION -> importDestinationDefinitionIntoWorkspace(configs);
case DESTINATION_CONNECTION -> destinationIdMap.putAll(importIntoWorkspace(
@@ -29,6 +29,7 @@
import io.airbyte.migrate.MigrateConfig;
import io.airbyte.migrate.MigrationRunner;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
import java.io.IOException;
@@ -52,11 +53,12 @@ public class RunMigration implements Runnable, AutoCloseable {
public RunMigration(JobPersistence jobPersistence,
ConfigRepository configRepository,
String targetVersion,
ConfigPersistence seedPersistence) {
ConfigPersistence seedPersistence,
SpecFetcher specFetcher) {
Contributor:
is injecting the SpecFetcher here safe? While the migration is running, I think there is no guarantee that we can launch jobs in temporal. I know we block the scheduler starting up during a migration. I guess we technically don't block temporal?

Contributor:
Needing to have access to a spec without having to run a docker container seems to be a recurring (stupid) problem. Wondering if we should just be storing the spec as part of the source definition. Obviously out of scope for what you're doing, but what do you think we should be doing here overall?

If this is safe for now, then we should move forward with it. If it's not, then we should figure something else out 😄 . Either way, I think this might bite us in the future even if it is safe now, so it might be good to start to define what we truly want here.

Contributor (author):
The scheduler/worker wait until the server is live before serving work orders from the Temporal service.

The SpecFetcher uses a "scheduler" client that actually doesn't interact with the scheduler at all; it just interacts with tracking (Segment) and the temporal service. No db tables are touched, no actual Job is created/scheduled.

I'm wondering if we can just remove the wait on the server from the worker. Then it could service these requests on demand. Maybe we should also rename DefaultSynchronousSchedulerClient...
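
For a rough mental model of that fetch path, here is a hedged sketch of a spec fetcher built on the synchronous scheduler client wired up in ServerApp. The createGetSpecJob call and its getOutput() result, the client's package, the StandardSourceDefinition getters, and the docker-image construction are all assumptions about the surrounding scheduler code, not something this diff shows.

import io.airbyte.config.StandardSourceDefinition;
import io.airbyte.protocol.models.ConnectorSpecification;
// Assumed package for the synchronous client; adjust to its actual location in the codebase.
import io.airbyte.scheduler.client.SynchronousSchedulerClient;

public class SpecFetcherSketch {

  private final SynchronousSchedulerClient schedulerClient; // e.g. the caching client built in ServerApp

  public SpecFetcherSketch(final SynchronousSchedulerClient schedulerClient) {
    this.schedulerClient = schedulerClient;
  }

  public ConnectorSpecification fetch(final StandardSourceDefinition sourceDefinition) throws Exception {
    // e.g. "airbyte/source-postgres:0.3.11"; repository and tag come from the definition.
    final String dockerImage = sourceDefinition.getDockerRepository() + ":" + sourceDefinition.getDockerImageTag();
    // Runs a short-lived spec job through Temporal; no scheduler database tables are touched
    // and no persistent Job record is created, which is why this is usable during a migration.
    return schedulerClient.createGetSpecJob(dockerImage).getOutput();
  }
}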

this.targetVersion = targetVersion;
this.seedPersistence = seedPersistence;
this.configDumpExporter = new ConfigDumpExporter(configRepository, jobPersistence, null);
this.configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence, null);
this.configDumpImporter = new ConfigDumpImporter(configRepository, jobPersistence, null, specFetcher);
}

@Override
Expand Down
30 changes: 17 additions & 13 deletions airbyte-server/src/main/java/io/airbyte/server/ServerApp.java
@@ -54,6 +54,7 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.scheduler.persistence.job_tracker.JobTracker;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InvalidInputExceptionMapper;
import io.airbyte.server.errors.InvalidJsonExceptionMapper;
import io.airbyte.server.errors.InvalidJsonInputExceptionMapper;
@@ -212,13 +213,25 @@ public static ServerRunnable getServer(final ServerFactory apiFactory) throws Ex
jobPersistence.setVersion(airbyteVersion);
}

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence);
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost());
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot());
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, false);
final SchedulerJobClient schedulerJobClient = new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence));
final DefaultSynchronousSchedulerClient syncSchedulerClient =
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
final SynchronousSchedulerClient bucketSpecCacheSchedulerClient =
new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket());
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);
final SpecFetcher specFetcher = new SpecFetcher(cachingSchedulerClient);

Optional<String> airbyteDatabaseVersion = jobPersistence.getVersion();
if (airbyteDatabaseVersion.isPresent() && isDatabaseVersionBehindAppVersion(airbyteVersion, airbyteDatabaseVersion.get())) {
final boolean isKubernetes = configs.getWorkerEnvironment() == WorkerEnvironment.KUBERNETES;
final boolean versionSupportsAutoMigrate =
new AirbyteVersion(airbyteDatabaseVersion.get()).patchVersionCompareTo(KUBE_SUPPORT_FOR_AUTOMATIC_MIGRATION) >= 0;
if (!isKubernetes || versionSupportsAutoMigrate) {
runAutomaticMigration(configRepository, jobPersistence, airbyteVersion, airbyteDatabaseVersion.get());
runAutomaticMigration(configRepository, jobPersistence, specFetcher, airbyteVersion, airbyteDatabaseVersion.get());
// After migration, upgrade the DB version
airbyteDatabaseVersion = jobPersistence.getVersion();
} else {
@@ -231,17 +244,6 @@
if (airbyteDatabaseVersion.isPresent() && AirbyteVersion.isCompatible(airbyteVersion, airbyteDatabaseVersion.get())) {
LOGGER.info("Starting server...");

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence);
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost());
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot());
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, false);
final SchedulerJobClient schedulerJobClient = new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence));
final DefaultSynchronousSchedulerClient syncSchedulerClient =
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
final SynchronousSchedulerClient bucketSpecCacheSchedulerClient =
new BucketSpecCacheSchedulerClient(syncSchedulerClient, configs.getSpecCacheBucket());
final SpecCachingSynchronousSchedulerClient cachingSchedulerClient = new SpecCachingSynchronousSchedulerClient(bucketSpecCacheSchedulerClient);

return apiFactory.create(
schedulerJobClient,
cachingSchedulerClient,
@@ -267,14 +269,16 @@ public static void main(final String[] args) throws Exception {
*/
private static void runAutomaticMigration(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SpecFetcher specFetcher,
final String airbyteVersion,
final String airbyteDatabaseVersion) {
LOGGER.info("Running Automatic Migration from version : " + airbyteDatabaseVersion + " to version : " + airbyteVersion);
try (final RunMigration runMigration = new RunMigration(
jobPersistence,
configRepository,
airbyteVersion,
YamlSeedConfigPersistence.get())) {
YamlSeedConfigPersistence.get(),
specFetcher)) {
runMigration.run();
} catch (final Exception e) {
LOGGER.error("Automatic Migration failed ", e);
@@ -197,7 +197,8 @@ public ConfigurationApi(final ConfigRepository configRepository,
webBackendSourcesHandler = new WebBackendSourcesHandler(sourceHandler, configRepository);
webBackendDestinationsHandler = new WebBackendDestinationsHandler(destinationHandler, configRepository);
healthCheckHandler = new HealthCheckHandler(configRepository);
archiveHandler = new ArchiveHandler(configs.getAirbyteVersion(), configRepository, jobPersistence, workspaceHelper, archiveTtlManager);
archiveHandler =
new ArchiveHandler(configs.getAirbyteVersion(), configRepository, jobPersistence, workspaceHelper, archiveTtlManager, specFetcher);
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, jobsDatabase);
@@ -37,6 +37,7 @@
import io.airbyte.scheduler.persistence.WorkspaceHelper;
import io.airbyte.server.ConfigDumpExporter;
import io.airbyte.server.ConfigDumpImporter;
import io.airbyte.server.converters.SpecFetcher;
import io.airbyte.server.errors.InternalServerKnownException;
import io.airbyte.validation.json.JsonValidationException;
import java.io.File;
@@ -58,12 +59,13 @@ public ArchiveHandler(final String version,
final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkspaceHelper workspaceHelper,
final FileTtlManager fileTtlManager) {
final FileTtlManager fileTtlManager,
final SpecFetcher specFetcher) {
this(
version,
fileTtlManager,
new ConfigDumpExporter(configRepository, jobPersistence, workspaceHelper),
new ConfigDumpImporter(configRepository, jobPersistence, workspaceHelper));
new ConfigDumpImporter(configRepository, jobPersistence, workspaceHelper, specFetcher));
}

public ArchiveHandler(final String version,