diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java index f936f9d93788..50d6db3ef47b 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java @@ -4,6 +4,7 @@ package io.airbyte.workers.temporal.scheduling.activities; +import io.airbyte.api.client.model.generated.ConnectionRead; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSync.Status; import io.airbyte.config.persistence.ConfigNotFoundException; @@ -47,6 +48,8 @@ class ScheduleRetrieverOutput { StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException; + ConnectionRead getConnection(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException; + /** * Return how much time to wait before running the next sync. It will query the DB to get the last * starting time of the latest terminal job (Failed, canceled or successful) and return the amount diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 1f8a18a7de1a..207480eba5e1 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -9,10 +9,11 @@ import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.Trace; -import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; import io.airbyte.config.Cron; import io.airbyte.config.StandardSync; @@ -74,8 +75,9 @@ public ConfigFetchActivityImpl(final ConfigRepository configRepository, final JobPersistence jobPersistence, @Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts, @Named("currentSecondsSupplier") final Supplier currentSecondsSupplier, - final ConnectionApi connectionApi) { - this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier, connectionApi); + final ConnectionApi connectionApi) { + this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier, + connectionApi); } @VisibleForTesting @@ -84,7 +86,7 @@ protected ConfigFetchActivityImpl(final ConfigRepository configRepository, final WorkspaceHelper workspaceHelper, @Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts, @Named("currentSecondsSupplier") final Supplier currentSecondsSupplier, - final ConnectionApi connectionApi) { + final ConnectionApi connectionApi) { this.configRepository = configRepository; this.jobPersistence = jobPersistence; this.workspaceHelper = workspaceHelper; @@ -99,6 +101,14 @@ public StandardSync getStandardSync(final UUID connectionId) throws JsonValidati return configRepository.getStandardSync(connectionId); } + @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) + @Override + public ConnectionRead getConnection(final UUID connectionId) throws ApiException { + final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody = + new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId); + return connectionApi.getConnection(requestBody); + } + @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) { @@ -236,9 +246,7 @@ public GetMaxAttemptOutput getMaxAttempt() { @Override public Optional getSourceId(final UUID connectionId) { try { - final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody = - new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId); - final ConnectionRead connectionRead = connectionApi.getConnection(requestBody); + final ConnectionRead connectionRead = getConnection(connectionId); return Optional.ofNullable(connectionRead.getSourceId()); } catch (ApiException e) { log.info("Encountered an error fetching the connection's Source ID: ", e); @@ -249,9 +257,10 @@ public Optional getSourceId(final UUID connectionId) { @Override public Optional getStatus(final UUID connectionId) { try { - final StandardSync standardSync = getStandardSync(connectionId); - return Optional.ofNullable(standardSync.getStatus()); - } catch (final JsonValidationException | ConfigNotFoundException | IOException e) { + final ConnectionRead connectionRead = getConnection(connectionId); + final Status standardSyncStatus = Enums.convertTo(connectionRead.getStatus(), StandardSync.Status.class); + return Optional.ofNullable(standardSyncStatus); + } catch (ApiException e) { log.info("Encountered an error fetching the connection's status: ", e); return Optional.empty(); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java index c4112fb14697..b7b7d8fae438 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java @@ -6,6 +6,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.when; + import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.config.BasicSchedule; import io.airbyte.config.Cron;