diff --git a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml index ce65b2315dcb..32696ab6afb6 100644 --- a/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml +++ b/airbyte-config/config-models/src/main/resources/types/JobResetConnectionConfig.yaml @@ -41,3 +41,6 @@ properties: existingJavaType: io.airbyte.config.ResourceRequirements resetSourceConfiguration: "$ref": ResetSourceConfiguration.yaml + state: + description: optional current state of the connection + "$ref": State.yaml diff --git a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java index a0b222d2f5df..70641433dfbf 100644 --- a/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java +++ b/airbyte-scheduler/scheduler-persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java @@ -16,6 +16,7 @@ import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.State; import io.airbyte.config.StreamDescriptor; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -24,6 +25,7 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import java.util.UUID; import javax.annotation.Nullable; public class DefaultJobCreator implements JobCreator { @@ -76,7 +78,7 @@ public Optional createSyncJob(final SourceConnection source, workerResourceRequirements, JobType.SYNC)); - configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); + getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.SYNC) @@ -116,10 +118,18 @@ public Optional createResetConnectionJob(final DestinationConnection desti workerResourceRequirements)) .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset)); + getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(resetConnectionConfig::withState); + final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.RESET_CONNECTION) .withResetConnection(resetConnectionConfig); return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig); } + // TODO (https://github.com/airbytehq/airbyte/issues/13620): update this method implementation + // to fetch and serialize the new per-stream state format into a State object + private Optional getCurrentConnectionState(final UUID connectionId) throws IOException { + return configRepository.getConnectionState(connectionId); + } + } diff --git a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java index 3e500d4f210c..1625aeb1ae16 100644 --- a/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java +++ b/airbyte-scheduler/scheduler-persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java @@ -31,6 +31,7 @@ import io.airbyte.config.StandardSync; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.StandardSyncOperation.OperatorType; +import io.airbyte.config.State; import io.airbyte.config.StreamDescriptor; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.CatalogHelpers; @@ -42,6 +43,7 @@ import java.io.IOException; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.UUID; import org.junit.jupiter.api.BeforeEach; @@ -333,7 +335,10 @@ void testCreateResetConnectionJob() throws IOException { configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE); }); - final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig() + final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); + when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState)); + + final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) @@ -342,22 +347,26 @@ void testCreateResetConnectionJob() throws IOException { .withConfiguredAirbyteCatalog(expectedCatalog) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) .withResourceRequirements(workerResourceRequirements) - .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))); + .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))) + .withState(connectionState); final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.RESET_CONNECTION) - .withResetConnection(JobResetConnectionConfig); + .withResetConnection(jobResetConnectionConfig); final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.of(JOB_ID)); - final long jobId = jobCreator.createResetConnectionJob( + final Optional jobId = jobCreator.createResetConnectionJob( DESTINATION_CONNECTION, STANDARD_SYNC, DESTINATION_IMAGE_NAME, List.of(STANDARD_SYNC_OPERATION), - List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).orElseThrow(); - assertEquals(JOB_ID, jobId); + List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)); + + verify(jobPersistence).enqueueJob(expectedScope, jobConfig); + assertTrue(jobId.isPresent()); + assertEquals(JOB_ID, jobId.get()); } @Test @@ -369,7 +378,10 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE); }); - final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig() + final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val"))); + when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState)); + + final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig() .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) .withPrefix(STANDARD_SYNC.getPrefix()) @@ -378,21 +390,25 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { .withConfiguredAirbyteCatalog(expectedCatalog) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) .withResourceRequirements(workerResourceRequirements) - .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))); + .withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2))) + .withState(connectionState); final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.RESET_CONNECTION) - .withResetConnection(JobResetConnectionConfig); + .withResetConnection(jobResetConnectionConfig); final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.empty()); - assertTrue(jobCreator.createResetConnectionJob( + final Optional jobId = jobCreator.createResetConnectionJob( DESTINATION_CONNECTION, STANDARD_SYNC, DESTINATION_IMAGE_NAME, List.of(STANDARD_SYNC_OPERATION), - List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).isEmpty()); + List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)); + + verify(jobPersistence).enqueueJob(expectedScope, jobConfig); + assertTrue(jobId.isEmpty()); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 2bf47602526c..9dece60e12ed 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -45,7 +45,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withDestinationConfiguration(resetConnection.getDestinationConfiguration()) .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) .withOperationSequence(resetConnection.getOperationSequence()) - .withResourceRequirements(resetConnection.getResourceRequirements()); + .withResourceRequirements(resetConnection.getResourceRequirements()) + .withState(resetConnection.getState()); } final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt);