Skip to content

Commit

Permalink
Save streams to reset in job config when creating reset job (#13703)
Browse files Browse the repository at this point in the history
* save streams to reset in job config when creating reset job

* change streamDescriptors to streamsToReset
  • Loading branch information
lmossman authored Jun 13, 2022
1 parent 287b5bf commit e7f8128
Show file tree
Hide file tree
Showing 7 changed files with 80 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ description: configuration of the reset source
type: object
additionalProperties: true
required:
- streamDescriptors
- streamsToReset
properties:
streamDescriptors:
streamsToReset:
type: array
items:
"$ref": StreamDescriptor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.DestinationSyncMode;
Expand Down Expand Up @@ -93,7 +95,8 @@ public Optional<Long> createSyncJob(final SourceConnection source,
public Optional<Long> createResetConnectionJob(final DestinationConnection destination,
final StandardSync standardSync,
final String destinationDockerImage,
final List<StandardSyncOperation> standardSyncOperations)
final List<StandardSyncOperation> standardSyncOperations,
final List<StreamDescriptor> streamsToReset)
throws IOException {
final ConfiguredAirbyteCatalog configuredAirbyteCatalog = standardSync.getCatalog();
configuredAirbyteCatalog.getStreams().forEach(configuredAirbyteStream -> {
Expand All @@ -110,7 +113,8 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog)
.withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
workerResourceRequirements));
workerResourceRequirements))
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(streamsToReset));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -41,13 +42,15 @@ Optional<Long> createSyncJob(SourceConnection source,
* @param destination db model representing where data goes
* @param standardSync sync options
* @param destinationDockerImage docker image to use for the destination
* @param streamsToReset
* @return the new job if no other conflicting job was running, otherwise empty
* @throws IOException if something wrong happens
*/
Optional<Long> createResetConnectionJob(DestinationConnection destination,
StandardSync standardSync,
String destinationDockerImage,
List<StandardSyncOperation> standardSyncOperations)
List<StandardSyncOperation> standardSyncOperations,
List<StreamDescriptor> streamsToReset)
throws IOException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.OperatorNormalization;
import io.airbyte.config.OperatorNormalization.Option;
import io.airbyte.config.ResetSourceConfiguration;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StandardSyncOperation.OperatorType;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -57,6 +59,8 @@ public class DefaultJobCreatorTest {
private static final StandardSync STANDARD_SYNC;
private static final StandardSyncOperation STANDARD_SYNC_OPERATION;
private static final long JOB_ID = 12L;
private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1");
private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2");

private JobPersistence jobPersistence;
private ConfigRepository configRepository;
Expand Down Expand Up @@ -337,7 +341,8 @@ void testCreateResetConnectionJob() throws IOException {
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand All @@ -350,7 +355,8 @@ void testCreateResetConnectionJob() throws IOException {
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION)).orElseThrow();
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).orElseThrow();
assertEquals(JOB_ID, jobId);
}

Expand All @@ -371,7 +377,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
.withDestinationDockerImage(DESTINATION_IMAGE_NAME)
.withConfiguredAirbyteCatalog(expectedCatalog)
.withOperationSequence(List.of(STANDARD_SYNC_OPERATION))
.withResourceRequirements(workerResourceRequirements);
.withResourceRequirements(workerResourceRequirements)
.withResetSourceConfiguration(new ResetSourceConfiguration().withStreamsToReset(List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
Expand All @@ -384,7 +391,8 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException {
DESTINATION_CONNECTION,
STANDARD_SYNC,
DESTINATION_IMAGE_NAME,
List.of(STANDARD_SYNC_OPERATION)).isEmpty());
List.of(STANDARD_SYNC_OPERATION),
List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2)).isEmpty());
}

}
10 changes: 8 additions & 2 deletions airbyte-workers/src/main/java/io/airbyte/workers/WorkerApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.airbyte.config.persistence.ConfigPersistence;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.DatabaseConfigPersistence;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.config.persistence.split_secrets.JsonSecretsProcessor;
import io.airbyte.config.persistence.split_secrets.SecretPersistence;
import io.airbyte.config.persistence.split_secrets.SecretsHydrator;
Expand Down Expand Up @@ -133,6 +134,7 @@ public class WorkerApp {
private final Optional<ContainerOrchestratorConfig> containerOrchestratorConfig;
private final JobNotifier jobNotifier;
private final JobTracker jobTracker;
private final StreamResetPersistence streamResetPersistence;

public void start() {
final Map<String, String> mdc = MDC.getCopyOfContextMap();
Expand Down Expand Up @@ -190,7 +192,8 @@ private void registerConnectionManager(final WorkerFactory factory) {
jobNotifier,
jobTracker,
configRepository,
jobCreator),
jobCreator,
streamResetPersistence),
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()),
new ConnectionDeletionActivityImpl(connectionHelper),
new CheckConnectionActivityImpl(
Expand Down Expand Up @@ -436,6 +439,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf

final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient);

final StreamResetPersistence streamResetPersistence = new StreamResetPersistence(configDatabase);

new WorkerApp(
workspaceRoot,
defaultProcessFactory,
Expand All @@ -462,7 +467,8 @@ private static void launchWorkerApp(final Configs configs, final DSLContext conf
connectionHelper,
containerOrchestratorConfig,
jobNotifier,
jobTracker).start();
jobTracker,
streamResetPersistence).start();
}

