Skip to content

Commit

Permalink
Bmoric/move flag check to handler (#10469)
Browse files Browse the repository at this point in the history
Move the feature flag checks to the handler instead of the configuration API. This could have avoid some bug related to the missing flag check in the cloud project.
  • Loading branch information
benmoriceau authored Feb 23, 2022
1 parent 6cd20e6 commit 2c09037
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final Path workspaceRoot;
private final FeatureFlags featureFlags;

public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
Expand Down Expand Up @@ -190,7 +189,8 @@ public ConfigurationApi(final ConfigRepository configRepository,
jobPersistence,
jobNotifier,
temporalService,
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory);
new OAuthConfigSupplier(configRepository, trackingClient), workerEnvironment, logConfigs, temporalWorkerRunFactory, featureFlags);

connectionsHandler = new ConnectionsHandler(
configRepository,
workspaceHelper,
Expand Down Expand Up @@ -228,7 +228,6 @@ public ConfigurationApi(final ConfigRepository configRepository,
logsHandler = new LogsHandler();
openApiConfigHandler = new OpenApiConfigHandler();
dbMigrationHandler = new DbMigrationHandler(configsDatabase, jobsDatabase);
this.featureFlags = featureFlags;
}

// WORKSPACE
Expand Down Expand Up @@ -566,19 +565,11 @@ public void deleteConnection(final ConnectionIdRequestBody connectionIdRequestBo

@Override
public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.createManualRun(connectionIdRequestBody.getConnectionId()));
}

return execute(() -> schedulerHandler.syncConnection(connectionIdRequestBody));
}

@Override
public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody.getConnectionId()));
}

return execute(() -> schedulerHandler.resetConnection(connectionIdRequestBody));
}

