Skip to content

Commit

Permalink
Add error code to ManualOperationResult (#14657)
Browse files Browse the repository at this point in the history
* add error code to ManualOperationResult

* fix a bug
  • Loading branch information
xiaohansong authored Jul 15, 2022
1 parent 1b73318 commit 8dbc6da
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import io.airbyte.api.model.generated.AdvancedAuth;
Expand Down Expand Up @@ -58,9 +59,11 @@
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.converters.OauthModelConverter;
import io.airbyte.server.errors.ValueConflictKnownException;
import io.airbyte.server.handlers.helpers.CatalogConverter;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.ErrorCode;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
import java.util.ArrayList;
Expand All @@ -74,6 +77,9 @@ public class SchedulerHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerHandler.class);
private static final HashFunction HASH_FUNCTION = Hashing.md5();

private static final ImmutableSet<ErrorCode> VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET =
ImmutableSet.of(ErrorCode.WORKFLOW_DELETED, ErrorCode.WORKFLOW_RUNNING);

private final ConfigRepository configRepository;
private final SecretsRepositoryWriter secretsRepositoryWriter;
private final SynchronousSchedulerClient synchronousSchedulerClient;
Expand Down Expand Up @@ -381,7 +387,11 @@ private JobInfoRead submitResetConnectionToWorker(final UUID connectionId) throw

private JobInfoRead readJobFromResult(final ManualOperationResult manualOperationResult) throws IOException, IllegalStateException {
if (manualOperationResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualOperationResult.getFailingReason().get());
if (VALUE_CONFLICT_EXCEPTION_ERROR_CODE_SET.contains(manualOperationResult.getErrorCode().get())) {
throw new ValueConflictKnownException(manualOperationResult.getFailingReason().get());
} else {
throw new IllegalStateException(manualOperationResult.getFailingReason().get());
}
}

final Job job = jobPersistence.getJob(manualOperationResult.getJobId().get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -67,10 +68,12 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.errors.ValueConflictKnownException;
import io.airbyte.server.helpers.DestinationHelpers;
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.ErrorCode;
import io.airbyte.workers.temporal.TemporalClient.ManualOperationResult;
import java.io.IOException;
import java.net.URI;
Expand Down Expand Up @@ -587,6 +590,25 @@ void testSyncConnection() throws IOException {
verify(eventRunner).startNewManualSync(connectionId);
}

@Test
void testSyncConnectionFailWithOtherSyncRunning() throws IOException {
final UUID connectionId = UUID.randomUUID();

final ManualOperationResult manualOperationResult = ManualOperationResult
.builder()
.failingReason(Optional.of("another sync running"))
.jobId(Optional.empty())
.errorCode(Optional.of(ErrorCode.WORKFLOW_RUNNING))
.build();

when(eventRunner.startNewManualSync(connectionId))
.thenReturn(manualOperationResult);

assertThrows(ValueConflictKnownException.class,
() -> schedulerHandler.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)));

}

@Test
void testResetConnection() throws IOException, JsonValidationException, ConfigNotFoundException {
final UUID connectionId = UUID.randomUUID();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.workers.temporal;

public enum ErrorCode {
UNKNOWN,
WORKFLOW_DELETED,
WORKFLOW_RUNNING
}
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,7 @@ public static class ManualOperationResult {

final Optional<String> failingReason;
final Optional<Long> jobId;
final Optional<ErrorCode> errorCode;

}

Expand All @@ -300,7 +301,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
// TODO Bmoric: Error is running
return new ManualOperationResult(
Optional.of("A sync is already running for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.WORKFLOW_RUNNING));
}

try {
Expand All @@ -309,7 +310,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
log.error("Can't sync a deleted connection.", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED));
}

do {
Expand All @@ -318,7 +319,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't managed to start a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (!ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId));

Expand All @@ -328,7 +329,7 @@ public ManualOperationResult startNewManualSync(final UUID connectionId) {

return new ManualOperationResult(
Optional.empty(),
Optional.of(jobId));
Optional.of(jobId), Optional.empty());
}

public ManualOperationResult startNewCancellation(final UUID connectionId) {
Expand All @@ -342,7 +343,7 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
log.error("Can't cancel a deleted workflow", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.WORKFLOW_DELETED));
}

do {
Expand All @@ -351,15 +352,15 @@ public ManualOperationResult startNewCancellation(final UUID connectionId) {
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to cancel a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (ConnectionManagerUtils.isWorkflowStateRunning(client, connectionId));

log.info("end of manual cancellation");

return new ManualOperationResult(
Optional.empty(),
Optional.of(jobId));
Optional.of(jobId), Optional.empty());
}

public ManualOperationResult resetConnection(final UUID connectionId, final List<StreamDescriptor> streamsToReset) {
Expand All @@ -371,7 +372,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
log.error("Could not persist streams to reset.", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}

// get the job ID before the reset, defaulting to NON_RUNNING_JOB_ID if workflow is unreachable
Expand All @@ -383,7 +384,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
log.error("Can't reset a deleted workflow", e);
return new ManualOperationResult(
Optional.of(e.getMessage()),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}

Optional<Long> newJobId;
Expand All @@ -394,7 +395,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to reset a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
newJobId = getNewJobId(connectionId, oldJobId);
} while (newJobId.isEmpty());
Expand All @@ -403,7 +404,7 @@ public ManualOperationResult resetConnection(final UUID connectionId, final List

return new ManualOperationResult(
Optional.empty(),
newJobId);
newJobId, Optional.empty());
}

private Optional<Long> getNewJobId(final UUID connectionId, final long oldJobId) {
Expand All @@ -428,22 +429,21 @@ public ManualOperationResult synchronousResetConnection(final UUID connectionId,
}

final long resetJobId = resetResult.getJobId().get();

do {
try {
Thread.sleep(DELAY_BETWEEN_QUERY_MS);
} catch (final InterruptedException e) {
return new ManualOperationResult(
Optional.of("Didn't manage to reset a sync for: " + connectionId),
Optional.empty());
Optional.empty(), Optional.of(ErrorCode.UNKNOWN));
}
} while (ConnectionManagerUtils.getCurrentJobId(client, connectionId) == resetJobId);

log.info("End of reset");

return new ManualOperationResult(
Optional.empty(),
Optional.of(resetJobId));
Optional.of(resetJobId), Optional.empty());
}

private <T> T getWorkflowStub(final Class<T> workflowClass, final TemporalJobType jobType) {
Expand Down

0 comments on commit 8dbc6da

Please sign in to comment.