Skip to content

Commit

Permalink
Remove configRepository dependency from getStatus (#20685)
Browse files Browse the repository at this point in the history
* remove dependence on config repository for getStatus
  • Loading branch information
alovew authored Dec 21, 2022
1 parent eee6193 commit 9efc4ff
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.workers.temporal.scheduling.activities;

import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSync.Status;
import io.airbyte.config.persistence.ConfigNotFoundException;
Expand Down Expand Up @@ -47,6 +48,8 @@ class ScheduleRetrieverOutput {

StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException;

ConnectionRead getConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException;

/**
* Return how much time to wait before running the next sync. It will query the DB to get the last
* starting time of the latest terminal job (Failed, canceled or successful) and return the amount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@

import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
import io.airbyte.config.StandardSync;
Expand Down Expand Up @@ -74,8 +75,9 @@ public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier, connectionApi);
final ConnectionApi connectionApi) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier,
connectionApi);
}

@VisibleForTesting
Expand All @@ -84,7 +86,7 @@ protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
final WorkspaceHelper workspaceHelper,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
Expand All @@ -99,6 +101,14 @@ public StandardSync getStandardSync(final UUID connectionId) throws JsonValidati
return configRepository.getStandardSync(connectionId);
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public ConnectionRead getConnection(final UUID connectionId) throws ApiException {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
return connectionApi.getConnection(requestBody);
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
@Override
public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) {
Expand Down Expand Up @@ -236,9 +246,7 @@ public GetMaxAttemptOutput getMaxAttempt() {
@Override
public Optional<UUID> getSourceId(final UUID connectionId) {
try {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
final ConnectionRead connectionRead = getConnection(connectionId);
return Optional.ofNullable(connectionRead.getSourceId());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
Expand All @@ -249,9 +257,10 @@ public Optional<UUID> getSourceId(final UUID connectionId) {
@Override
public Optional<Status> getStatus(final UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getStatus());
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
final ConnectionRead connectionRead = getConnection(connectionId);
final Status standardSyncStatus = Enums.convertTo(connectionRead.getStatus(), StandardSync.Status.class);
return Optional.ofNullable(standardSyncStatus);
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's status: ", e);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Cron;
Expand Down

0 comments on commit 9efc4ff

Please sign in to comment.