Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

suggestion: save state from temporal temporal #7253

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

import static io.airbyte.db.instance.configs.jooq.Tables.SYNC_STATE;

import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.db.Database;
import io.airbyte.db.ExceptionWrappingDatabase;
import java.io.IOException;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Optional;
import java.util.UUID;
import java.util.function.Supplier;
import org.jooq.JSONB;
import org.jooq.Record1;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConfigPersistence2 {

private static final Logger LOGGER = LoggerFactory.getLogger(ConfigPersistence2.class);

private final ExceptionWrappingDatabase configDatabase;
private final Supplier<Instant> timeSupplier;

public ConfigPersistence2(final Database configDatabase) {
this(configDatabase, Instant::now);
}

@VisibleForTesting
ConfigPersistence2(final Database configDatabase, final Supplier<Instant> timeSupplier) {
this.configDatabase = new ExceptionWrappingDatabase(configDatabase);

this.timeSupplier = timeSupplier;
}

public Optional<State> getCurrentState(final UUID connectionId) throws IOException {
return configDatabase.query(ctx -> {
final Record1<JSONB> record = ctx.select(SYNC_STATE.STATE)
.from(SYNC_STATE)
.where(SYNC_STATE.SYNC_ID.eq(connectionId))
.fetchAny();
if (record == null) {
return Optional.empty();
}

return Optional.of(Jsons.deserialize(record.value1().data(), State.class));
});
}

public void updateSyncState(final UUID connectionId, final State state) throws IOException {
final OffsetDateTime now = OffsetDateTime.ofInstant(timeSupplier.get(), ZoneOffset.UTC);

configDatabase.transaction(
ctx -> {
final boolean hasExistingRecord = ctx.fetchExists(SYNC_STATE, SYNC_STATE.SYNC_ID.eq(connectionId));
if (hasExistingRecord) {
LOGGER.info("Updating connection {} state", connectionId);
return ctx.update(SYNC_STATE)
.set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(state)))
.set(SYNC_STATE.UPDATED_AT, now)
.where(SYNC_STATE.SYNC_ID.eq(connectionId))
.execute();
} else {
LOGGER.info("Inserting new state for connection {}", connectionId);
return ctx.insertInto(SYNC_STATE)
.set(SYNC_STATE.SYNC_ID, connectionId)
.set(SYNC_STATE.STATE, JSONB.valueOf(Jsons.serialize(state)))
.set(SYNC_STATE.CREATED_AT, now)
.set(SYNC_STATE.UPDATED_AT, now)
.execute();
}
});
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.config.persistence;

class ConfigPersistence2Test {}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigPersistence2;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.persistence.DefaultJobCreator;
Expand Down Expand Up @@ -47,14 +48,15 @@ public class JobScheduler implements Runnable {
}

