diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java index 0b4476d544cd..60eb6604e7bf 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/config/ApiClientBeanFactory.java @@ -11,6 +11,7 @@ import io.airbyte.api.client.AirbyteApiClient; import io.airbyte.api.client.generated.ConnectionApi; import io.airbyte.api.client.generated.SourceApi; +import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.commons.temporal.config.WorkerMode; import io.micronaut.context.BeanProvider; @@ -73,6 +74,11 @@ public ConnectionApi connectionApi(final ApiClient apiClient) { return new ConnectionApi(apiClient); } + @Singleton + public WorkspaceApi workspaceApi(final ApiClient apiClient) { + return new WorkspaceApi(apiClient); + } + @Singleton public HttpClient httpClient() { return HttpClient.newHttpClient(); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java index 3a233a340168..fb16b41346e2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowImpl.java @@ -9,7 +9,6 @@ import io.airbyte.config.Notification; import io.airbyte.config.Notification.NotificationType; import io.airbyte.config.SlackNotificationConfiguration; -import io.airbyte.config.StandardSync; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.notification.SlackNotificationClient; import io.airbyte.validation.json.JsonValidationException; @@ -17,6 +16,7 @@ import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity; import io.airbyte.workers.temporal.scheduling.activities.NotifySchemaChangeActivity; import io.airbyte.workers.temporal.scheduling.activities.SlackConfigActivity; +import io.temporal.workflow.Workflow; import java.io.IOException; import java.util.Optional; import java.util.UUID; @@ -25,6 +25,9 @@ @Slf4j public class ConnectionNotificationWorkflowImpl implements ConnectionNotificationWorkflow { + private static final String GET_BREAKING_CHANGE_TAG = "get_breaking_change"; + private static final int GET_BREAKING_CHANGE_VERSION = 1; + @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") private NotifySchemaChangeActivity notifySchemaChangeActivity; @TemporalActivityStub(activityOptionsBeanName = "shortActivityOptions") @@ -35,14 +38,20 @@ public class ConnectionNotificationWorkflowImpl implements ConnectionNotificatio @Override public boolean sendSchemaChangeNotification(final UUID connectionId) throws IOException, InterruptedException, ApiException, ConfigNotFoundException, JsonValidationException { - final StandardSync standardSync = configFetchActivity.getStandardSync(connectionId); - final Optional slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId); - if (slackConfig.isPresent()) { - final Notification notification = - new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false) - .withSlackConfiguration(slackConfig.get()); - final SlackNotificationClient notificationClient = new SlackNotificationClient(notification); - return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, standardSync.getBreakingChange()); + final int getBreakingChangeVersion = + Workflow.getVersion(GET_BREAKING_CHANGE_TAG, Workflow.DEFAULT_VERSION, GET_BREAKING_CHANGE_VERSION); + if (getBreakingChangeVersion >= GET_BREAKING_CHANGE_VERSION) { + final Optional breakingChange = configFetchActivity.getBreakingChange(connectionId); + final Optional slackConfig = slackConfigActivity.fetchSlackConfiguration(connectionId); + if (slackConfig.isPresent() && breakingChange.isPresent()) { + final Notification notification = + new Notification().withNotificationType(NotificationType.SLACK).withSendOnFailure(false).withSendOnSuccess(false) + .withSlackConfiguration(slackConfig.get()); + final SlackNotificationClient notificationClient = new SlackNotificationClient(notification); + return notifySchemaChangeActivity.notifySchemaChange(notificationClient, connectionId, breakingChange.get()); + } else { + return false; + } } else { return false; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java index 214cb323cd2e..6ad9c9c0f5f2 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivity.java @@ -5,12 +5,8 @@ package io.airbyte.workers.temporal.scheduling.activities; import io.airbyte.api.client.model.generated.ConnectionStatus; -import io.airbyte.config.StandardSync; -import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.validation.json.JsonValidationException; import io.temporal.activity.ActivityInterface; import io.temporal.activity.ActivityMethod; -import java.io.IOException; import java.time.Duration; import java.util.Optional; import java.util.UUID; @@ -27,6 +23,9 @@ public interface ConfigFetchActivity { @ActivityMethod Optional getStatus(UUID connectionId); + @ActivityMethod + public Optional getBreakingChange(final UUID connectionId); + @Data @NoArgsConstructor @AllArgsConstructor @@ -45,8 +44,6 @@ class ScheduleRetrieverOutput { } - StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException; - /** * Return how much time to wait before running the next sync. It will query the DB to get the last * starting time of the latest terminal job (Failed, canceled or successful) and return the amount diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java index 7aff6ee2c3cd..36b046fa4f82 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityImpl.java @@ -10,6 +10,7 @@ import com.google.common.annotations.VisibleForTesting; import datadog.trace.api.Trace; import io.airbyte.api.client.generated.ConnectionApi; +import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; @@ -19,16 +20,12 @@ import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.WorkspaceRead; import io.airbyte.commons.temporal.config.WorkerMode; import io.airbyte.commons.temporal.exception.RetryableException; -import io.airbyte.config.StandardSync; -import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.metrics.lib.ApmTraceUtils; import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.WorkspaceHelper; import io.airbyte.persistence.job.models.Job; -import io.airbyte.validation.json.JsonValidationException; import io.micronaut.context.annotation.Requires; import io.micronaut.context.annotation.Value; import jakarta.inject.Named; @@ -67,43 +64,25 @@ public class ConfigFetchActivityImpl implements ConfigFetchActivity { UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d")); private static final long SCHEDULING_NOISE_CONSTANT = 15; - private final ConfigRepository configRepository; private final JobPersistence jobPersistence; - private final WorkspaceHelper workspaceHelper; + private final WorkspaceApi workspaceApi; private final Integer syncJobMaxAttempts; private final Supplier currentSecondsSupplier; private final ConnectionApi connectionApi; - public ConfigFetchActivityImpl(final ConfigRepository configRepository, - final JobPersistence jobPersistence, - @Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts, - @Named("currentSecondsSupplier") final Supplier currentSecondsSupplier, - final ConnectionApi connectionApi) { - this(configRepository, jobPersistence, new WorkspaceHelper(configRepository, jobPersistence), syncJobMaxAttempts, currentSecondsSupplier, - connectionApi); - } - @VisibleForTesting - protected ConfigFetchActivityImpl(final ConfigRepository configRepository, - final JobPersistence jobPersistence, - final WorkspaceHelper workspaceHelper, + protected ConfigFetchActivityImpl(final JobPersistence jobPersistence, + final WorkspaceApi workspaceApi, @Value("${airbyte.worker.sync.max-attempts}") final Integer syncJobMaxAttempts, @Named("currentSecondsSupplier") final Supplier currentSecondsSupplier, final ConnectionApi connectionApi) { - this.configRepository = configRepository; this.jobPersistence = jobPersistence; - this.workspaceHelper = workspaceHelper; + this.workspaceApi = workspaceApi; this.syncJobMaxAttempts = syncJobMaxAttempts; this.currentSecondsSupplier = currentSecondsSupplier; this.connectionApi = connectionApi; } - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) - @Override - public StandardSync getStandardSync(final UUID connectionId) throws JsonValidationException, ConfigNotFoundException, IOException { - return configRepository.getStandardSync(connectionId); - } - @Trace(operationName = ACTIVITY_TRACE_OPERATION_NAME) @Override public ScheduleRetrieverOutput getTimeToWait(final ScheduleRetrieverInput input) { @@ -176,10 +155,12 @@ private ScheduleRetrieverOutput getTimeToWaitFromScheduleType(final ConnectionRe } private Duration addSchedulingNoiseForAllowListedWorkspace(Duration timeToWait, ConnectionRead connectionRead) { - final UUID workspaceId; + UUID workspaceId; try { - workspaceId = workspaceHelper.getWorkspaceForConnectionId(connectionRead.getConnectionId()); - } catch (JsonValidationException | ConfigNotFoundException e) { + ConnectionIdRequestBody connectionIdRequestBody = new ConnectionIdRequestBody().connectionId(connectionRead.getConnectionId()); + final WorkspaceRead workspaceRead = workspaceApi.getWorkspaceByConnectionId(connectionIdRequestBody); + workspaceId = workspaceRead.getWorkspaceId(); + } catch (ApiException e) { // We tolerate exceptions and fail open by doing nothing. return timeToWait; } @@ -264,6 +245,19 @@ public Optional getStatus(final UUID connectionId) { } } + @Override + public Optional getBreakingChange(final UUID connectionId) { + try { + final io.airbyte.api.client.model.generated.ConnectionIdRequestBody requestBody = + new io.airbyte.api.client.model.generated.ConnectionIdRequestBody().connectionId(connectionId); + final ConnectionRead connectionRead = connectionApi.getConnection(requestBody); + return Optional.ofNullable(connectionRead.getBreakingChange()); + } catch (ApiException e) { + log.info("Encountered an error fetching the connection's breaking change status: ", e); + return Optional.empty(); + } + } + private Long getIntervalInSecond(final ConnectionScheduleDataBasicSchedule schedule) { return getSecondsInUnit(schedule.getTimeUnit()) * schedule.getUnits(); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java index 97b2dd5a416a..c6fa40328443 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/ConnectionNotificationWorkflowTest.java @@ -13,7 +13,6 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.commons.temporal.scheduling.ConnectionNotificationWorkflow; import io.airbyte.config.SlackNotificationConfiguration; -import io.airbyte.config.StandardSync; import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.notification.SlackNotificationClient; import io.airbyte.validation.json.JsonValidationException; @@ -102,7 +101,7 @@ void sendSchemaChangeNotificationNonBreakingChangeTest() final UUID connectionId = UUID.randomUUID(); - when(mConfigFetchActivity.getStandardSync(connectionId)).thenReturn(new StandardSync().withBreakingChange(false)); + when(mConfigFetchActivity.getBreakingChange(connectionId)).thenReturn(Optional.of(false)); workflow.sendSchemaChangeNotification(connectionId); verify(mNotifySchemaChangeActivity, times(1)).notifySchemaChange(any(SlackNotificationClient.class), any(UUID.class), any(boolean.class)); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java index e6cb60e04d4a..320df0ac0af4 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/temporal/scheduling/activities/ConfigFetchActivityTest.java @@ -8,6 +8,7 @@ import static org.mockito.Mockito.when; import io.airbyte.api.client.generated.ConnectionApi; +import io.airbyte.api.client.generated.WorkspaceApi; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.ConnectionRead; import io.airbyte.api.client.model.generated.ConnectionSchedule; @@ -17,10 +18,9 @@ import io.airbyte.api.client.model.generated.ConnectionScheduleDataCron; import io.airbyte.api.client.model.generated.ConnectionScheduleType; import io.airbyte.api.client.model.generated.ConnectionStatus; +import io.airbyte.api.client.model.generated.WorkspaceRead; import io.airbyte.config.persistence.ConfigNotFoundException; -import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.persistence.job.JobPersistence; -import io.airbyte.persistence.job.WorkspaceHelper; import io.airbyte.persistence.job.models.Job; import io.airbyte.validation.json.JsonValidationException; import io.airbyte.workers.temporal.scheduling.activities.ConfigFetchActivity.ScheduleRetrieverInput; @@ -46,15 +46,11 @@ class ConfigFetchActivityTest { private static final Integer SYNC_JOB_MAX_ATTEMPTS = 3; - @Mock - private ConfigRepository mConfigRepository; - @Mock private JobPersistence mJobPersistence; @Mock - private WorkspaceHelper mWorkspaceHelper; - + private WorkspaceApi mWorkspaceApi; @Mock private Job mJob; @@ -107,7 +103,7 @@ class ConfigFetchActivityTest { @BeforeEach void setup() { configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> Instant.now().getEpochSecond(), mConnectionApi); } @@ -177,7 +173,7 @@ void testDeleted() throws ApiException { @DisplayName("Test we will wait the required amount of time with legacy config") void testWait() throws IOException, JsonValidationException, ConfigNotFoundException, ApiException { configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); when(mJob.getStartedAtInSecond()) .thenReturn(Optional.of(60L)); @@ -200,7 +196,7 @@ void testWait() throws IOException, JsonValidationException, ConfigNotFoundExcep @DisplayName("Test we will not wait if we are late in the legacy schedule schema") void testNotWaitIfLate() throws IOException, ApiException { configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 10, mConnectionApi); when(mJob.getStartedAtInSecond()) .thenReturn(Optional.of(60L)); @@ -255,7 +251,7 @@ void testBasicScheduleTypeFirstRun() throws IOException, ApiException { @Test @DisplayName("Test that we will wait the required amount of time with a BASIC_SCHEDULE type on a subsequent run") void testBasicScheduleSubsequentRun() throws IOException, ApiException { - configFetchActivity = new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); + configFetchActivity = new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> 60L * 3, mConnectionApi); when(mJob.getStartedAtInSecond()) .thenReturn(Optional.of(60L)); @@ -283,10 +279,10 @@ void testCronScheduleSubsequentRun() throws IOException, JsonValidationException mockRightNow.set(Calendar.SECOND, 0); mockRightNow.set(Calendar.MILLISECOND, 0); - when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID()); + when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID())); configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); when(mJobPersistence.getLastReplicationJob(connectionId)) @@ -312,10 +308,10 @@ void testCronScheduleMinimumInterval() throws IOException, JsonValidationExcepti mockRightNow.set(Calendar.SECOND, 0); mockRightNow.set(Calendar.MILLISECOND, 0); - when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.randomUUID()); + when(mWorkspaceApi.getWorkspaceByConnectionId(any())).thenReturn(new WorkspaceRead().workspaceId(UUID.randomUUID())); configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); @@ -342,10 +338,11 @@ void testCronSchedulingNoise() throws IOException, JsonValidationException, Conf mockRightNow.set(Calendar.SECOND, 0); mockRightNow.set(Calendar.MILLISECOND, 0); - when(mWorkspaceHelper.getWorkspaceForConnectionId(any())).thenReturn(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d")); + when(mWorkspaceApi.getWorkspaceByConnectionId(any())) + .thenReturn(new WorkspaceRead().workspaceId(UUID.fromString("226edbc1-4a9c-4401-95a9-90435d667d9d"))); configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, mWorkspaceHelper, SYNC_JOB_MAX_ATTEMPTS, + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, SYNC_JOB_MAX_ATTEMPTS, () -> mockRightNow.getTimeInMillis() / 1000L, mConnectionApi); when(mJob.getStartedAtInSecond()).thenReturn(Optional.of(mockRightNow.getTimeInMillis() / 1000L)); @@ -371,7 +368,7 @@ class TestGetMaxAttempt { void testGetMaxAttempt() { final int maxAttempt = 15031990; configFetchActivity = - new ConfigFetchActivityImpl(mConfigRepository, mJobPersistence, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi); + new ConfigFetchActivityImpl(mJobPersistence, mWorkspaceApi, maxAttempt, () -> Instant.now().getEpochSecond(), mConnectionApi); Assertions.assertThat(configFetchActivity.getMaxAttempt().getMaxAttempt()) .isEqualTo(maxAttempt); }