Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove configRepo dependencies #20690

Merged
merged 4 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.auth0.jwt.algorithms.Algorithm;
import com.google.auth.oauth2.ServiceAccountCredentials;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.generated.SourceApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.commons.temporal.config.WorkerMode;
Expand Down Expand Up @@ -67,6 +68,11 @@ public SourceApi sourceApi(final ApiClient apiClient) {
return new SourceApi(apiClient);
}

@Singleton
public ConnectionApi connectionApi(final ApiClient apiClient) {
return new ConnectionApi(apiClient);
}

@Singleton
public HttpClient httpClient() {
return HttpClient.newHttpClient();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

import com.google.common.annotations.VisibleForTesting;
import datadog.trace.api.Trace;
import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.ConnectionRead;
import io.airbyte.commons.temporal.config.WorkerMode;
import io.airbyte.commons.temporal.exception.RetryableException;
import io.airbyte.config.Cron;
Expand Down Expand Up @@ -65,25 +68,30 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity {
private final WorkspaceHelper workspaceHelper;
private final Integer syncJobMaxAttempts;
private final Supplier<Long> currentSecondsSupplier;
private final ConnectionApi connectionApi;

public ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier);
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier,
connectionApi);
}

@VisibleForTesting
protected ConfigFetchActivityImpl(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final WorkspaceHelper workspaceHelper,
@Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts,
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier) {
@Named("currentSecondsSupplier") final Supplier<Long> currentSecondsSupplier,
final ConnectionApi connectionApi) {
this.configRepository = configRepository;
this.jobPersistence = jobPersistence;
this.workspaceHelper = workspaceHelper;
this.syncJobMaxAttempts = syncJobMaxAttempts;
this.currentSecondsSupplier = currentSecondsSupplier;
this.connectionApi = connectionApi;
}

@Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME)
Expand Down Expand Up @@ -229,9 +237,11 @@ public GetMaxAttemptOutput getMaxAttempt() {
@Override
public Optional<UUID> getSourceId(final UUID connectionId) {
try {
final StandardSync standardSync = getStandardSync(connectionId);
return Optional.ofNullable(standardSync.getSourceId());
} catch (final JsonValidationException | ConfigNotFoundException | IOException e) {
final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody =
new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId);
final ConnectionRead connectionRead = connectionApi.getConnection(requestBody);
return Optional.ofNullable(connectionRead.getSourceId());
} catch (ApiException e) {
log.info("Encountered an error fetching the connection's Source ID: ", e);
return Optional.empty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;

import io.airbyte.api.client.generated.ConnectionApi;
import io.airbyte.config.BasicSchedule;
import io.airbyte.config.Cron;
import io.airbyte.config.Schedule;
Expand Down Expand Up @@ -55,6 +56,9 @@ class ConfigFetchActivityTest {
@Mock
private Job mJob;

@Mock
private ConnectionApi mConnectionApi;

private ConfigFetchActivityImpl configFetchActivity;

private final static UUID connectionId = UUID.randomUUID();
Expand Down Expand Up @@ -102,7 +106,7 @@ class ConfigFetchActivityTest {
void setup() {
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> Instant.now().getEpochSecond());
() -> Instant.now().getEpochSecond(), mConnectionApi);
}

@Nested
Expand Down Expand Up @@ -170,7 +174,8 @@ void testDeleted() throws IOException, JsonValidationException, ConfigNotFoundEx
@Test
@DisplayName("Test we will wait the required amount of time with legacy config")
void testWait() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand All @@ -192,7 +197,8 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep
@Test
@DisplayName("Test we will not wait if we are late in the legacy schedule schema")
void testNotWaitIfLate() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10);
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -247,7 +253,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, JsonValidationException
@Test
@DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run")
void testBasicScheduleSubsequentRun() throws IOException, JsonValidationException, ConfigNotFoundException {
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3);
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi);

when(mJob.getStartedAtInSecond())
.thenReturn(Optional.of(60L));
Expand Down Expand Up @@ -279,7 +285,7 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJobPersistence.getLastReplicationJob(connectionId))
.thenReturn(Optional.of(mJob));
Expand Down Expand Up @@ -308,7 +314,7 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
when(mJobPersistence.getLastReplicationJob(connectionId))
Expand Down Expand Up @@ -338,7 +344,7 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf

configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS,
() -> mockRightNow.getTimeInMillis() / 1000L);
() -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi);

when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L));
when(mJobPersistence.getLastReplicationJob(connectionId))
Expand All @@ -362,7 +368,8 @@ class TestGetMaxAttempt {
@DisplayName("Test that we are using to right service to get the maximum amount of attempt")
void testGetMaxAttempt() {
final int maxAttempt = 15031990;
configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond());
configFetchActivity =
new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi);
Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt())
.isEqualTo(maxAttempt);
}
Expand Down