Expand Down Expand Up @@ -640,10 +631,6 @@ public SourceDiscoverSchemaRead executeSourceDiscoverSchema(final SourceCoreConf

@Override
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) {
if (featureFlags.usesNewScheduler()) {
return execute(() -> schedulerHandler.createNewSchedulerCancellation(jobIdRequestBody.getId()));
}

return execute(() -> schedulerHandler.cancelJob(jobIdRequestBody));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig.ConfigType;
Expand Down Expand Up @@ -88,6 +89,7 @@ public class SchedulerHandler {
private final WorkerEnvironment workerEnvironment;
private final LogConfigs logConfigs;
private final TemporalWorkerRunFactory temporalWorkerRunFactory;
private final FeatureFlags featureFlags;

public SchedulerHandler(final ConfigRepository configRepository,
final SchedulerJobClient schedulerJobClient,
Expand All @@ -98,7 +100,8 @@ public SchedulerHandler(final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final FeatureFlags featureFlags) {
this(
configRepository,
schedulerJobClient,
Expand All @@ -111,7 +114,9 @@ public SchedulerHandler(final ConfigRepository configRepository,
oAuthConfigSupplier,
workerEnvironment,
logConfigs,
temporalWorkerRunFactory);
temporalWorkerRunFactory,
featureFlags,
new JobConverter(workerEnvironment, logConfigs));
}

@VisibleForTesting
Expand All @@ -126,7 +131,9 @@ public SchedulerHandler(final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier,
final WorkerEnvironment workerEnvironment,
final LogConfigs logConfigs,
final TemporalWorkerRunFactory temporalWorkerRunFactory) {
final TemporalWorkerRunFactory temporalWorkerRunFactory,
final FeatureFlags featureFlags,
final JobConverter jobConverter) {
this.configRepository = configRepository;
this.schedulerJobClient = schedulerJobClient;
this.synchronousSchedulerClient = synchronousSchedulerClient;
Expand All @@ -138,8 +145,9 @@ public SchedulerHandler(final ConfigRepository configRepository,
this.oAuthConfigSupplier = oAuthConfigSupplier;
this.workerEnvironment = workerEnvironment;
this.logConfigs = logConfigs;
this.jobConverter = new JobConverter(workerEnvironment, logConfigs);
this.temporalWorkerRunFactory = temporalWorkerRunFactory;
this.featureFlags = featureFlags;
this.jobConverter = jobConverter;
}

public CheckConnectionRead checkSourceConnectionFromSourceId(final SourceIdRequestBody sourceIdRequestBody)
Expand Down Expand Up @@ -307,6 +315,9 @@ public DestinationDefinitionSpecificationRead getDestinationSpecification(

public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws ConfigNotFoundException, IOException, JsonValidationException {
if (featureFlags.usesNewScheduler()) {
return createManualRun(connectionIdRequestBody.getConnectionId());
}
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync = configRepository.getStandardSync(connectionId);

Expand Down Expand Up @@ -347,20 +358,11 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ
return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}

final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}

public JobInfoRead resetConnection(final ConnectionIdRequestBody connectionIdRequestBody)
throws IOException, JsonValidationException, ConfigNotFoundException {
if (featureFlags.usesNewScheduler()) {
return resetConnectionWithNewScheduler(connectionIdRequestBody.getConnectionId());
}
final UUID connectionId = connectionIdRequestBody.getConnectionId();
final StandardSync standardSync = configRepository.getStandardSync(connectionId);

Expand Down Expand Up @@ -394,6 +396,10 @@ public ConnectionState getState(final ConnectionIdRequestBody connectionIdReques

// todo (cgardens) - this method needs a test.
public JobInfoRead cancelJob(final JobIdRequestBody jobIdRequestBody) throws IOException {
if (featureFlags.usesNewScheduler()) {
createNewSchedulerCancellation(jobIdRequestBody.getId());
}

final long jobId = jobIdRequestBody.getId();

// prevent this job from being scheduled again
Expand Down Expand Up @@ -453,7 +459,20 @@ private ConnectorSpecification getSpecFromDestinationDefinitionId(final UUID des
return destinationDef.getSpec();
}

public JobInfoRead createManualRun(final UUID connectionId) throws IOException {
private JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));

if (cancellationSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
}

final Job cancelledJob = jobPersistence.getJob(id);
return jobConverter.getJobInfoRead(cancelledJob);
}

private JobInfoRead createManualRun(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.startNewManualSync(connectionId);

if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
Expand All @@ -465,17 +484,16 @@ public JobInfoRead createManualRun(final UUID connectionId) throws IOException {
return jobConverter.getJobInfoRead(job);
}

public JobInfoRead createNewSchedulerCancellation(final Long id) throws IOException {
final Job job = jobPersistence.getJob(id);

final ManualSyncSubmissionResult cancellationSubmissionResult = temporalWorkerRunFactory.startNewCancelation(UUID.fromString(job.getScope()));
private JobInfoRead resetConnectionWithNewScheduler(final UUID connectionId) throws IOException {
final ManualSyncSubmissionResult manualSyncSubmissionResult = temporalWorkerRunFactory.resetConnection(connectionId);

if (cancellationSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(cancellationSubmissionResult.getFailingReason().get());
if (manualSyncSubmissionResult.getFailingReason().isPresent()) {
throw new IllegalStateException(manualSyncSubmissionResult.getFailingReason().get());
}

final Job cancelledJob = jobPersistence.getJob(id);
return jobConverter.getJobInfoRead(cancelledJob);
final Job job = jobPersistence.getJob(manualSyncSubmissionResult.getJobId().get());

return jobConverter.getJobInfoRead(job);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
Expand All @@ -35,6 +36,7 @@
import io.airbyte.api.model.SourceUpdate;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.config.Configs.WorkerEnvironment;
Expand Down Expand Up @@ -69,11 +71,13 @@
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier;
import io.airbyte.server.converters.ConfigurationUpdate;
import io.airbyte.server.converters.JobConverter;
import io.airbyte.server.helpers.ConnectionHelpers;
import io.airbyte.server.helpers.DestinationHelpers;
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient.ManualSyncSubmissionResult;
import io.airbyte.workers.worker_run.TemporalWorkerRunFactory;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
Expand Down Expand Up @@ -129,6 +133,8 @@ class SchedulerHandlerTest {
private JsonSchemaValidator jsonSchemaValidator;
private JobPersistence jobPersistence;
private TemporalWorkerRunFactory temporalWorkerRunFactory;
private FeatureFlags featureFlags;
private JobConverter jobConverter;

@BeforeEach
void setup() {
Expand All @@ -147,6 +153,11 @@ void setup() {
final JobNotifier jobNotifier = mock(JobNotifier.class);
temporalWorkerRunFactory = mock(TemporalWorkerRunFactory.class);

featureFlags = mock(FeatureFlags.class);
when(featureFlags.usesNewScheduler()).thenReturn(false);

jobConverter = spy(new JobConverter(WorkerEnvironment.DOCKER, LogConfigs.EMPTY));

schedulerHandler = new SchedulerHandler(
configRepository,
schedulerJobClient,
Expand All @@ -159,7 +170,9 @@ void setup() {
mock(OAuthConfigSupplier.class),
WorkerEnvironment.DOCKER,
LogConfigs.EMPTY,
temporalWorkerRunFactory);
temporalWorkerRunFactory,
featureFlags,
jobConverter);
}

@Test
Expand Down Expand Up @@ -581,6 +594,30 @@ void testEnumConversion() {
assertTrue(Enums.isCompatible(JobStatus.class, io.airbyte.api.model.JobStatus.class));
}

@Test
void testNewSchedulerSync() throws JsonValidationException, ConfigNotFoundException, IOException {
when(featureFlags.usesNewScheduler()).thenReturn(true);

UUID connectionId = UUID.randomUUID();

long jobId = 123L;
ManualSyncSubmissionResult manualSyncSubmissionResult = ManualSyncSubmissionResult
.builder()
.failingReason(Optional.empty())
.jobId(Optional.of(jobId))
.build();

when(temporalWorkerRunFactory.startNewManualSync(connectionId))
.thenReturn(manualSyncSubmissionResult);

doReturn(new JobInfoRead())
.when(jobConverter).getJobInfoRead(any());

schedulerHandler.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));

verify(temporalWorkerRunFactory).startNewManualSync(connectionId);
}

private static List<StandardSyncOperation> getOperations(final StandardSync standardSync) {
if (standardSync.getOperationIds() != null && !standardSync.getOperationIds().isEmpty()) {
return List.of(getOperation(standardSync.getOperationIds().get(0)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.UUID;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Builder;
import lombok.Value;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.time.StopWatch;
Expand Down Expand Up @@ -235,7 +236,8 @@ public void update(final ConnectionUpdate connectionUpdate) throws JsonValidatio
}

@Value
public class ManualSyncSubmissionResult {
@Builder
public static class ManualSyncSubmissionResult {

final Optional<String> failingReason;
final Optional<Long> jobId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,15 +441,16 @@ private void reportJobStarting() {
}

/**
* Start the child {@link SyncWorkflow}. We are using a child workflow here for two main reason:
* <<<<<<< HEAD Start the child SyncWorkflow ======= Start the child {@link SyncWorkflow}. We are
* using a child workflow here for two main reason:
* <p>
* - Originally the Sync workflow was living by himself and was launch by the scheduler. In order to
* limit the potential migration issues, we kept the {@link SyncWorkflow} as is and launch it as a
* child workflow.
* <p>
* - The {@link SyncWorkflow} has different requirements than the {@link ConnectionManagerWorkflow}
* since the latter is a long running workflow, in the future, using a different Node pool would
* make sense.
* make sense. >>>>>>> 76e969f2e5e1b869648142c3565b7375b1892999
*/
private StandardSyncOutput runChildWorkflow(GeneratedJobInput jobInputs) {
final SyncWorkflow childSync = Workflow.newChildWorkflowStub(SyncWorkflow.class,
Expand Down

0 comments on commit 2c09037

Please sign in to comment.