Skip to content

Commit

Permalink
Persist state to reset connection job config (airbytehq#13867)
Browse files Browse the repository at this point in the history
* add current connection state to JobResetConnectionConfig

* pass state from reset connection config to job sync config in generate input activity

* format
  • Loading branch information
lmossman authored Jun 17, 2022
1 parent 9d060a9 commit 3bab4c9
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@ properties:
existingJavaType: io.airbyte.config.ResourceRequirements
resetSourceConfiguration:
"$ref": ResetSourceConfiguration.yaml
state:
description: optional current state of the connection
"$ref": State.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand All @@ -24,6 +25,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import javax.annotation.Nullable;

public class DefaultJobCreator implements JobCreator {
Expand Down Expand Up @@ -76,7 +78,7 @@ public Optional<Long> createSyncJob(final SourceConnection source,
workerResourceRequirements,
JobType.SYNC));

configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);
getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.SYNC)
Expand Down Expand Up @@ -116,10 +118,18 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
workerResourceRequirements))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset));

getCurrentConnectionState(standardSync.getConnectionId()).ifPresent(resetConnectionConfig::withState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(resetConnectionConfig);
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}

// TODO (https://github.com/airbytehq/airbyte/issues/13620): update this method implementation
// to fetch and serialize the new per-stream state format into a State object
private Optional<State> getCurrentConnectionState(final UUID connectionId) throws IOException {
return configRepository.getConnectionState(connectionId);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.State;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
Expand All @@ -42,6 +43,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -333,7 +335,10 @@ void testCreateResetConnectionJob() throws IOException {
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});

final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState));

final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
.withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat())
.withPrefix(STANDARD_SYNC.getPrefix())
Expand All @@ -342,22 +347,26 @@ void testCreateResetConnectionJob() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(JobResetConnectionConfig);
.withResetConnection(jobResetConnectionConfig);

final String expectedScope = STANDARD_SYNC.getConnectionId().toString();
when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.of(JOB_ID));

final long jobId = jobCreator.createResetConnectionJob(
final Optional<Long> jobId = jobCreator.createResetConnectionJob(
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).orElseThrow();
assertEquals(JOB_ID, jobId);
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isPresent());
assertEquals(JOB_ID, jobId.get());
}

@Test
Expand All @@ -369,7 +378,10 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
configuredAirbyteStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
});

final JobResetConnectionConfig JobResetConnectionConfig = new JobResetConnectionConfig()
final State connectionState = new State().withState(Jsons.jsonNode(Map.of("key", "val")));
when(configRepository.getConnectionState(STANDARD_SYNC.getConnectionId())).thenReturn(Optional.of(connectionState));

final JobResetConnectionConfig jobResetConnectionConfig = new JobResetConnectionConfig()
.withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition())
.withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat())
.withPrefix(STANDARD_SYNC.getPrefix())
Expand All @@ -378,21 +390,25 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)))
.withState(connectionState);

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(JobResetConnectionConfig);
.withResetConnection(jobResetConnectionConfig);

final String expectedScope = STANDARD_SYNC.getConnectionId().toString();
when(jobPersistence.enqueueJob(expectedScope, jobConfig)).thenReturn(Optional.empty());

assertTrue(jobCreator.createResetConnectionJob(
final Optional<Long> jobId = jobCreator.createResetConnectionJob(
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).isEmpty());
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2));

verify(jobPersistence).enqueueJob(expectedScope, jobConfig);
assertTrue(jobId.isEmpty());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) {
.withDestinationConfiguration(resetConnection.getDestinationConfiguration())
.withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog())
.withOperationSequence(resetConnection.getOperationSequence())
.withResourceRequirements(resetConnection.getResourceRequirements());
.withResourceRequirements(resetConnection.getResourceRequirements())
.withState(resetConnection.getState());
}

final JobRunConfig jobRunConfig = TemporalUtils.createJobRunConfig(jobId, attempt);
Expand Down

0 comments on commit 3bab4c9

Please sign in to comment.