From 8ef100ba058c500897fec250886d94a046ce8a82 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 21 Oct 2021 11:18:41 -0700 Subject: [PATCH] save state in temporal --- .../persistence/ConfigPersistence2.java | 83 +++++++++++++ .../persistence/ConfigPersistence2Test.java | 7 ++ .../airbyte/scheduler/app/JobScheduler.java | 4 +- .../airbyte/scheduler/app/SchedulerApp.java | 24 +++- .../worker_run/TemporalWorkerRunFactory.java | 12 +- .../persistence/DefaultJobCreator.java | 7 +- .../persistence/DefaultJobPersistence.java | 75 +----------- .../scheduler/persistence/JobPersistence.java | 14 --- .../DefaultJobPersistenceTest.java | 112 +----------------- .../server/ConfigurationApiFactory.java | 5 + .../java/io/airbyte/server/ServerApp.java | 8 +- .../java/io/airbyte/server/ServerFactory.java | 4 + .../airbyte/server/apis/ConfigurationApi.java | 3 + .../server/handlers/SchedulerHandler.java | 10 +- .../server/handlers/ArchiveHandlerTest.java | 2 +- .../server/handlers/SchedulerHandlerTest.java | 8 +- .../migration/DatabaseArchiverTest.java | 2 +- .../server/migration/RunMigrationTest.java | 2 +- .../java/io/airbyte/workers/WorkerApp.java | 24 +++- .../workers/temporal/SyncWorkflow.java | 75 +++++++++--- .../temporal/TemporalAttemptExecution.java | 2 +- .../workers/temporal/TemporalClient.java | 5 +- 22 files changed, 248 insertions(+), 240 deletions(-) create mode 100644 airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence2.java create mode 100644 airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistence2Test.java diff --git a/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence2.java b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence2.java new file mode 100644 index 000000000000..c15bdafd72d5 --- /dev/null +++ b/airbyte-config/persistence/src/main/java/io/airbyte/config/persistence/ConfigPersistence2.java @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +import static io.airbyte.db.instance.configs.jooq.Tables.SYNC_STATE; + +import com.google.common.annotations.VisibleForTesting; +import io.airbyte.commons.json.Jsons; +import io.airbyte.config.State; +import io.airbyte.db.Database; +import io.airbyte.db.ExceptionWrappingDatabase; +import java.io.IOException; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Optional; +import java.util.UUID; +import java.util.function.Supplier; +import org.jooq.JSONB; +import org.jooq.Record1; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ConfigPersistence2 { + + private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistence2.class); + + private final ExceptionWrappingDatabase configDatabase; + private final Supplier timeSupplier; + + public ConfigPersistence2(final Database configDatabase) { + this(configDatabase, Instant::now); + } + + @VisibleForTesting + ConfigPersistence2(final Database configDatabase, final Supplier timeSupplier) { + this.configDatabase = new ExceptionWrappingDatabase(configDatabase); + + this.timeSupplier = timeSupplier; + } + + public Optional getCurrentState(final UUID connectionId) throws IOException { + return configDatabase.query(ctx -> { + final Record1 record = ctx.select(SYNC_STATE.STATE) + .from(SYNC_STATE) + .where(SYNC_STATE.SYNC_ID.eq(connectionId)) + .fetchAny(); + if (record == null) { + return Optional.empty(); + } + + return Optional.of(Jsons.deserialize(record.value1().data(), State.class)); + }); + } + + public void updateSyncState(final UUID connectionId, final State state) throws IOException { + final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); + + configDatabase.transaction( + ctx -> { + final boolean hasExistingRecord = ctx.fetchExists(SYNC_STATE, SYNC_STATE.SYNC_ID.eq(connectionId)); + if (hasExistingRecord) { + LOGGER.info("Updating connection {} state", connectionId); + return ctx.update(SYNC_STATE) + .set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(state))) + .set(SYNC_STATE.UPDATED_AT, now) + .where(SYNC_STATE.SYNC_ID.eq(connectionId)) + .execute(); + } else { + LOGGER.info("Inserting new state for connection {}", connectionId); + return ctx.insertInto(SYNC_STATE) + .set(SYNC_STATE.SYNC_ID, connectionId) + .set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(state))) + .set(SYNC_STATE.CREATED_AT, now) + .set(SYNC_STATE.UPDATED_AT, now) + .execute(); + } + }); + } + +} diff --git a/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistence2Test.java b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistence2Test.java new file mode 100644 index 000000000000..fca39e71cf21 --- /dev/null +++ b/airbyte-config/persistence/src/test/java/io/airbyte/config/persistence/ConfigPersistence2Test.java @@ -0,0 +1,7 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.config.persistence; + +class ConfigPersistence2Test {} diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java index ddd943bb8c36..d6004a797ef6 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java @@ -9,6 +9,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.Status; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.DefaultJobCreator; @@ -47,6 +48,7 @@ public class JobScheduler implements Runnable { } public JobScheduler(final JobPersistence jobPersistence, + final ConfigPersistence2 configPersistence2, final ConfigRepository configRepository, final TrackingClient trackingClient) { this( @@ -54,7 +56,7 @@ public JobScheduler(final JobPersistence jobPersistence, configRepository, new ScheduleJobPredicate(Instant::now), new DefaultSyncJobFactory( - new DefaultJobCreator(jobPersistence), + new DefaultJobCreator(jobPersistence, configPersistence2), configRepository, new OAuthConfigSupplier(configRepository, false, trackingClient))); } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 64a59a4667dd..0793e16a165d 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -18,6 +18,7 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.split_secrets.SecretPersistence; @@ -75,6 +76,7 @@ public class SchedulerApp { private final Path workspaceRoot; private final JobPersistence jobPersistence; private final ConfigRepository configRepository; + private final ConfigPersistence2 configPersistence2; private final JobCleaner jobCleaner; private final JobNotifier jobNotifier; private final TemporalClient temporalClient; @@ -85,6 +87,7 @@ public class SchedulerApp { public SchedulerApp(final Path workspaceRoot, final JobPersistence jobPersistence, final ConfigRepository configRepository, + final ConfigPersistence2 configPersistence2, final JobCleaner jobCleaner, final JobNotifier jobNotifier, final TemporalClient temporalClient, @@ -94,6 +97,7 @@ public SchedulerApp(final Path workspaceRoot, this.workspaceRoot = workspaceRoot; this.jobPersistence = jobPersistence; this.configRepository = configRepository; + this.configPersistence2 = configPersistence2; this.jobCleaner = jobCleaner; this.jobNotifier = jobNotifier; this.temporalClient = temporalClient; @@ -110,7 +114,7 @@ public void start() throws IOException { final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot, airbyteVersionOrWarnings); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts); final TrackingClient trackingClient = TrackingClientSingleton.get(); - final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient); + final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configPersistence2, configRepository, trackingClient); final JobSubmitter jobSubmitter = new JobSubmitter( workerThreadPool, jobPersistence, @@ -209,18 +213,19 @@ public static void main(final String[] args) throws IOException, InterruptedExce configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) .getInitialized(); + final ConfigPersistence2 configPersistence2 = new ConfigPersistence2(configDatabase); final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation(); final Optional secretPersistence = SecretPersistence.getLongLived(configs); final Optional ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs); final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs); final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase); + final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final JobCleaner jobCleaner = new JobCleaner( configs.getWorkspaceRetentionConfig(), workspaceRoot, jobPersistence); - AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().get()); + AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().orElseThrow()); TrackingClientSingleton.initialize( configs.getTrackingStrategy(), @@ -239,8 +244,17 @@ public static void main(final String[] args) throws IOException, InterruptedExce MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics()); LOGGER.info("Launching scheduler..."); - new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient, - Integer.parseInt(configs.getSubmitterNumThreads()), configs.getMaxSyncJobAttempts(), configs.getAirbyteVersionOrWarning()) + new SchedulerApp( + workspaceRoot, + jobPersistence, + configRepository, + configPersistence2, + jobCleaner, + jobNotifier, + temporalClient, + Integer.parseInt(configs.getSubmitterNumThreads()), + configs.getMaxSyncJobAttempts(), + configs.getAirbyteVersionOrWarning()) .start(); } diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java index 579e7b5e9266..65aed379df31 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/worker_run/TemporalWorkerRunFactory.java @@ -20,6 +20,7 @@ import io.airbyte.workers.temporal.TemporalJobType; import io.airbyte.workers.temporal.TemporalResponse; import java.nio.file.Path; +import java.util.UUID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,9 +45,14 @@ public WorkerRun create(final Job job) { public CheckedSupplier, Exception> createSupplier(final Job job, final int attemptId) { final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType()); + final UUID connectionId = UUID.fromString(job.getScope()); return switch (job.getConfigType()) { case SYNC -> () -> { - final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, job.getConfig().getSync()); + final TemporalResponse output = temporalClient.submitSync( + job.getId(), + attemptId, + job.getConfig().getSync(), + connectionId); return toOutputAndStatus(output); }; case RESET_CONNECTION -> () -> { @@ -63,7 +69,7 @@ public CheckedSupplier, Exception> createSupplier(fin .withOperationSequence(resetConnection.getOperationSequence()) .withResourceRequirements(resetConnection.getResourceRequirements()); - final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config); + final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId); return toOutputAndStatus(output); }; default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType); @@ -84,7 +90,7 @@ private OutputAndStatus toOutputAndStatus(final TemporalResponse createSyncJob(final SourceConnection source, .withState(null) .withResourceRequirements(standardSync.getResourceRequirements()); - jobPersistence.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); + configPersistence2.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.SYNC) diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java index 50dd07ccd110..4313844715fd 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobPersistence.java @@ -4,9 +4,7 @@ package io.airbyte.scheduler.persistence; -import static io.airbyte.db.instance.configs.jooq.Tables.SYNC_STATE; import static io.airbyte.db.instance.jobs.jooq.Tables.ATTEMPTS; -import static io.airbyte.db.instance.jobs.jooq.Tables.JOBS; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.JsonNodeType; @@ -22,7 +20,6 @@ import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobOutput; -import io.airbyte.config.State; import io.airbyte.db.Database; import io.airbyte.db.ExceptionWrappingDatabase; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; @@ -60,7 +57,6 @@ import org.jooq.JSONFormat.RecordFormat; import org.jooq.Named; import org.jooq.Record; -import org.jooq.Record1; import org.jooq.Result; import org.jooq.Sequence; import org.jooq.Table; @@ -112,29 +108,23 @@ public class DefaultJobPersistence implements JobPersistence { "ORDER BY jobs.created_at DESC, jobs.id DESC, attempts.created_at ASC, attempts.id ASC "; private final ExceptionWrappingDatabase jobDatabase; - private final ExceptionWrappingDatabase configDatabase; private final Supplier timeSupplier; @VisibleForTesting DefaultJobPersistence(final Database jobDatabase, - final Database configDatabase, final Supplier timeSupplier, final int minimumAgeInDays, final int excessiveNumberOfJobs, final int minimumRecencyCount) { this.jobDatabase = new ExceptionWrappingDatabase(jobDatabase); - this.configDatabase = new ExceptionWrappingDatabase(configDatabase); this.timeSupplier = timeSupplier; JOB_HISTORY_MINIMUM_AGE_IN_DAYS = minimumAgeInDays; JOB_HISTORY_EXCESSIVE_NUMBER_OF_JOBS = excessiveNumberOfJobs; JOB_HISTORY_MINIMUM_RECENCY = minimumRecencyCount; } - /** - * @param configDatabase The config database is only needed for reading and writing sync state. - */ - public DefaultJobPersistence(final Database jobDatabase, final Database configDatabase) { - this(jobDatabase, configDatabase, Instant::now, 30, 500, 10); + public DefaultJobPersistence(final Database jobDatabase) { + this(jobDatabase, Instant::now, 30, 500, 10); } /** @@ -324,7 +314,7 @@ public Optional getAttemptTemporalWorkflowId(final long jobId, final int public void writeOutput(final long jobId, final int attemptNumber, final T output) throws IOException { final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC); writeOutputToAttemptTable(jobId, attemptNumber, output, now); - writeOutputToSyncStateTable(jobId, output, now); + // writeOutputToSyncStateTable(jobId, output, now); } private void writeOutputToAttemptTable(final long jobId, @@ -340,50 +330,6 @@ private void writeOutputToAttemptTable(final long jobId, .execute()); } - private void writeOutputToSyncStateTable(final long jobId, final T output, final OffsetDateTime now) throws IOException { - if (!(output instanceof JobOutput)) { - return; - } - final JobOutput jobOutput = (JobOutput) output; - if (jobOutput.getSync() == null) { - return; - } - - final Record1 jobConnectionId = jobDatabase.query(ctx -> ctx - .select(JOBS.SCOPE) - .from(JOBS) - .where(JOBS.ID.eq(jobId)) - .fetchAny()); - final State syncState = jobOutput.getSync().getState(); - - if (jobConnectionId == null) { - LOGGER.error("No job can be found for id {}", jobId); - return; - } - - final UUID connectionId = UUID.fromString(jobConnectionId.value1()); - configDatabase.transaction( - ctx -> { - final boolean hasExistingRecord = ctx.fetchExists(SYNC_STATE, SYNC_STATE.SYNC_ID.eq(connectionId)); - if (hasExistingRecord) { - LOGGER.info("Updating connection {} state", connectionId); - return ctx.update(SYNC_STATE) - .set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(syncState))) - .set(SYNC_STATE.UPDATED_AT, now) - .where(SYNC_STATE.SYNC_ID.eq(connectionId)) - .execute(); - } else { - LOGGER.info("Inserting new state for connection {}", connectionId); - return ctx.insertInto(SYNC_STATE) - .set(SYNC_STATE.SYNC_ID, UUID.fromString(jobConnectionId.value1())) - .set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(syncState))) - .set(SYNC_STATE.CREATED_AT, now) - .set(SYNC_STATE.UPDATED_AT, now) - .execute(); - } - }); - } - @Override public Job getJob(final long jobId) throws IOException { return jobDatabase.query(ctx -> getJob(ctx, jobId)); @@ -448,21 +394,6 @@ public Optional getLastReplicationJob(final UUID connectionId) throws IOExc .flatMap(r -> getJobOptional(ctx, r.get("job_id", Long.class)))); } - @Override - public Optional getCurrentState(final UUID connectionId) throws IOException { - return configDatabase.query(ctx -> { - final Record1 record = ctx.select(SYNC_STATE.STATE) - .from(SYNC_STATE) - .where(SYNC_STATE.SYNC_ID.eq(connectionId)) - .fetchAny(); - if (record == null) { - return Optional.empty(); - } - - return Optional.of(Jsons.deserialize(record.value1().data(), State.class)); - }); - } - @Override public Optional getNextJob() throws IOException { // rules: diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java index 825936ef4b41..26260753e566 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobPersistence.java @@ -7,7 +7,6 @@ import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; -import io.airbyte.config.State; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.models.JobStatus; @@ -147,19 +146,6 @@ public interface JobPersistence { Optional getLastReplicationJob(UUID connectionId) throws IOException; - /** - * if a job does not succeed, we assume that it synced nothing. that is the most conservative - * assumption we can make. as long as all destinations write the final data output in a - * transactional way, this will be true. if this changes, then we may end up writing duplicate data - * with our incremental append only. this is preferable to failing to send data at all. our - * incremental append only most closely resembles a deliver at least once strategy anyway. - * - * @param connectionId - id of the connection whose state we want to fetch. - * @return the current state, if any of, the connection - * @throws IOException exception due to interaction with persistence - */ - Optional getCurrentState(UUID connectionId) throws IOException; - Optional getNextJob() throws IOException; /// ARCHIVE diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java index 08c950c1cea5..43836c83336f 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobPersistenceTest.java @@ -18,7 +18,6 @@ import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import io.airbyte.commons.json.Jsons; @@ -27,10 +26,7 @@ import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.JobOutput; -import io.airbyte.config.JobOutput.OutputType; import io.airbyte.config.JobSyncConfig; -import io.airbyte.config.StandardSyncOutput; -import io.airbyte.config.State; import io.airbyte.db.Database; import io.airbyte.db.instance.jobs.JobsDatabaseSchema; import io.airbyte.db.instance.test.TestDatabaseProviders; @@ -163,13 +159,12 @@ private static Job createJob( public void setup() throws Exception { final TestDatabaseProviders databaseProviders = new TestDatabaseProviders(container); jobDatabase = databaseProviders.createNewJobsDatabase(); - configDatabase = databaseProviders.createNewConfigsDatabase(); resetDb(); timeSupplier = mock(Supplier.class); when(timeSupplier.get()).thenReturn(NOW); - jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase, timeSupplier, 30, 500, 10); + jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10); } @AfterEach @@ -340,7 +335,7 @@ void testListJobsWithTimestamp() throws IOException { now.plusSeconds(14), now.plusSeconds(15), now.plusSeconds(16)); - jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase, timeSupplier, 30, 500, 10); + jobPersistence = new DefaultJobPersistence(jobDatabase, timeSupplier, 30, 500, 10); final long syncJobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); final int syncJobAttemptNumber0 = jobPersistence.createAttempt(syncJobId, LOG_PATH); jobPersistence.failAttempt(syncJobId, syncJobAttemptNumber0); @@ -728,107 +723,6 @@ public void testGetLastSyncJobForConnectionId() throws IOException { } - @Nested - @DisplayName("When getting current state") - class GetCurrentState { - - @Test - @DisplayName("Should take latest state (regardless of attempt status)") - void testGetCurrentStateWithMultipleAttempts() throws IOException { - // just have the time monotonically increase. - when(timeSupplier.get()).thenAnswer(a -> Instant.now()); - - // create a job - final long jobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - - // fail the first attempt - jobPersistence.failAttempt(jobId, jobPersistence.createAttempt(jobId, LOG_PATH)); - assertTrue(jobPersistence.getCurrentState(UUID.fromString(SCOPE)).isEmpty()); - - // succeed the second attempt - final State state = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 4))); - final JobOutput jobOutput = new JobOutput().withOutputType(OutputType.SYNC).withSync(new StandardSyncOutput().withState(state)); - final int attemptId = jobPersistence.createAttempt(jobId, LOG_PATH); - jobPersistence.writeOutput(jobId, attemptId, jobOutput); - jobPersistence.succeedAttempt(jobId, attemptId); - - // verify we get that state back. - assertEquals(Optional.of(state), jobPersistence.getCurrentState(UUID.fromString(SCOPE))); - - // create a second job - final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - - // check that when we have a failed attempt with no state, we still use old state. - jobPersistence.failAttempt(jobId2, jobPersistence.createAttempt(jobId2, LOG_PATH)); - assertEquals(Optional.of(state), jobPersistence.getCurrentState(UUID.fromString(SCOPE))); - - // check that when we have a failed attempt, if we have a state we use it. - final State state2 = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 5))); - final JobOutput jobOutput2 = new JobOutput().withSync(new StandardSyncOutput().withState(state2)); - final int attemptId2 = jobPersistence.createAttempt(jobId2, LOG_PATH); - jobPersistence.writeOutput(jobId2, attemptId2, jobOutput2); - jobPersistence.failAttempt(jobId2, attemptId2); - assertEquals(Optional.of(state2), jobPersistence.getCurrentState(UUID.fromString(SCOPE))); - } - - @Test - @DisplayName("Should not have state if the latest attempt did not succeeded and have state otherwise") - public void testGetCurrentStateForConnectionIdNoState() throws IOException { - // no state when the connection has never had a job. - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - final long jobId = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - final int attemptNumber = jobPersistence.createAttempt(jobId, LOG_PATH); - - // no state when connection has a job but it has not completed that has not completed - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.failJob(jobId); - - // no state when connection has a job but it is failed. - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.cancelJob(jobId); - - // no state when connection has a job but it is cancelled. - assertEquals(Optional.empty(), jobPersistence.getCurrentState(CONNECTION_ID)); - - final JobOutput jobOutput1 = new JobOutput() - .withSync(new StandardSyncOutput().withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "1"))))); - jobPersistence.writeOutput(jobId, attemptNumber, jobOutput1); - jobPersistence.succeedAttempt(jobId, attemptNumber); - - // job 1 state, after first success. - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - when(timeSupplier.get()).thenReturn(NOW.plusSeconds(1000)); - final long jobId2 = jobPersistence.enqueueJob(SCOPE, SYNC_JOB_CONFIG).orElseThrow(); - final int attemptNumber2 = jobPersistence.createAttempt(jobId2, LOG_PATH); - - // job 1 state, second job created. - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.failJob(jobId2); - - // job 1 state, second job failed. - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - jobPersistence.cancelJob(jobId2); - - // job 1 state, second job cancelled - assertEquals(Optional.of(jobOutput1.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - - final JobOutput jobOutput2 = new JobOutput() - .withSync(new StandardSyncOutput().withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "2"))))); - jobPersistence.writeOutput(jobId2, attemptNumber2, jobOutput2); - jobPersistence.succeedAttempt(jobId2, attemptNumber2); - - // job 2 state, after second job success. - assertEquals(Optional.of(jobOutput2.getSync().getState()), jobPersistence.getCurrentState(CONNECTION_ID)); - } - - } - @Nested @DisplayName("When getting next job") class GetNextJob { @@ -1300,7 +1194,7 @@ void testPurgeJobHistory(final int numJobs, // Reconfigure constants to test various combinations of tuning knobs and make sure all work. final DefaultJobPersistence jobPersistence = - new DefaultJobPersistence(jobDatabase, configDatabase, timeSupplier, ageCutoff, tooManyJobs, recencyCutoff); + new DefaultJobPersistence(jobDatabase, timeSupplier, ageCutoff, tooManyJobs, recencyCutoff); final LocalDateTime fakeNow = LocalDateTime.of(2021, 6, 20, 0, 0); diff --git a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java index cb1a52c61e2a..519b3cfef094 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ConfigurationApiFactory.java @@ -8,6 +8,7 @@ import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.Configs; import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; @@ -23,6 +24,7 @@ public class ConfigurationApiFactory implements Factory { private static WorkflowServiceStubs temporalService; private static ConfigRepository configRepository; + private static ConfigPersistence2 configPersistence2; private static JobPersistence jobPersistence; private static ConfigPersistence seed; private static SchedulerJobClient schedulerJobClient; @@ -37,6 +39,7 @@ public class ConfigurationApiFactory implements Factory { public static void setValues( final WorkflowServiceStubs temporalService, final ConfigRepository configRepository, + final ConfigPersistence2 configPersistence2, final JobPersistence jobPersistence, final ConfigPersistence seed, final SchedulerJobClient schedulerJobClient, @@ -48,6 +51,7 @@ public static void setValues( final Database jobsDatabase, final TrackingClient trackingClient) { ConfigurationApiFactory.configRepository = configRepository; + ConfigurationApiFactory.configPersistence2 = configPersistence2; ConfigurationApiFactory.jobPersistence = jobPersistence; ConfigurationApiFactory.seed = seed; ConfigurationApiFactory.schedulerJobClient = schedulerJobClient; @@ -67,6 +71,7 @@ public ConfigurationApi provide() { return new ConfigurationApi( ConfigurationApiFactory.configRepository, + ConfigurationApiFactory.configPersistence2, ConfigurationApiFactory.jobPersistence, ConfigurationApiFactory.seed, ConfigurationApiFactory.schedulerJobClient, diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java index 242c4267714b..78e8f2417b79 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerApp.java @@ -16,6 +16,7 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.helpers.LogClientSingleton; import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.config.persistence.DatabaseConfigPersistence; import io.airbyte.config.persistence.YamlSeedConfigPersistence; @@ -176,6 +177,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con final ConfigRepository configRepository = new ConfigRepository(configPersistence.withValidation(), secretsHydrator, secretPersistence, ephemeralSecretPersistence); + final ConfigPersistence2 configPersistence2 = new ConfigPersistence2(configDatabase); LOGGER.info("Creating Scheduler persistence..."); final Database jobDatabase = new JobsDatabaseInstance( @@ -183,7 +185,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con configs.getDatabasePassword(), configs.getDatabaseUrl()) .getAndInitialize(); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase); + final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); createDeploymentIfNoneExists(jobPersistence); @@ -209,7 +211,8 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost()); final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot()); final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, false, trackingClient); - final SchedulerJobClient schedulerJobClient = new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence)); + final SchedulerJobClient schedulerJobClient = + new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence, configPersistence2)); final DefaultSynchronousSchedulerClient syncSchedulerClient = new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier); final SynchronousSchedulerClient bucketSpecCacheSchedulerClient = @@ -245,6 +248,7 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con cachingSchedulerClient, temporalService, configRepository, + configPersistence2, jobPersistence, seed, configDatabase, diff --git a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java index d58f0fce32d8..93ebb0a6b05c 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java +++ b/airbyte-server/src/main/java/io/airbyte/server/ServerFactory.java @@ -8,6 +8,7 @@ import io.airbyte.commons.io.FileTtlManager; import io.airbyte.config.Configs; import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; import io.airbyte.scheduler.client.SchedulerJobClient; @@ -25,6 +26,7 @@ ServerRunnable create(SchedulerJobClient schedulerJobClient, SpecCachingSynchronousSchedulerClient cachingSchedulerClient, WorkflowServiceStubs temporalService, ConfigRepository configRepository, + ConfigPersistence2 configPersistence2, JobPersistence jobPersistence, ConfigPersistence seed, Database configsDatabase, @@ -39,6 +41,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient, final SpecCachingSynchronousSchedulerClient cachingSchedulerClient, final WorkflowServiceStubs temporalService, final ConfigRepository configRepository, + final ConfigPersistence2 configPersistence2, final JobPersistence jobPersistence, final ConfigPersistence seed, final Database configsDatabase, @@ -49,6 +52,7 @@ public ServerRunnable create(final SchedulerJobClient schedulerJobClient, ConfigurationApiFactory.setValues( temporalService, configRepository, + configPersistence2, jobPersistence, seed, schedulerJobClient, diff --git a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java index 3b2dc4f0e846..3acee99bd292 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java +++ b/airbyte-server/src/main/java/io/airbyte/server/apis/ConfigurationApi.java @@ -84,6 +84,7 @@ import io.airbyte.config.Configs; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigPersistence; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.db.Database; import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient; @@ -145,6 +146,7 @@ public class ConfigurationApi implements io.airbyte.api.V1Api { private final Configs configs; public ConfigurationApi(final ConfigRepository configRepository, + final ConfigPersistence2 configPersistence2, final JobPersistence jobPersistence, final ConfigPersistence seed, final SchedulerJobClient schedulerJobClient, @@ -164,6 +166,7 @@ public ConfigurationApi(final ConfigRepository configRepository, trackingClient); schedulerHandler = new SchedulerHandler( configRepository, + configPersistence2, schedulerJobClient, synchronousSchedulerClient, jobPersistence, diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index b9709985d165..a75beab35f06 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -37,6 +37,7 @@ import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.State; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; @@ -68,7 +69,6 @@ public class SchedulerHandler { private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class); - private static final UUID NO_WORKSPACE = UUID.fromString("00000000-0000-0000-0000-000000000000"); private final ConfigRepository configRepository; private final SchedulerJobClient schedulerJobClient; @@ -80,8 +80,10 @@ public class SchedulerHandler { private final JobNotifier jobNotifier; private final WorkflowServiceStubs temporalService; private final OAuthConfigSupplier oAuthConfigSupplier; + private final ConfigPersistence2 configPersistence2; public SchedulerHandler(final ConfigRepository configRepository, + final ConfigPersistence2 configPersistence2, final SchedulerJobClient schedulerJobClient, final SynchronousSchedulerClient synchronousSchedulerClient, final JobPersistence jobPersistence, @@ -90,6 +92,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final OAuthConfigSupplier oAuthConfigSupplier) { this( configRepository, + configPersistence2, schedulerJobClient, synchronousSchedulerClient, new ConfigurationUpdate(configRepository, new SpecFetcher(synchronousSchedulerClient)), @@ -103,6 +106,7 @@ public SchedulerHandler(final ConfigRepository configRepository, @VisibleForTesting SchedulerHandler(final ConfigRepository configRepository, + final ConfigPersistence2 configPersistence2, final SchedulerJobClient schedulerJobClient, final SynchronousSchedulerClient synchronousSchedulerClient, final ConfigurationUpdate configurationUpdate, @@ -113,6 +117,7 @@ public SchedulerHandler(final ConfigRepository configRepository, final WorkflowServiceStubs temporalService, final OAuthConfigSupplier oAuthConfigSupplier) { this.configRepository = configRepository; + this.configPersistence2 = configPersistence2; this.schedulerJobClient = schedulerJobClient; this.synchronousSchedulerClient = synchronousSchedulerClient; this.configurationUpdate = configurationUpdate; @@ -354,7 +359,7 @@ public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdReq } public ConnectionState getState(final ConnectionIdRequestBody connectionIdRequestBody) throws IOException { - final Optional currentState = jobPersistence.getCurrentState(connectionIdRequestBody.getConnectionId()); + final Optional currentState = configPersistence2.getCurrentState(connectionIdRequestBody.getConnectionId()); LOGGER.info("currentState server: {}", currentState); final ConnectionState connectionState = new ConnectionState() @@ -365,6 +370,7 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques return connectionState; } + // todo (cgardens) - this method needs a test. public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException { final long jobId = jobIdRequestBody.getId(); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java index e387da384786..5784c58bbbfd 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/ArchiveHandlerTest.java @@ -105,7 +105,7 @@ public static void dbDown() { public void setup() throws Exception { jobDatabase = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); configDatabase = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase); + jobPersistence = new DefaultJobPersistence(jobDatabase); seedPersistence = YamlSeedConfigPersistence.getDefault(); configPersistence = new DatabaseConfigPersistence(jobDatabase); configPersistence.replaceAllConfigs(Collections.emptyMap(), false); diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 85c1b0b76ba5..ccfcf7c6e82f 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -49,6 +49,7 @@ import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.State; import io.airbyte.config.persistence.ConfigNotFoundException; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.CatalogHelpers; @@ -117,6 +118,7 @@ class SchedulerHandlerTest { private SchedulerHandler schedulerHandler; private ConfigRepository configRepository; + private ConfigPersistence2 configPersistence2; private Job completedJob; private SchedulerJobClient schedulerJobClient; private SynchronousSchedulerClient synchronousSchedulerClient; @@ -140,11 +142,13 @@ void setup() { schedulerJobClient = spy(SchedulerJobClient.class); synchronousSchedulerClient = mock(SynchronousSchedulerClient.class); configRepository = mock(ConfigRepository.class); + configPersistence2 = mock(ConfigPersistence2.class); jobPersistence = mock(JobPersistence.class); final JobNotifier jobNotifier = mock(JobNotifier.class); schedulerHandler = new SchedulerHandler( configRepository, + configPersistence2, schedulerJobClient, synchronousSchedulerClient, configurationUpdate, @@ -557,7 +561,7 @@ void testResetConnection() throws JsonValidationException, IOException, ConfigNo void testGetCurrentState() throws IOException { final UUID connectionId = UUID.randomUUID(); final State state = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", 1))); - when(jobPersistence.getCurrentState(connectionId)).thenReturn(Optional.of(state)); + when(configPersistence2.getCurrentState(connectionId)).thenReturn(Optional.of(state)); final ConnectionState connectionState = schedulerHandler.getState(new ConnectionIdRequestBody().connectionId(connectionId)); assertEquals(new ConnectionState().connectionId(connectionId).state(state.getState()), connectionState); @@ -566,7 +570,7 @@ void testGetCurrentState() throws IOException { @Test void testGetCurrentStateEmpty() throws IOException { final UUID connectionId = UUID.randomUUID(); - when(jobPersistence.getCurrentState(connectionId)).thenReturn(Optional.empty()); + when(configPersistence2.getCurrentState(connectionId)).thenReturn(Optional.empty()); final ConnectionState connectionState = schedulerHandler.getState(new ConnectionIdRequestBody().connectionId(connectionId)); assertEquals(new ConnectionState().connectionId(connectionId), connectionState); diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java index 864fb7da8795..f9f4d060301a 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/DatabaseArchiverTest.java @@ -43,7 +43,7 @@ void setUp() throws IOException { jobDatabase = new JobsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); configDatabase = new ConfigsDatabaseInstance(container.getUsername(), container.getPassword(), container.getJdbcUrl()).getAndInitialize(); - final JobPersistence persistence = new DefaultJobPersistence(jobDatabase, configDatabase); + final JobPersistence persistence = new DefaultJobPersistence(jobDatabase); databaseArchiver = new DatabaseArchiver(persistence); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java index 97a4e010fa3a..2eb6aa7dc951 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/migration/RunMigrationTest.java @@ -363,7 +363,7 @@ private void runMigration(final ConfigPersistence configPersistence, final JobPe @SuppressWarnings("SameParameterValue") private JobPersistence getJobPersistence(final File file, final String version) throws IOException { - final DefaultJobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase); + final DefaultJobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final Path tempFolder = Files.createTempDirectory(Path.of("/tmp"), "db_init"); resourceToBeCleanedUp.add(tempFolder.toFile()); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java index c7cf3c100ec5..307eddd162ba 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java @@ -9,8 +9,11 @@ import io.airbyte.config.EnvConfigs; import io.airbyte.config.MaxWorkersConfig; import io.airbyte.config.helpers.LogClientSingleton; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.split_secrets.SecretPersistence; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; +import io.airbyte.db.Database; +import io.airbyte.db.instance.configs.ConfigsDatabaseInstance; import io.airbyte.workers.process.DockerProcessFactory; import io.airbyte.workers.process.KubeProcessFactory; import io.airbyte.workers.process.ProcessFactory; @@ -50,7 +53,7 @@ public class WorkerApp { private final WorkflowServiceStubs temporalService; private final MaxWorkersConfig maxWorkers; private final WorkerEnvironment workerEnvironment; - private final String airbyteVersion; + private final ConfigPersistence2 configPersistence2; public WorkerApp(final Path workspaceRoot, final ProcessFactory processFactory, @@ -58,14 +61,14 @@ public WorkerApp(final Path workspaceRoot, final WorkflowServiceStubs temporalService, final MaxWorkersConfig maxWorkers, final WorkerEnvironment workerEnvironment, - final String airbyteVersion) { + final ConfigPersistence2 configPersistence2) { this.workspaceRoot = workspaceRoot; this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.temporalService = temporalService; this.maxWorkers = maxWorkers; this.workerEnvironment = workerEnvironment; - this.airbyteVersion = airbyteVersion; + this.configPersistence2 = configPersistence2; } public void start() { @@ -101,8 +104,9 @@ public void start() { syncWorker.registerWorkflowImplementationTypes(SyncWorkflow.WorkflowImpl.class); syncWorker.registerActivitiesImplementations( new SyncWorkflow.ReplicationActivityImpl(processFactory, secretsHydrator, workspaceRoot), - new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment, airbyteVersion), - new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot, airbyteVersion)); + new SyncWorkflow.NormalizationActivityImpl(processFactory, secretsHydrator, workspaceRoot, workerEnvironment), + new SyncWorkflow.DbtTransformationActivityImpl(processFactory, secretsHydrator, workspaceRoot), + new SyncWorkflow.PersistStateActivityImpl(workspaceRoot, configPersistence2)); factory.start(); } @@ -146,6 +150,14 @@ public static void main(final String[] args) throws IOException, InterruptedExce final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost); + // todo (cgardens) - make sure appropriate env variables are passed to this container. + final Database configDatabase = new ConfigsDatabaseInstance( + configs.getConfigDatabaseUser(), + configs.getConfigDatabasePassword(), + configs.getConfigDatabaseUrl()) + .getInitialized(); + final ConfigPersistence2 configPersistence2 = new ConfigPersistence2(configDatabase); + new WorkerApp( workspaceRoot, processFactory, @@ -153,7 +165,7 @@ public static void main(final String[] args) throws IOException, InterruptedExce temporalService, configs.getMaxWorkers(), configs.getWorkerEnvironment(), - configs.getAirbyteVersion()).start(); + configPersistence2).start(); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java index 496c6d83c758..b3725e978e96 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/SyncWorkflow.java @@ -20,6 +20,8 @@ import io.airbyte.config.StandardSyncOperation.OperatorType; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; +import io.airbyte.config.State; +import io.airbyte.config.persistence.ConfigPersistence2; import io.airbyte.config.persistence.split_secrets.SecretsHydrator; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; @@ -43,11 +45,14 @@ import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; import io.temporal.activity.ActivityOptions; +import io.temporal.common.RetryOptions; import io.temporal.workflow.Workflow; import io.temporal.workflow.WorkflowInterface; import io.temporal.workflow.WorkflowMethod; +import java.io.IOException; import java.nio.file.Path; import java.time.Duration; +import java.util.UUID; import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,7 +64,8 @@ public interface SyncWorkflow { StandardSyncOutput run(JobRunConfig jobRunConfig, IntegrationLauncherConfig sourceLauncherConfig, IntegrationLauncherConfig destinationLauncherConfig, - StandardSyncInput syncInput); + StandardSyncInput syncInput, + UUID connectionId); class WorkflowImpl implements SyncWorkflow { @@ -73,15 +79,23 @@ class WorkflowImpl implements SyncWorkflow { .setRetryOptions(TemporalUtils.NO_RETRY) .build(); + private static final ActivityOptions persistOptions = options.toBuilder() + .setRetryOptions(RetryOptions.newBuilder() + .setMaximumAttempts(10) + .build()) + .build(); + private final ReplicationActivity replicationActivity = Workflow.newActivityStub(ReplicationActivity.class, options); private final NormalizationActivity normalizationActivity = Workflow.newActivityStub(NormalizationActivity.class, options); private final DbtTransformationActivity dbtTransformationActivity = Workflow.newActivityStub(DbtTransformationActivity.class, options); + private final PersistStateActivity persistActivity = Workflow.newActivityStub(PersistStateActivity.class, persistOptions); @Override public StandardSyncOutput run(final JobRunConfig jobRunConfig, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, - final StandardSyncInput syncInput) { + final StandardSyncInput syncInput, + final UUID connectionId) { final StandardSyncOutput run = replicationActivity.replicate(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); if (syncInput.getOperationSequence() != null && !syncInput.getOperationSequence().isEmpty()) { @@ -104,6 +118,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, LOGGER.error(message); throw new IllegalArgumentException(message); } + persistActivity.persist(connectionId, run); } } @@ -255,14 +270,12 @@ class NormalizationActivityImpl implements NormalizationActivity { private final Path workspaceRoot; private final AirbyteConfigValidator validator; private final WorkerEnvironment workerEnvironment; - private final String airbyteVersion; public NormalizationActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, - final WorkerEnvironment workerEnvironment, - final String airbyteVersion) { - this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment, airbyteVersion); + final WorkerEnvironment workerEnvironment) { + this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), workerEnvironment); } @VisibleForTesting @@ -270,14 +283,12 @@ public NormalizationActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, final AirbyteConfigValidator validator, - final WorkerEnvironment workerEnvironment, - final String airbyteVersion) { + final WorkerEnvironment workerEnvironment) { this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = validator; this.workerEnvironment = workerEnvironment; - this.airbyteVersion = airbyteVersion; } @Override @@ -335,27 +346,23 @@ class DbtTransformationActivityImpl implements DbtTransformationActivity { private final SecretsHydrator secretsHydrator; private final Path workspaceRoot; private final AirbyteConfigValidator validator; - private final String airbyteVersion; public DbtTransformationActivityImpl( final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, - final Path workspaceRoot, - final String airbyteVersion) { - this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator(), airbyteVersion); + final Path workspaceRoot) { + this(processFactory, secretsHydrator, workspaceRoot, new AirbyteConfigValidator()); } @VisibleForTesting DbtTransformationActivityImpl(final ProcessFactory processFactory, final SecretsHydrator secretsHydrator, final Path workspaceRoot, - final AirbyteConfigValidator validator, - final String airbyteVersion) { + final AirbyteConfigValidator validator) { this.processFactory = processFactory; this.secretsHydrator = secretsHydrator; this.workspaceRoot = workspaceRoot; this.validator = validator; - this.airbyteVersion = airbyteVersion; } @Override @@ -397,4 +404,40 @@ private CheckedSupplier, Exception> getWorkerFact } + @ActivityInterface + interface PersistStateActivity { + + @ActivityMethod + boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput); + + } + + class PersistStateActivityImpl implements PersistStateActivity { + + private static final Logger LOGGER = LoggerFactory.getLogger(PersistStateActivityImpl.class); + private final Path workspaceRoot; + private final ConfigPersistence2 configPersistence2; + + public PersistStateActivityImpl(final Path workspaceRoot, final ConfigPersistence2 configPersistence2) { + this.workspaceRoot = workspaceRoot; + this.configPersistence2 = configPersistence2; + } + + @Override + public boolean persist(final UUID connectionId, final StandardSyncOutput syncOutput) { + final State state = syncOutput.getState(); + if (state != null) { + try { + configPersistence2.updateSyncState(connectionId, state); + } catch (final IOException e) { + throw new RuntimeException(e); + } + return true; + } else { + return false; + } + } + + } + } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java index 3580852ee6dd..4c1dcd9380d8 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalAttemptExecution.java @@ -137,7 +137,7 @@ private void saveWorkflowIdForCancellation() throws IOException { configs.getConfigDatabasePassword(), configs.getConfigDatabaseUrl()) .getInitialized(); - final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase); + final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); final String workflowId = workflowIdProvider.get(); jobPersistence.setAttemptTemporalWorkflowId(Long.parseLong(jobRunConfig.getJobId()), jobRunConfig.getAttemptId().intValue(), workflowId); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index 94510d187799..d3139315a766 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -78,7 +78,7 @@ public TemporalResponse submitDiscoverSchema(final UUID jobId, f () -> getWorkflowStub(DiscoverCatalogWorkflow.class, TemporalJobType.DISCOVER_SCHEMA).run(jobRunConfig, launcherConfig, input)); } - public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config) { + public TemporalResponse submitSync(final long jobId, final int attempt, final JobSyncConfig config, final UUID connectionId) { final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt); final IntegrationLauncherConfig sourceLauncherConfig = new IntegrationLauncherConfig() @@ -107,7 +107,8 @@ public TemporalResponse submitSync(final long jobId, final i jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, - input)); + input, + connectionId)); } private T getWorkflowStub(final Class workflowClass, final TemporalJobType jobType) {