public static void main(final String[] args) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.db.instance.configs.jooq.generated.enums.ReleaseStage;
import io.airbyte.metrics.lib.MetricClientFactory;
import io.airbyte.metrics.lib.MetricTags;
Expand Down Expand Up @@ -58,6 +60,7 @@ public class JobCreationAndStatusUpdateActivityImpl implements JobCreationAndSta
private final JobTracker jobTracker;
private final ConfigRepository configRepository;
private final JobCreator jobCreator;
private final StreamResetPersistence streamResetPersistence;

@Override
public JobCreationOutput createNewJob(final JobCreationInput input) {
Expand All @@ -83,8 +86,9 @@ public JobCreationOutput createNewJob(final JobCreationInput input) {
standardSyncOperations.add(standardSyncOperation);
}

final List<StreamDescriptor> streamsToReset = streamResetPersistence.getStreamResets(input.getConnectionId());
final Optional<Long> jobIdOptional =
jobCreator.createResetConnectionJob(destination, standardSync, destinationImageName, standardSyncOperations);
jobCreator.createResetConnectionJob(destination, standardSync, destinationImageName, standardSyncOperations, streamsToReset);

final long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,32 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;

import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.AttemptFailureSummary;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.FailureReason;
import io.airbyte.config.FailureReason.FailureOrigin;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobOutput;
import io.airbyte.config.NormalizationSummary;
import io.airbyte.config.StandardDestinationDefinition;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOutput;
import io.airbyte.config.StandardSyncSummary;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.StreamDescriptor;
import io.airbyte.config.helpers.LogClientSingleton;
import io.airbyte.config.helpers.LogConfigs;
import io.airbyte.config.persistence.ConfigNotFoundException;
import io.airbyte.config.persistence.ConfigRepository;
import io.airbyte.config.persistence.StreamResetPersistence;
import io.airbyte.scheduler.models.Attempt;
import io.airbyte.scheduler.models.AttemptStatus;
import io.airbyte.scheduler.models.Job;
import io.airbyte.scheduler.models.JobStatus;
import io.airbyte.scheduler.persistence.JobCreator;
import io.airbyte.scheduler.persistence.JobNotifier;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory;
Expand All @@ -50,6 +56,7 @@
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.DisplayName;
Expand Down Expand Up @@ -90,13 +97,27 @@ public class JobCreationAndStatusUpdateActivityTest {
@Mock
private ConfigRepository mConfigRepository;

@Mock
private JobCreator mJobCreator;

@Mock
private StreamResetPersistence mStreamResetPersistence;

@InjectMocks
private JobCreationAndStatusUpdateActivityImpl jobCreationAndStatusUpdateActivity;

private static final UUID CONNECTION_ID = UUID.randomUUID();
private static final UUID DESTINATION_ID = UUID.randomUUID();
private static final UUID DESTINATION_DEFINITION_ID = UUID.randomUUID();
private static final String DOCKER_REPOSITORY = "docker-repo";
private static final String DOCKER_IMAGE_TAG = "0.0.1";
private static final String DOCKER_IMAGE_NAME = DockerUtils.getTaggedImageName(DOCKER_REPOSITORY, DOCKER_IMAGE_TAG);
private static final long JOB_ID = 123L;
private static final int ATTEMPT_ID = 0;
private static final int ATTEMPT_NUMBER = 1;
private static final StreamDescriptor STREAM_DESCRIPTOR1 = new StreamDescriptor().withName("stream 1").withNamespace("namespace 1");
private static final StreamDescriptor STREAM_DESCRIPTOR2 = new StreamDescriptor().withName("stream 2").withNamespace("namespace 2");

private static final StandardSyncOutput standardSyncOutput = new StandardSyncOutput()
.withStandardSyncSummary(
new StandardSyncSummary()
Expand Down Expand Up @@ -127,6 +148,28 @@ public void createJob() throws JsonValidationException, ConfigNotFoundException,
Assertions.assertThat(output.getJobId()).isEqualTo(JOB_ID);
}

@Test
@DisplayName("Test reset job creation")
public void createResetJob() throws JsonValidationException, ConfigNotFoundException, IOException {
final StandardSync standardSync = new StandardSync().withDestinationId(DESTINATION_ID);
Mockito.when(mConfigRepository.getStandardSync(CONNECTION_ID)).thenReturn(standardSync);
final DestinationConnection destination = new DestinationConnection().withDestinationDefinitionId(DESTINATION_DEFINITION_ID);
Mockito.when(mConfigRepository.getDestinationConnection(DESTINATION_ID)).thenReturn(destination);
final StandardDestinationDefinition destinationDefinition = new StandardDestinationDefinition()
.withDockerRepository(DOCKER_REPOSITORY)
.withDockerImageTag(DOCKER_IMAGE_TAG);
Mockito.when(mConfigRepository.getStandardDestinationDefinition(DESTINATION_DEFINITION_ID)).thenReturn(destinationDefinition);
final List<StreamDescriptor> streamsToReset = List.of(STREAM_DESCRIPTOR1, STREAM_DESCRIPTOR2);
Mockito.when(mStreamResetPersistence.getStreamResets(CONNECTION_ID)).thenReturn(streamsToReset);

Mockito.when(mJobCreator.createResetConnectionJob(destination, standardSync, DOCKER_IMAGE_NAME, List.of(), streamsToReset))
.thenReturn(Optional.of(JOB_ID));

final JobCreationOutput output = jobCreationAndStatusUpdateActivity.createNewJob(new JobCreationInput(CONNECTION_ID, true));

Assertions.assertThat(output.getJobId()).isEqualTo(JOB_ID);
}

@Test
@DisplayName("Test attempt creation")
public void createAttempt() throws IOException {
Expand Down

0 comments on commit e7f8128

Please sign in to comment.