Skip to content

Commit

Permalink
Repair temporal state when performing manual actions (#12289)
Browse files Browse the repository at this point in the history
* Repair temporal state when performing manual actions

* refactor temporal client and fix tests

* add unreachable workflow exception

* format

* test repeated deletion

* add acceptance tests for automatic workflow repair

* rename and DRY up manual operation methods in SchedulerHandler

* refactor temporal client to batch signal and start requests together in repair case

* add comment

* remove main method

* fix job id fetching

* only overwrite workflowState if reset flags are true on input

* fix test

* fix cancel endpoint

* Clean job state before creating new jobs in connection manager workflow (#12589)

* first working iteration of cleaning job state on first workflow run

* second iteration, with tests

* undo local testing changes

* move method

* add comment explaining placement of clean job state logic

* change connection_workflow failure origin value to platform

* remove cast from new query

* create static var for non terminal job statuses

* change failure origin value to airbyte_platform

* tweak external message wording

* remove unused variable

* reword external message

* fix merge conflict

* remove log lines

* move cleaning job state to beginning of workflow

* do not clean job state if there is already a job id for this workflow, and add test

* see if sleeping fixes test on CI

* add repeated test annotation to protect from flakiness

* fail jobs before creating new ones to protect from quarantined state

* update external message for cleaning job state error
  • Loading branch information
lmossman authored and suhomud committed May 23, 2022
1 parent a8da6d9 commit fbd01aa
Show file tree
Hide file tree
Showing 22 changed files with 994 additions and 212 deletions.
1 change: 1 addition & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3862,6 +3862,7 @@ components:
- persistence
- normalization
- dbt
- airbyte_platform
AttemptFailureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ properties:
- persistence
- normalization
- dbt
- airbyte_platform
failureType:
description: Categorizes well known errors into types for programmatic handling. If not set, the type of error is not well known.
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,21 @@

package io.airbyte.scheduler.client;

import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.Set;
import java.util.UUID;

public interface EventRunner {

void createNewSchedulerWorkflow(final UUID connectionId);

ManualSyncSubmissionResult startNewManualSync(final UUID connectionId);
ManualOperationResult startNewManualSync(final UUID connectionId);

ManualSyncSubmissionResult startNewCancelation(final UUID connectionId);
ManualOperationResult startNewCancellation(final UUID connectionId);

ManualSyncSubmissionResult resetConnection(final UUID connectionId);
ManualOperationResult resetConnection(final UUID connectionId);

ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId);
ManualOperationResult synchronousResetConnection(final UUID connectionId);

void deleteConnection(final UUID connectionId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.scheduler.client;

import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.util.Set;
import java.util.UUID;
import lombok.AllArgsConstructor;
Expand All @@ -19,19 +19,19 @@ public void createNewSchedulerWorkflow(final UUID connectionId) {
temporalClient.submitConnectionUpdaterAsync(connectionId);
}

public ManualSyncSubmissionResult startNewManualSync(final UUID connectionId) {
public ManualOperationResult startNewManualSync(final UUID connectionId) {
return temporalClient.startNewManualSync(connectionId);
}

public ManualSyncSubmissionResult startNewCancelation(final UUID connectionId) {
return temporalClient.startNewCancelation(connectionId);
public ManualOperationResult startNewCancellation(final UUID connectionId) {
return temporalClient.startNewCancellation(connectionId);
}

public ManualSyncSubmissionResult resetConnection(final UUID connectionId) {
public ManualOperationResult resetConnection(final UUID connectionId) {
return temporalClient.resetConnection(connectionId);
}

public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) {
public ManualOperationResult synchronousResetConnection(final UUID connectionId) {
return temporalClient.synchronousResetConnection(connectionId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ public enum JobStatus {
CANCELLED;

public static final Set<JobStatus> TERMINAL_STATUSES = Sets.newHashSet(FAILED, SUCCEEDED, CANCELLED);
public static final Set<JobStatus> NON_TERMINAL_STATUSES = Sets.difference(Set.of(values()), TERMINAL_STATUSES);

}
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,18 @@ public List<Job> listJobsWithStatus(final ConfigType configType, final JobStatus
return listJobsWithStatus(Sets.newHashSet(configType), status);
}

@Override
public List<Job> listJobsForConnectionWithStatuses(final UUID connectionId, final Set<ConfigType> configTypes, final Set<JobStatus> statuses)
throws IOException {
return jobDatabase.query(ctx -> getJobsFromResult(ctx
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " +
"scope = ? AND " +
"config_type IN " + Sqls.toSqlInFragment(configTypes) + " AND " +
"jobs.status IN " + Sqls.toSqlInFragment(statuses) + " " +
ORDER_BY_JOB_TIME_ATTEMPT_TIME,
connectionId.toString())));
}

@Override
public List<JobWithStatusAndTimestamp> listJobStatusAndTimestampWithConnection(final UUID connectionId,
final Set<ConfigType> configTypes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,8 @@ public interface JobPersistence {

List<Job> listJobsWithStatus(JobConfig.ConfigType configType, JobStatus status) throws IOException;

List<Job> listJobsForConnectionWithStatuses(UUID connectionId, Set<JobConfig.ConfigType> configTypes, Set<JobStatus> statuses) throws IOException;

/**
* @param connectionId The ID of the connection
* @param configTypes The types of jobs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1168,6 +1168,45 @@ public void testListJobsWithStatusAndConfigType() throws IOException, Interrupte
assertEquals(expectedIncompleteJob, actualIncompleteJob);
}

@Test
@DisplayName("Should only list jobs for the requested connection and with the requested statuses and config types")
public void testListJobsWithStatusesAndConfigTypesForConnection() throws IOException, InterruptedException {
final UUID desiredConnectionId = UUID.randomUUID();
final UUID otherConnectionId = UUID.randomUUID();

// desired connection, statuses, and config types
final long desiredJobId1 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(desiredJobId1, jobPersistence.createAttempt(desiredJobId1, LOG_PATH));
final long desiredJobId2 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow();
final long desiredJobId3 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow();
jobPersistence.succeedAttempt(desiredJobId3, jobPersistence.createAttempt(desiredJobId3, LOG_PATH));
final long desiredJobId4 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow();

// right connection id and status, wrong config type
final long otherJobId1 = jobPersistence.enqueueJob(desiredConnectionId.toString(), SPEC_JOB_CONFIG).orElseThrow();
// right config type and status, wrong connection id
final long otherJobId2 = jobPersistence.enqueueJob(otherConnectionId.toString(), SYNC_JOB_CONFIG).orElseThrow();
// right connection id and config type, wrong status
final long otherJobId3 = jobPersistence.enqueueJob(desiredConnectionId.toString(), CHECK_JOB_CONFIG).orElseThrow();
jobPersistence.failAttempt(otherJobId3, jobPersistence.createAttempt(otherJobId3, LOG_PATH));

final List<Job> actualJobs = jobPersistence.listJobsForConnectionWithStatuses(desiredConnectionId,
Set.of(ConfigType.SYNC, ConfigType.CHECK_CONNECTION_DESTINATION), Set.of(JobStatus.PENDING, JobStatus.SUCCEEDED));

final Job expectedDesiredJob1 = createJob(desiredJobId1, SYNC_JOB_CONFIG, JobStatus.SUCCEEDED,
Lists.newArrayList(createAttempt(0L, desiredJobId1, AttemptStatus.SUCCEEDED, LOG_PATH)),
NOW.getEpochSecond(), desiredConnectionId.toString());
final Job expectedDesiredJob2 =
createJob(desiredJobId2, SYNC_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString());
final Job expectedDesiredJob3 = createJob(desiredJobId3, CHECK_JOB_CONFIG, JobStatus.SUCCEEDED,
Lists.newArrayList(createAttempt(0L, desiredJobId3, AttemptStatus.SUCCEEDED, LOG_PATH)),
NOW.getEpochSecond(), desiredConnectionId.toString());
final Job expectedDesiredJob4 =
createJob(desiredJobId4, CHECK_JOB_CONFIG, JobStatus.PENDING, Lists.newArrayList(), NOW.getEpochSecond(), desiredConnectionId.toString());

assertEquals(Sets.newHashSet(expectedDesiredJob1, expectedDesiredJob2, expectedDesiredJob3, expectedDesiredJob4), Sets.newHashSet(actualJobs));
}

}

@Nested
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
Expand Down Expand Up @@ -364,7 +364,7 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
if (featureFlags.usesNewScheduler()) {
return createManualRun(connectionIdRequestBody.getConnectionId());
return submitManualSyncToWorker(connectionIdRequestBody.getConnectionId());
}
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync = configRepository.getStandardSync(connectionId);
Expand Down Expand Up @@ -411,7 +411,7 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
if (featureFlags.usesNewScheduler()) {
return resetConnectionWithNewScheduler(connectionIdRequestBody.getConnectionId());
return submitResetConnectionToWorker(connectionIdRequestBody.getConnectionId());
}
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync = configRepository.getStandardSync(connectionId);
Expand Down Expand Up @@ -447,7 +447,7 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques
// todo (cgardens) - this method needs a test.
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException {
if (featureFlags.usesNewScheduler()) {
return createNewSchedulerCancellation(jobIdRequestBody.getId());
return submitCancellationToWorker(jobIdRequestBody.getId());
}

final long jobId = jobIdRequestBody.getId();
Expand Down Expand Up @@ -509,39 +509,36 @@ private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID des
return destinationDef.getSpec();
}

private JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

final ManualSyncSubmissionResult cancellationSubmissionResult = eventRunner.startNewCancelation(UUID.fromString(job.getScope()));
private JobInfoRead submitCancellationToWorker(final Long jobId) throws IOException {
final Job job = jobPersistence.getJob(jobId);

if (cancellationSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
final ManualOperationResult cancellationResult = eventRunner.startNewCancellation(UUID.fromString(job.getScope()));
if (cancellationResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationResult.getFailingReason().get());
}

final Job cancelledJob = jobPersistence.getJob(id);
return jobConverter.getJobInfoRead(cancelledJob);
// query same job ID again to get updated job info after cancellation
return jobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}

private JobInfoRead createManualRun(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = eventRunner.startNewManualSync(connectionId);
private JobInfoRead submitManualSyncToWorker(final UUID connectionId) throws IOException {
final ManualOperationResult manualSyncResult = eventRunner.startNewManualSync(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}
return readJobFromResult(manualSyncResult);
}

final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());
private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throws IOException {
final ManualOperationResult resetConnectionResult = eventRunner.resetConnection(connectionId);

return jobConverter.getJobInfoRead(job);
return readJobFromResult(resetConnectionResult);
}

private JobInfoRead resetConnectionWithNewScheduler(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = eventRunner.resetConnection(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
if (manualOperationResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualOperationResult.getFailingReason().get());
}

final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());
final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -720,14 +720,14 @@ void testNewSchedulerSync() throws JsonValidationException, ConfigNotFoundExcept
final UUID connectionId = UUID.randomUUID();

final long jobId = 123L;
final ManualSyncSubmissionResult manualSyncSubmissionResult = ManualSyncSubmissionResult
final ManualOperationResult manualOperationResult = ManualOperationResult
.builder()
.failingReason(Optional.empty())
.jobId(Optional.of(jobId))
.build();

when(eventRunner.startNewManualSync(connectionId))
.thenReturn(manualSyncSubmissionResult);
.thenReturn(manualOperationResult);

doReturn(new JobInfoRead())
.when(jobConverter).getJobInfoRead(any());
Expand Down
Loading

0 comments on commit fbd01aa

Please sign in to comment.