Skip to content

Commit

Permalink
Save state in temporal (#7253)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Oct 24, 2021
1 parent 7b65523 commit bba689a
Show file tree
Hide file tree
Showing 22 changed files with 248 additions and 240 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.SYNC_STATE;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import org.jooq.JSONB;
import org.jooq.Record1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigPersistence2 {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistence2.class);

private final ExceptionWrappingDatabase configDatabase;
private final Supplier<Instant> timeSupplier;

public ConfigPersistence2(final Database configDatabase) {
this(configDatabase, Instant::now);
}

@VisibleForTesting
ConfigPersistence2(final Database configDatabase, final Supplier<Instant> timeSupplier) {
this.configDatabase = new ExceptionWrappingDatabase(configDatabase);

this.timeSupplier = timeSupplier;
}

public Optional<State> getCurrentState(final UUID connectionId) throws IOException {
return configDatabase.query(ctx -> {
final Record1<JSONB> record = ctx.select(SYNC_STATE.STATE)
.from(SYNC_STATE)
.where(SYNC_STATE.SYNC_ID.eq(connectionId))
.fetchAny();
if (record == null) {
return Optional.empty();
}

return Optional.of(Jsons.deserialize(record.value1().data(), State.class));
});
}

public void updateSyncState(final UUID connectionId, final State state) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);

configDatabase.transaction(
ctx -> {
final boolean hasExistingRecord = ctx.fetchExists(SYNC_STATE, SYNC_STATE.SYNC_ID.eq(connectionId));
if (hasExistingRecord) {
LOGGER.info("Updating connection {} state", connectionId);
return ctx.update(SYNC_STATE)
.set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(state)))
.set(SYNC_STATE.UPDATED_AT, now)
.where(SYNC_STATE.SYNC_ID.eq(connectionId))
.execute();
} else {
LOGGER.info("Inserting new state for connection {}", connectionId);
return ctx.insertInto(SYNC_STATE)
.set(SYNC_STATE.SYNC_ID, connectionId)
.set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(state)))
.set(SYNC_STATE.CREATED_AT, now)
.set(SYNC_STATE.UPDATED_AT, now)
.execute();
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

class ConfigPersistence2Test {}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence2;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
Expand Down Expand Up @@ -47,14 +48,15 @@ public class JobScheduler implements Runnable {
}

public JobScheduler(final JobPersistence jobPersistence,
final ConfigPersistence2 configPersistence2,
final ConfigRepository configRepository,
final TrackingClient trackingClient) {
this(
jobPersistence,
configRepository,
new ScheduleJobPredicate(Instant::now),
new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence),
new DefaultJobCreator(jobPersistence, configPersistence2),
configRepository,
new OAuthConfigSupplier(configRepository, false, trackingClient)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigPersistence2;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class SchedulerApp {
private final Path workspaceRoot;
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final ConfigPersistence2 configPersistence2;
private final JobCleaner jobCleaner;
private final JobNotifier jobNotifier;
private final TemporalClient temporalClient;
Expand All @@ -85,6 +87,7 @@ public class SchedulerApp {
public SchedulerApp(final Path workspaceRoot,
final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final ConfigPersistence2 configPersistence2,
final JobCleaner jobCleaner,
final JobNotifier jobNotifier,
final TemporalClient temporalClient,
Expand All @@ -94,6 +97,7 @@ public SchedulerApp(final Path workspaceRoot,
this.workspaceRoot = workspaceRoot;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.configPersistence2 = configPersistence2;
this.jobCleaner = jobCleaner;
this.jobNotifier = jobNotifier;
this.temporalClient = temporalClient;
Expand All @@ -110,7 +114,7 @@ public void start() throws IOException {
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot, airbyteVersionOrWarnings);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configPersistence2, configRepository, trackingClient);
final JobSubmitter jobSubmitter = new JobSubmitter(
workerThreadPool,
jobPersistence,
Expand Down Expand Up @@ -209,18 +213,19 @@ public static void main(final String[] args) throws IOException, InterruptedExce
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getInitialized();
final ConfigPersistence2 configPersistence2 = new ConfigPersistence2(configDatabase);
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation();
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence);

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase);
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
final JobCleaner jobCleaner = new JobCleaner(
configs.getWorkspaceRetentionConfig(),
workspaceRoot,
jobPersistence);
AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().get());
AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().orElseThrow());

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
Expand All @@ -239,8 +244,17 @@ public static void main(final String[] args) throws IOException, InterruptedExce
MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics());

LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient,
Integer.parseInt(configs.getSubmitterNumThreads()), configs.getMaxSyncJobAttempts(), configs.getAirbyteVersionOrWarning())
new SchedulerApp(
workspaceRoot,
jobPersistence,
configRepository,
configPersistence2,
jobCleaner,
jobNotifier,
temporalClient,
Integer.parseInt(configs.getSubmitterNumThreads()),
configs.getMaxSyncJobAttempts(),
configs.getAirbyteVersionOrWarning())
.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.TemporalResponse;
import java.nio.file.Path;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,9 +45,14 @@ public WorkerRun create(final Job job) {

public CheckedSupplier<OutputAndStatus<JobOutput>, Exception> createSupplier(final Job job, final int attemptId) {
final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType());
final UUID connectionId = UUID.fromString(job.getScope());
return switch (job.getConfigType()) {
case SYNC -> () -> {
final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(job.getId(), attemptId, job.getConfig().getSync());
final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(
job.getId(),
attemptId,
job.getConfig().getSync(),
connectionId);
return toOutputAndStatus(output);
};
case RESET_CONNECTION -> () -> {
Expand All @@ -63,7 +69,7 @@ public CheckedSupplier<OutputAndStatus<JobOutput>, Exception> createSupplier(fin
.withOperationSequence(resetConnection.getOperationSequence())
.withResourceRequirements(resetConnection.getResourceRequirements());

final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(job.getId(), attemptId, config);
final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId);
return toOutputAndStatus(output);
};
default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType);
Expand All @@ -84,7 +90,7 @@ private OutputAndStatus<JobOutput> toOutputAndStatus(final TemporalResponse<Stan
if (!response.isSuccess()) {
status = JobStatus.FAILED;
} else {
final ReplicationStatus replicationStatus = response.getOutput().get().getStandardSyncSummary().getStatus();
final ReplicationStatus replicationStatus = response.getOutput().orElseThrow().getStandardSyncSummary().getStatus();
if (replicationStatus == ReplicationStatus.FAILED || replicationStatus == ReplicationStatus.CANCELLED) {
status = JobStatus.FAILED;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.persistence.ConfigPersistence2;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;
Expand All @@ -22,9 +23,11 @@
public class DefaultJobCreator implements JobCreator {

private final JobPersistence jobPersistence;
private final ConfigPersistence2 configPersistence2;

public DefaultJobCreator(final JobPersistence jobPersistence) {
public DefaultJobCreator(final JobPersistence jobPersistence, final ConfigPersistence2 configPersistence2) {
this.jobPersistence = jobPersistence;
this.configPersistence2 = configPersistence2;
}

@Override
Expand All @@ -49,7 +52,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withState(null)
.withResourceRequirements(standardSync.getResourceRequirements());

jobPersistence.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);
configPersistence2.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.SYNC)
Expand Down
Loading

0 comments on commit bba689a

Please sign in to comment.