public JobScheduler(final JobPersistence jobPersistence,
final ConfigPersistence2 configPersistence2,
final ConfigRepository configRepository,
final TrackingClient trackingClient) {
this(
jobPersistence,
configRepository,
new ScheduleJobPredicate(Instant::now),
new DefaultSyncJobFactory(
new DefaultJobCreator(jobPersistence),
new DefaultJobCreator(jobPersistence, configPersistence2),
configRepository,
new OAuthConfigSupplier(configRepository, false, trackingClient)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigPersistence2;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
Expand Down Expand Up @@ -75,6 +76,7 @@ public class SchedulerApp {
private final Path workspaceRoot;
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final ConfigPersistence2 configPersistence2;
private final JobCleaner jobCleaner;
private final JobNotifier jobNotifier;
private final TemporalClient temporalClient;
Expand All @@ -85,6 +87,7 @@ public class SchedulerApp {
public SchedulerApp(final Path workspaceRoot,
final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final ConfigPersistence2 configPersistence2,
final JobCleaner jobCleaner,
final JobNotifier jobNotifier,
final TemporalClient temporalClient,
Expand All @@ -94,6 +97,7 @@ public SchedulerApp(final Path workspaceRoot,
this.workspaceRoot = workspaceRoot;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.configPersistence2 = configPersistence2;
this.jobCleaner = jobCleaner;
this.jobNotifier = jobNotifier;
this.temporalClient = temporalClient;
Expand All @@ -110,7 +114,7 @@ public void start() throws IOException {
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot, airbyteVersionOrWarnings);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configPersistence2, configRepository, trackingClient);
final JobSubmitter jobSubmitter = new JobSubmitter(
workerThreadPool,
jobPersistence,
Expand Down Expand Up @@ -209,18 +213,19 @@ public static void main(final String[] args) throws IOException, InterruptedExce
configs.getConfigDatabasePassword(),
configs.getConfigDatabaseUrl())
.getInitialized();
final ConfigPersistence2 configPersistence2 = new ConfigPersistence2(configDatabase);
final ConfigPersistence configPersistence = new DatabaseConfigPersistence(configDatabase).withValidation();
final Optional<SecretPersistence> secretPersistence = SecretPersistence.getLongLived(configs);
final Optional<SecretPersistence> ephemeralSecretPersistence = SecretPersistence.getEphemeral(configs);
final SecretsHydrator secretsHydrator = SecretPersistence.getSecretsHydrator(configs);
final ConfigRepository configRepository = new ConfigRepository(configPersistence, secretsHydrator, secretPersistence, ephemeralSecretPersistence);

final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase, configDatabase);
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase);
final JobCleaner jobCleaner = new JobCleaner(
configs.getWorkspaceRetentionConfig(),
workspaceRoot,
jobPersistence);
AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().get());
AirbyteVersion.assertIsCompatible(configs.getAirbyteVersion(), jobPersistence.getVersion().orElseThrow());

TrackingClientSingleton.initialize(
configs.getTrackingStrategy(),
Expand All @@ -239,8 +244,17 @@ public static void main(final String[] args) throws IOException, InterruptedExce
MetricSingleton.initializeMonitoringServiceDaemon("8082", mdc, configs.getPublishMetrics());

LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, jobPersistence, configRepository, jobCleaner, jobNotifier, temporalClient,
Integer.parseInt(configs.getSubmitterNumThreads()), configs.getMaxSyncJobAttempts(), configs.getAirbyteVersionOrWarning())
new SchedulerApp(
workspaceRoot,
jobPersistence,
configRepository,
configPersistence2,
jobCleaner,
jobNotifier,
temporalClient,
Integer.parseInt(configs.getSubmitterNumThreads()),
configs.getMaxSyncJobAttempts(),
configs.getAirbyteVersionOrWarning())
.start();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.airbyte.workers.temporal.TemporalJobType;
import io.airbyte.workers.temporal.TemporalResponse;
import java.nio.file.Path;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,9 +45,14 @@ public WorkerRun create(final Job job) {

public CheckedSupplier<OutputAndStatus<JobOutput>, Exception> createSupplier(final Job job, final int attemptId) {
final TemporalJobType temporalJobType = toTemporalJobType(job.getConfigType());
final UUID connectionId = UUID.fromString(job.getScope());
return switch (job.getConfigType()) {
case SYNC -> () -> {
final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(job.getId(), attemptId, job.getConfig().getSync());
final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(
job.getId(),
attemptId,
job.getConfig().getSync(),
connectionId);
return toOutputAndStatus(output);
};
case RESET_CONNECTION -> () -> {
Expand All @@ -63,7 +69,7 @@ public CheckedSupplier<OutputAndStatus<JobOutput>, Exception> createSupplier(fin
.withOperationSequence(resetConnection.getOperationSequence())
.withResourceRequirements(resetConnection.getResourceRequirements());

final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(job.getId(), attemptId, config);
final TemporalResponse<StandardSyncOutput> output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId);
return toOutputAndStatus(output);
};
default -> throw new IllegalArgumentException("Does not support job type: " + temporalJobType);
Expand All @@ -84,7 +90,7 @@ private OutputAndStatus<JobOutput> toOutputAndStatus(final TemporalResponse<Stan
if (!response.isSuccess()) {
status = JobStatus.FAILED;
} else {
final ReplicationStatus replicationStatus = response.getOutput().get().getStandardSyncSummary().getStatus();
final ReplicationStatus replicationStatus = response.getOutput().orElseThrow().getStandardSyncSummary().getStatus();
if (replicationStatus == ReplicationStatus.FAILED || replicationStatus == ReplicationStatus.CANCELLED) {
status = JobStatus.FAILED;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.persistence.ConfigPersistence2;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;
Expand All @@ -22,9 +23,11 @@
public class DefaultJobCreator implements JobCreator {

private final JobPersistence jobPersistence;
private final ConfigPersistence2 configPersistence2;

public DefaultJobCreator(final JobPersistence jobPersistence) {
public DefaultJobCreator(final JobPersistence jobPersistence, final ConfigPersistence2 configPersistence2) {
this.jobPersistence = jobPersistence;
this.configPersistence2 = configPersistence2;
}

@Override
Expand All @@ -49,7 +52,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withState(null)
.withResourceRequirements(standardSync.getResourceRequirements());

jobPersistence.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);
configPersistence2.getCurrentState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.SYNC)
Expand Down
Loading