From 1242aa8b4aabfabcb28250aa5c20dec769068af7 Mon Sep 17 00:00:00 2001 From: Charles Date: Fri, 25 Feb 2022 10:00:30 -0800 Subject: [PATCH] Set resource limits for connector definitions: expose in worker (#10483) * pipe through to worker * wip * pass source and dest def resource reqs to job client * fix test * use resource requirements utils to get resource reqs for legacy and new impls * undo changes to pass sync input to container launcher worker factory * remove import * fix hierarchy order of resource requirements * add nullable annotations * undo change to test * format * use destination resource reqs for normalization and make resource req utils more flexible * format * refactor resource requirements utils and add tests * switch to storing source/dest resource requirements directly on job sync config * fix tests and javadocs * use sync input resource requirements for container orchestrator pod * do not set connection resource reqs to worker reqs * add overrident requirement utils method + test + comment Co-authored-by: lmossman --- .../src/main/resources/types/ActorType.yaml | 9 ++ .../main/resources/types/JobSyncConfig.yaml | 10 +- .../resources/types/StandardSyncInput.yaml | 13 +- .../ReplicationJobOrchestrator.java | 4 +- .../client/DefaultSchedulerJobClient.java | 10 +- .../scheduler/client/SchedulerJobClient.java | 6 +- .../client/DefaultSchedulerJobClientTest.java | 19 ++- .../persistence/DefaultJobCreator.java | 47 +++--- .../scheduler/persistence/JobCreator.java | 6 +- .../ResourceRequirementsUtils.java | 124 +++++++++++++++ .../job_factory/DefaultSyncJobFactory.java | 11 +- .../persistence/DefaultJobCreatorTest.java | 141 +++++++++++++++++- .../ResourceRequirementsUtilsTest.java | 124 +++++++++++++++ .../DefaultSyncJobFactoryTest.java | 4 +- .../server/handlers/ConnectionsHandler.java | 2 - .../server/handlers/SchedulerHandler.java | 4 +- .../server/handlers/SchedulerHandlerTest.java | 20 ++- .../workers/temporal/TemporalClient.java | 8 +- 
.../activities/GenerateInputActivityImpl.java | 4 +- .../sync/ReplicationActivityImpl.java | 20 +-- .../sync/ReplicationLauncherWorker.java | 6 +- .../temporal/sync/SyncWorkflowImpl.java | 2 +- .../worker_run/TemporalWorkerRunFactory.java | 4 +- 23 files changed, 526 insertions(+), 72 deletions(-) create mode 100644 airbyte-config/models/src/main/resources/types/ActorType.yaml create mode 100644 airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtils.java create mode 100644 airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java diff --git a/airbyte-config/models/src/main/resources/types/ActorType.yaml b/airbyte-config/models/src/main/resources/types/ActorType.yaml new file mode 100644 index 000000000000..ec29b2606f87 --- /dev/null +++ b/airbyte-config/models/src/main/resources/types/ActorType.yaml @@ -0,0 +1,9 @@ +--- +"$schema": http://json-schema.org/draft-07/schema# +"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ActorType.yaml +title: ActorType +description: enum that describes different types of actors +type: string +enum: + - source + - destination diff --git a/airbyte-config/models/src/main/resources/types/JobSyncConfig.yaml b/airbyte-config/models/src/main/resources/types/JobSyncConfig.yaml index 9ed4fca0fb7f..ea6535954378 100644 --- a/airbyte-config/models/src/main/resources/types/JobSyncConfig.yaml +++ b/airbyte-config/models/src/main/resources/types/JobSyncConfig.yaml @@ -39,6 +39,14 @@ properties: destinationDockerImage: description: Image name of the destination with tag. 
type: string + sourceResourceRequirements: + type: object + description: optional resource requirements to use in source container - this is used instead of `resourceRequirements` for the source container + existingJavaType: io.airbyte.config.ResourceRequirements + destinationResourceRequirements: + type: object + description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container + existingJavaType: io.airbyte.config.ResourceRequirements operationSequence: description: Sequence of configurations of operations to apply as part of the sync type: array @@ -49,5 +57,5 @@ properties: "$ref": State.yaml resourceRequirements: type: object - description: optional resource requirements to run sync workers + description: optional resource requirements to run sync workers - this is used for containers other than the source/dest containers existingJavaType: io.airbyte.config.ResourceRequirements diff --git a/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml b/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml index b188c3d9676d..a41da2982aa9 100644 --- a/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml +++ b/airbyte-config/models/src/main/resources/types/StandardSyncInput.yaml @@ -35,11 +35,20 @@ properties: catalog: description: the configured airbyte catalog type: object + # necessary because the configuration declaration is in a separate package. existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog state: description: optional state of the previous run. this object is defined per integration. 
"$ref": State.yaml resourceRequirements: - description: optional resource requirements to run sync workers + description: optional resource requirements to run sync workers - this is used for containers other than the source/dest containers type: object - existingJavaType: io.airbyte.config.ResourceRequirements + "$ref": ResourceRequirements.yaml + sourceResourceRequirements: + description: optional resource requirements to use in source container - this is used instead of `resourceRequirements` for the source container + type: object + "$ref": ResourceRequirements.yaml + destinationResourceRequirements: + description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container + type: object + "$ref": ResourceRequirements.yaml diff --git a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java index f7f515ce4d18..476df67e68e3 100644 --- a/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java +++ b/airbyte-container-orchestrator/src/main/java/io/airbyte/container_orchestrator/ReplicationJobOrchestrator.java @@ -72,7 +72,7 @@ public Optional runJob() throws Exception { Math.toIntExact(sourceLauncherConfig.getAttemptId()), sourceLauncherConfig.getDockerImage(), processFactory, - syncInput.getResourceRequirements()); + syncInput.getSourceResourceRequirements()); log.info("Setting up destination launcher..."); final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher( @@ -80,7 +80,7 @@ public Optional runJob() throws Exception { Math.toIntExact(destinationLauncherConfig.getAttemptId()), destinationLauncherConfig.getDockerImage(), processFactory, - syncInput.getResourceRequirements()); + syncInput.getDestinationResourceRequirements()); log.info("Setting up 
source..."); // reset jobs use an empty source to induce resetting all data in destination. diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java index a8f7f2f18810..42e3d2d9823a 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/DefaultSchedulerJobClient.java @@ -4,6 +4,7 @@ package io.airbyte.scheduler.client; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; @@ -14,6 +15,7 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import javax.annotation.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,7 +37,9 @@ public Job createOrGetActiveSyncJob(final SourceConnection source, final StandardSync standardSync, final String sourceDockerImage, final String destinationDockerImage, - final List standardSyncOperations) + final List standardSyncOperations, + @Nullable final ActorDefinitionResourceRequirements sourceResourceRequirements, + @Nullable final ActorDefinitionResourceRequirements destinationResourceRequirements) throws IOException { final Optional jobIdOptional = jobCreator.createSyncJob( source, @@ -43,7 +47,9 @@ public Job createOrGetActiveSyncJob(final SourceConnection source, standardSync, sourceDockerImage, destinationDockerImage, - standardSyncOperations); + standardSyncOperations, + sourceResourceRequirements, + destinationResourceRequirements); final long jobId = jobIdOptional.isEmpty() ? 
jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId() diff --git a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SchedulerJobClient.java b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SchedulerJobClient.java index 0ed5192269a8..09292fd7bb1e 100644 --- a/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SchedulerJobClient.java +++ b/airbyte-scheduler/client/src/main/java/io/airbyte/scheduler/client/SchedulerJobClient.java @@ -4,6 +4,7 @@ package io.airbyte.scheduler.client; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; @@ -11,6 +12,7 @@ import io.airbyte.scheduler.models.Job; import java.io.IOException; import java.util.List; +import javax.annotation.Nullable; /** * Exposes a way of executing short-lived jobs as RPC calls. 
If it returns successfully, it @@ -24,7 +26,9 @@ Job createOrGetActiveSyncJob(SourceConnection source, StandardSync standardSync, String sourceDockerImage, String destinationDockerImage, - List standardSyncOperations) + List standardSyncOperations, + @Nullable ActorDefinitionResourceRequirements sourceResourceRequirements, + @Nullable ActorDefinitionResourceRequirements destinationResourceRequirements) throws IOException; Job createOrGetActiveResetConnectionJob(DestinationConnection destination, diff --git a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java index 7fc21ebaab80..172bf77b2907 100644 --- a/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java +++ b/airbyte-scheduler/client/src/test/java/io/airbyte/scheduler/client/DefaultSchedulerJobClientTest.java @@ -46,11 +46,13 @@ void testCreateSyncJob() throws IOException { final DestinationConnection destination = mock(DestinationConnection.class); final StandardSync standardSync = mock(StandardSync.class); final String destinationDockerImage = "airbyte/spaceport"; - when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of())) + when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null)) .thenReturn(Optional.of(JOB_ID)); when(jobPersistence.getJob(JOB_ID)).thenReturn(job); - assertEquals(job, client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of())); + assertEquals( + job, + client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null)); } @Test @@ -61,14 +63,23 @@ void testCreateSyncJobAlreadyExist() throws IOException { final UUID connectionUuid = UUID.randomUUID(); 
when(standardSync.getConnectionId()).thenReturn(connectionUuid); final String destinationDockerImage = "airbyte/spaceport"; - when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of())).thenReturn(Optional.empty()); + when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null)) + .thenReturn(Optional.empty()); final Job currentJob = mock(Job.class); when(currentJob.getId()).thenReturn(42L); when(jobPersistence.getLastReplicationJob(connectionUuid)).thenReturn(Optional.of(currentJob)); when(jobPersistence.getJob(42L)).thenReturn(currentJob); - assertEquals(currentJob, client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of())); + assertEquals(currentJob, client.createOrGetActiveSyncJob( + source, + destination, + standardSync, + DOCKER_IMAGE, + destinationDockerImage, + List.of(), + null, + null)); } @Test diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java index e8cbdf45f3e1..1c8b63f4a921 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/DefaultJobCreator.java @@ -4,11 +4,13 @@ package io.airbyte.scheduler.persistence; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; +import io.airbyte.config.JobTypeResourceLimit.JobType; import io.airbyte.config.ResourceRequirements; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; @@ 
-20,6 +22,7 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import javax.annotation.Nullable; public class DefaultJobCreator implements JobCreator { @@ -41,7 +44,9 @@ public Optional createSyncJob(final SourceConnection source, final StandardSync standardSync, final String sourceDockerImageName, final String destinationDockerImageName, - final List standardSyncOperations) + final List standardSyncOperations, + @Nullable final ActorDefinitionResourceRequirements sourceResourceReqs, + @Nullable final ActorDefinitionResourceRequirements destinationResourceReqs) throws IOException { // reusing this isn't going to quite work. final JobSyncConfig jobSyncConfig = new JobSyncConfig() @@ -55,7 +60,19 @@ public Optional createSyncJob(final SourceConnection source, .withOperationSequence(standardSyncOperations) .withConfiguredAirbyteCatalog(standardSync.getCatalog()) .withState(null) - .withResourceRequirements(getJobResourceRequirements(standardSync)); + .withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements( + standardSync.getResourceRequirements(), + workerResourceRequirements)) + .withSourceResourceRequirements(ResourceRequirementsUtils.getResourceRequirements( + standardSync.getResourceRequirements(), + sourceResourceReqs, + workerResourceRequirements, + JobType.SYNC)) + .withDestinationResourceRequirements(ResourceRequirementsUtils.getResourceRequirements( + standardSync.getResourceRequirements(), + destinationResourceReqs, + workerResourceRequirements, + JobType.SYNC)); configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState); @@ -91,7 +108,9 @@ public Optional createResetConnectionJob(final DestinationConnection desti .withDestinationConfiguration(destination.getConfiguration()) .withOperationSequence(standardSyncOperations) .withConfiguredAirbyteCatalog(configuredAirbyteCatalog) - .withResourceRequirements(getJobResourceRequirements(standardSync)); + 
.withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements( + standardSync.getResourceRequirements(), + workerResourceRequirements)); final JobConfig jobConfig = new JobConfig() .withConfigType(ConfigType.RESET_CONNECTION) @@ -99,26 +118,4 @@ public Optional createResetConnectionJob(final DestinationConnection desti return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig); } - private ResourceRequirements getJobResourceRequirements(final StandardSync standardSync) { - if (standardSync.getResourceRequirements() == null) { - return workerResourceRequirements; - } - - final ResourceRequirements jobResourceRequirements = standardSync.getResourceRequirements(); - if (jobResourceRequirements.getCpuRequest() == null) { - jobResourceRequirements.setCpuRequest(workerResourceRequirements.getCpuRequest()); - } - if (jobResourceRequirements.getCpuLimit() == null) { - jobResourceRequirements.setCpuLimit(workerResourceRequirements.getCpuLimit()); - } - if (jobResourceRequirements.getMemoryRequest() == null) { - jobResourceRequirements.setMemoryRequest(workerResourceRequirements.getMemoryRequest()); - } - if (jobResourceRequirements.getMemoryLimit() == null) { - jobResourceRequirements.setMemoryLimit(workerResourceRequirements.getMemoryLimit()); - } - - return jobResourceRequirements; - } - } diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobCreator.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobCreator.java index 348ed6c8a347..59d5ec749855 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobCreator.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/JobCreator.java @@ -4,6 +4,7 @@ package io.airbyte.scheduler.persistence; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.SourceConnection; 
import io.airbyte.config.StandardSync; @@ -11,6 +12,7 @@ import java.io.IOException; import java.util.List; import java.util.Optional; +import javax.annotation.Nullable; public interface JobCreator { @@ -29,7 +31,9 @@ Optional createSyncJob(SourceConnection source, StandardSync standardSync, String sourceDockerImage, String destinationDockerImage, - List standardSyncOperations) + List standardSyncOperations, + @Nullable ActorDefinitionResourceRequirements sourceResourceReqs, + @Nullable ActorDefinitionResourceRequirements destinationResourceReqs) throws IOException; /** diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtils.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtils.java new file mode 100644 index 000000000000..0517271b3562 --- /dev/null +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtils.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence; + +import com.google.common.base.Preconditions; +import io.airbyte.config.ActorDefinitionResourceRequirements; +import io.airbyte.config.JobTypeResourceLimit; +import io.airbyte.config.JobTypeResourceLimit.JobType; +import io.airbyte.config.ResourceRequirements; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import javax.annotation.Nullable; + +public class ResourceRequirementsUtils { + + /** + * Given connection-level resource requirements, actor-definition-level resource requirements, + * worker-default resource requirements, and a job type, returns the final resource requirements + * generated by merging the provided requirements in hierarchy order. 
+ * + * Connection-level resource requirements take precedence over actor-definition level resource + * requirements. Within the actor-definition level requirements, job-type-specific requirements take + * precedence over default definition requirements. Actor-definition level resource requirements + * take precedence over worker default resource requirements. + * + * @param connectionResourceReqs - the resource requirements set on the connection + * @param actorDefinitionResourceReqs - the resource requirements set on the actor definition + * @param workerDefaultResourceReqs - the default worker resource requirements set in the env + * variables + * @param jobType - type of job to extract resource requirements for from the actor definition reqs + * @return resource requirements, if present, otherwise an empty ResourceRequirements object. + */ + public static ResourceRequirements getResourceRequirements(@Nullable final ResourceRequirements connectionResourceReqs, + @Nullable final ActorDefinitionResourceRequirements actorDefinitionResourceReqs, + @Nullable final ResourceRequirements workerDefaultResourceReqs, + final JobType jobType) { + final ResourceRequirements jobSpecificDefinitionResourceReqs = getResourceRequirementsForJobType(actorDefinitionResourceReqs, jobType) + .orElse(null); + final ResourceRequirements defaultDefinitionResourceReqs = Optional.ofNullable(actorDefinitionResourceReqs) + .map(ActorDefinitionResourceRequirements::getDefault).orElse(null); + return mergeResourceRequirements( + connectionResourceReqs, + jobSpecificDefinitionResourceReqs, + defaultDefinitionResourceReqs, + workerDefaultResourceReqs); + } + + /** + * Given connection-level and worker-default resource requirements, returns the final resource + * requirements generated by merging the provided requirements in hierarchy order. + * + * Connection-level resource requirements take precedence over worker-default resource + * requirements. 
+ * + * @param connectionResourceReqs - the resource requirements set on the connection + * @param workerDefaultResourceReqs - the default worker resource requirements set in the env + * variables + * @return resource requirements, if present, otherwise an empty ResourceRequirements object. + */ + public static ResourceRequirements getResourceRequirements(@Nullable final ResourceRequirements connectionResourceReqs, + @Nullable final ResourceRequirements workerDefaultResourceReqs) { + return mergeResourceRequirements( + connectionResourceReqs, + workerDefaultResourceReqs); + } + + /** + * Given a list of resource requirements, merges them together. Earlier reqs override later ones. + * + * @param resourceReqs - list of resource requirements to merge + * @return merged resource requirements + */ + private static ResourceRequirements mergeResourceRequirements(final ResourceRequirements... resourceReqs) { + final ResourceRequirements outputReqs = new ResourceRequirements(); + final List reversed = new ArrayList<>(Arrays.asList(resourceReqs)); + Collections.reverse(reversed); + + // start from the lowest priority requirements so that we can repeatedly override the output + // requirements to guarantee that we end with the highest priority setting for each + for (final ResourceRequirements resourceReq : reversed) { + if (resourceReq == null) { + continue; + } + + if (resourceReq.getCpuRequest() != null) { + outputReqs.setCpuRequest(resourceReq.getCpuRequest()); + } + if (resourceReq.getCpuLimit() != null) { + outputReqs.setCpuLimit(resourceReq.getCpuLimit()); + } + if (resourceReq.getMemoryRequest() != null) { + outputReqs.setMemoryRequest(resourceReq.getMemoryRequest()); + } + if (resourceReq.getMemoryLimit() != null) { + outputReqs.setMemoryLimit(resourceReq.getMemoryLimit()); + } + } + return outputReqs; + } + + private static Optional getResourceRequirementsForJobType(final ActorDefinitionResourceRequirements actorDefResourceReqs, + final JobType jobType) { + if 
(actorDefResourceReqs == null) { + return Optional.empty(); + } + + final List jobTypeResourceRequirement = actorDefResourceReqs.getJobSpecific() + .stream() + .filter(jobSpecific -> jobSpecific.getJobType() == jobType).map(JobTypeResourceLimit::getResourceRequirements).collect( + Collectors.toList()); + + Preconditions.checkArgument(jobTypeResourceRequirement.size() <= 1, "Should only have one resource requirement per job type."); + return jobTypeResourceRequirement.isEmpty() + ? Optional.empty() + : Optional.of(jobTypeResourceRequirement.get(0)); + } + +} diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java index 1c1a87eb392f..3aeb545647c4 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactory.java @@ -50,9 +50,10 @@ public Long create(final UUID connectionId) { destinationConnection.getWorkspaceId(), destinationConnection.getConfiguration()); destinationConnection.withConfiguration(destinationConfiguration); - final StandardSourceDefinition sourceDefinition = configRepository.getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()); - final StandardDestinationDefinition destinationDefinition = - configRepository.getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()); + final StandardSourceDefinition sourceDefinition = configRepository + .getStandardSourceDefinition(sourceConnection.getSourceDefinitionId()); + final StandardDestinationDefinition destinationDefinition = configRepository + .getStandardDestinationDefinition(destinationConnection.getDestinationDefinitionId()); final String sourceImageName = 
DockerUtils.getTaggedImageName(sourceDefinition.getDockerRepository(), sourceDefinition.getDockerImageTag()); final String destinationImageName = @@ -70,7 +71,9 @@ public Long create(final UUID connectionId) { standardSync, sourceImageName, destinationImageName, - standardSyncOperations) + standardSyncOperations, + sourceDefinition.getResourceRequirements(), + destinationDefinition.getResourceRequirements()) .orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already.")); } catch (final IOException | JsonValidationException | ConfigNotFoundException e) { diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java index 6ff012ee19b7..d9feaed0d9ef 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/DefaultJobCreatorTest.java @@ -7,17 +7,22 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.JobResetConnectionConfig; import io.airbyte.config.JobSyncConfig; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; +import io.airbyte.config.JobTypeResourceLimit; +import io.airbyte.config.JobTypeResourceLimit.JobType; 
import io.airbyte.config.OperatorNormalization; import io.airbyte.config.OperatorNormalization.Option; import io.airbyte.config.ResourceRequirements; @@ -136,7 +141,9 @@ void testCreateSyncJob() throws IOException { .withDestinationDockerImage(DESTINATION_IMAGE_NAME) .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) - .withResourceRequirements(workerResourceRequirements); + .withResourceRequirements(workerResourceRequirements) + .withSourceResourceRequirements(workerResourceRequirements) + .withDestinationResourceRequirements(workerResourceRequirements); final JobConfig jobConfig = new JobConfig() .withConfigType(JobConfig.ConfigType.SYNC) @@ -151,7 +158,9 @@ void testCreateSyncJob() throws IOException { STANDARD_SYNC, SOURCE_IMAGE_NAME, DESTINATION_IMAGE_NAME, - List.of(STANDARD_SYNC_OPERATION)).orElseThrow(); + List.of(STANDARD_SYNC_OPERATION), + null, + null).orElseThrow(); assertEquals(JOB_ID, jobId); } @@ -182,7 +191,133 @@ void testCreateSyncJobEnsureNoQueuing() throws IOException { STANDARD_SYNC, SOURCE_IMAGE_NAME, DESTINATION_IMAGE_NAME, - List.of(STANDARD_SYNC_OPERATION)).isEmpty()); + List.of(STANDARD_SYNC_OPERATION), + null, + null).isEmpty()); + } + + @Test + void testCreateSyncJobDefaultWorkerResourceReqs() throws IOException { + jobCreator.createSyncJob( + SOURCE_CONNECTION, + DESTINATION_CONNECTION, + STANDARD_SYNC, + SOURCE_IMAGE_NAME, + DESTINATION_IMAGE_NAME, + List.of(STANDARD_SYNC_OPERATION), + null, + null); + + final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() + .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) + .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) + .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) + .withSourceDockerImage(SOURCE_IMAGE_NAME) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) + .withDestinationDockerImage(DESTINATION_IMAGE_NAME) + 
.withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) + .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) + .withResourceRequirements(workerResourceRequirements) + .withSourceResourceRequirements(workerResourceRequirements) + .withDestinationResourceRequirements(workerResourceRequirements); + + final JobConfig expectedJobConfig = new JobConfig() + .withConfigType(JobConfig.ConfigType.SYNC) + .withSync(expectedJobSyncConfig); + + final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); + + verify(jobPersistence, times(1)).enqueueJob(expectedScope, expectedJobConfig); + } + + @Test + void testCreateSyncJobConnectionResourceReqs() throws IOException { + final ResourceRequirements standardSyncResourceRequirements = new ResourceRequirements() + .withCpuLimit("0.5") + .withCpuRequest("0.5") + .withMemoryLimit("500Mi") + .withMemoryRequest("500Mi"); + final StandardSync standardSync = Jsons.clone(STANDARD_SYNC).withResourceRequirements(standardSyncResourceRequirements); + + jobCreator.createSyncJob( + SOURCE_CONNECTION, + DESTINATION_CONNECTION, + standardSync, + SOURCE_IMAGE_NAME, + DESTINATION_IMAGE_NAME, + List.of(STANDARD_SYNC_OPERATION), + null, + null); + + final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() + .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) + .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) + .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) + .withSourceDockerImage(SOURCE_IMAGE_NAME) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) + .withDestinationDockerImage(DESTINATION_IMAGE_NAME) + .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) + .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) + .withResourceRequirements(standardSyncResourceRequirements) + .withSourceResourceRequirements(standardSyncResourceRequirements) + .withDestinationResourceRequirements(standardSyncResourceRequirements); + + 
final JobConfig expectedJobConfig = new JobConfig() + .withConfigType(JobConfig.ConfigType.SYNC) + .withSync(expectedJobSyncConfig); + + final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); + + verify(jobPersistence, times(1)).enqueueJob(expectedScope, expectedJobConfig); + } + + @Test + void testCreateSyncJobSourceAndDestinationResourceReqs() throws IOException { + final ResourceRequirements sourceResourceRequirements = new ResourceRequirements() + .withCpuLimit("0.7") + .withCpuRequest("0.7") + .withMemoryLimit("700Mi") + .withMemoryRequest("700Mi"); + final ResourceRequirements destResourceRequirements = new ResourceRequirements() + .withCpuLimit("0.8") + .withCpuRequest("0.8") + .withMemoryLimit("800Mi") + .withMemoryRequest("800Mi"); + + jobCreator.createSyncJob( + SOURCE_CONNECTION, + DESTINATION_CONNECTION, + STANDARD_SYNC, + SOURCE_IMAGE_NAME, + DESTINATION_IMAGE_NAME, + List.of(STANDARD_SYNC_OPERATION), + new ActorDefinitionResourceRequirements().withDefault(sourceResourceRequirements), + new ActorDefinitionResourceRequirements().withJobSpecific(List.of( + new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements(destResourceRequirements)))); + + final JobSyncConfig expectedJobSyncConfig = new JobSyncConfig() + .withNamespaceDefinition(STANDARD_SYNC.getNamespaceDefinition()) + .withNamespaceFormat(STANDARD_SYNC.getNamespaceFormat()) + .withPrefix(STANDARD_SYNC.getPrefix()) + .withSourceConfiguration(SOURCE_CONNECTION.getConfiguration()) + .withSourceDockerImage(SOURCE_IMAGE_NAME) + .withDestinationConfiguration(DESTINATION_CONNECTION.getConfiguration()) + .withDestinationDockerImage(DESTINATION_IMAGE_NAME) + .withConfiguredAirbyteCatalog(STANDARD_SYNC.getCatalog()) + .withOperationSequence(List.of(STANDARD_SYNC_OPERATION)) + .withResourceRequirements(workerResourceRequirements) + .withSourceResourceRequirements(sourceResourceRequirements) + .withDestinationResourceRequirements(destResourceRequirements); + + final 
JobConfig expectedJobConfig = new JobConfig() + .withConfigType(JobConfig.ConfigType.SYNC) + .withSync(expectedJobSyncConfig); + + final String expectedScope = STANDARD_SYNC.getConnectionId().toString(); + + verify(jobPersistence, times(1)).enqueueJob(expectedScope, expectedJobConfig); } @Test diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java new file mode 100644 index 000000000000..b60348d4b420 --- /dev/null +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/ResourceRequirementsUtilsTest.java @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2021 Airbyte, Inc., all rights reserved. + */ + +package io.airbyte.scheduler.persistence; + +import static org.junit.jupiter.api.Assertions.*; + +import io.airbyte.config.ActorDefinitionResourceRequirements; +import io.airbyte.config.JobTypeResourceLimit; +import io.airbyte.config.JobTypeResourceLimit.JobType; +import io.airbyte.config.ResourceRequirements; +import java.util.List; +import org.junit.jupiter.api.Test; + +class ResourceRequirementsUtilsTest { + + @Test + void testNoReqsSet() { + final ResourceRequirements result = ResourceRequirementsUtils.getResourceRequirements( + null, + null, + null, + JobType.SYNC); + + assertEquals(new ResourceRequirements(), result); + } + + @Test + void testWorkerDefaultReqsSet() { + final ResourceRequirements workerDefaultReqs = new ResourceRequirements().withCpuRequest("1").withCpuLimit("1"); + final ResourceRequirements reqs = ResourceRequirementsUtils.getResourceRequirements( + null, + null, + workerDefaultReqs, + JobType.SYNC); + + assertEquals(workerDefaultReqs, reqs); + } + + @Test + void testDefinitionDefaultReqsOverrideWorker() { + final ResourceRequirements workerDefaultReqs = new ResourceRequirements().withCpuRequest("1").withCpuLimit("1"); + final ResourceRequirements 
definitionDefaultReqs = new ResourceRequirements().withCpuLimit("2").withMemoryRequest("100Mi"); + final ActorDefinitionResourceRequirements definitionReqs = new ActorDefinitionResourceRequirements().withDefault(definitionDefaultReqs); + + final ResourceRequirements result = ResourceRequirementsUtils.getResourceRequirements( + null, + definitionReqs, + workerDefaultReqs, + JobType.SYNC); + + final ResourceRequirements expectedReqs = new ResourceRequirements() + .withCpuRequest("1") + .withCpuLimit("2") + .withMemoryRequest("100Mi"); + + assertEquals(expectedReqs, result); + } + + @Test + void testJobSpecificReqsOverrideDefault() { + final ResourceRequirements workerDefaultReqs = new ResourceRequirements().withCpuRequest("1").withCpuLimit("1"); + final ResourceRequirements definitionDefaultReqs = new ResourceRequirements().withCpuLimit("2").withMemoryRequest("100Mi"); + final JobTypeResourceLimit jobTypeResourceLimit = new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements( + new ResourceRequirements().withCpuRequest("2").withMemoryRequest("200Mi").withMemoryLimit("300Mi")); + final ActorDefinitionResourceRequirements definitionReqs = new ActorDefinitionResourceRequirements() + .withDefault(definitionDefaultReqs) + .withJobSpecific(List.of(jobTypeResourceLimit)); + + final ResourceRequirements result = ResourceRequirementsUtils.getResourceRequirements( + null, + definitionReqs, + workerDefaultReqs, + JobType.SYNC); + + final ResourceRequirements expectedReqs = new ResourceRequirements() + .withCpuRequest("2") + .withCpuLimit("2") + .withMemoryRequest("200Mi") + .withMemoryLimit("300Mi"); + assertEquals(expectedReqs, result); + } + + @Test + void testConnectionResourceRequirementsOverrideDefault() { + final ResourceRequirements workerDefaultReqs = new ResourceRequirements().withCpuRequest("1"); + final ResourceRequirements definitionDefaultReqs = new ResourceRequirements().withCpuLimit("2").withCpuRequest("2"); + final JobTypeResourceLimit 
jobTypeResourceLimit = new JobTypeResourceLimit().withJobType(JobType.SYNC).withResourceRequirements( + new ResourceRequirements().withCpuLimit("3").withMemoryRequest("200Mi")); + final ActorDefinitionResourceRequirements definitionReqs = new ActorDefinitionResourceRequirements() + .withDefault(definitionDefaultReqs) + .withJobSpecific(List.of(jobTypeResourceLimit)); + final ResourceRequirements connectionResourceRequirements = new ResourceRequirements().withMemoryRequest("400Mi").withMemoryLimit("500Mi"); + + final ResourceRequirements result = ResourceRequirementsUtils.getResourceRequirements( + connectionResourceRequirements, + definitionReqs, + workerDefaultReqs, + JobType.SYNC); + + final ResourceRequirements expectedReqs = new ResourceRequirements() + .withCpuRequest("2") + .withCpuLimit("3") + .withMemoryRequest("400Mi") + .withMemoryLimit("500Mi"); + assertEquals(expectedReqs, result); + } + + @Test + void testConnectionResourceRequirementsOverrideWorker() { + final ResourceRequirements workerDefaultReqs = new ResourceRequirements().withCpuRequest("1").withCpuLimit("1"); + final ResourceRequirements connectionResourceRequirements = new ResourceRequirements().withCpuLimit("2").withMemoryLimit("500Mi"); + + final ResourceRequirements result = ResourceRequirementsUtils.getResourceRequirements(connectionResourceRequirements, workerDefaultReqs); + + final ResourceRequirements expectedReqs = new ResourceRequirements() + .withCpuRequest("1") + .withCpuLimit("2") + .withMemoryLimit("500Mi"); + assertEquals(expectedReqs, result); + } + +} diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java index 98a4a8725cb2..bc263b2b0ac7 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java +++ 
b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_factory/DefaultSyncJobFactoryTest.java @@ -62,7 +62,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo when(configRepository.getSourceConnection(sourceId)).thenReturn(sourceConnection); when(configRepository.getDestinationConnection(destinationId)).thenReturn(destinationConnection); when(configRepository.getStandardSyncOperation(operationId)).thenReturn(operation); - when(jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations)) + when(jobCreator.createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, null, null)) .thenReturn(Optional.of(jobId)); when(configRepository.getStandardSourceDefinition(sourceDefinitionId)) .thenReturn(new StandardSourceDefinition().withSourceDefinitionId(sourceDefinitionId).withDockerRepository(srcDockerRepo) @@ -77,7 +77,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo assertEquals(jobId, actualJobId); verify(jobCreator) - .createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations); + .createSyncJob(sourceConnection, destinationConnection, standardSync, srcDockerImage, dstDockerImage, operations, null, null); } } diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java index 3c92a7952695..c5eb017c85b3 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/ConnectionsHandler.java @@ -123,8 +123,6 @@ public ConnectionRead createConnection(final ConnectionCreate connectionCreate) .withStatus(ApiPojoConverters.toPersistenceStatus(connectionCreate.getStatus())); if 
(connectionCreate.getResourceRequirements() != null) { standardSync.withResourceRequirements(ApiPojoConverters.resourceRequirementsToInternal(connectionCreate.getResourceRequirements())); - } else { - standardSync.withResourceRequirements(workerConfigs.getResourceRequirements()); } // TODO Undesirable behavior: sending a null configured catalog should not be valid? diff --git a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java index 936db2235713..9012914c575b 100644 --- a/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java +++ b/airbyte-server/src/main/java/io/airbyte/server/handlers/SchedulerHandler.java @@ -353,7 +353,9 @@ public JobInfoRead syncConnection(final ConnectionIdRequestBody connectionIdRequ standardSync, sourceImageName, destinationImageName, - standardSyncOperations); + standardSyncOperations, + sourceDef.getResourceRequirements(), + destinationDef.getResourceRequirements()); return jobConverter.getJobInfoRead(job); } diff --git a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java index 544eb9c903fb..be52d1ac072b 100644 --- a/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-server/src/test/java/io/airbyte/server/handlers/SchedulerHandlerTest.java @@ -39,12 +39,14 @@ import io.airbyte.commons.features.FeatureFlags; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; +import io.airbyte.config.ActorDefinitionResourceRequirements; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; import io.airbyte.config.OperatorNormalization; import io.airbyte.config.OperatorNormalization.Option; +import 
io.airbyte.config.ResourceRequirements; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardCheckConnectionOutput; import io.airbyte.config.StandardDestinationDefinition; @@ -500,16 +502,22 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot final UUID operationId = standardSync.getOperationIds().get(0); final List operations = getOperations(standardSync); + final ActorDefinitionResourceRequirements sourceResourceReqs = + new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("1")); when(configRepository.getStandardSourceDefinition(source.getSourceDefinitionId())) .thenReturn(new StandardSourceDefinition() .withDockerRepository(SOURCE_DOCKER_REPO) .withDockerImageTag(SOURCE_DOCKER_TAG) - .withSourceDefinitionId(source.getSourceDefinitionId())); + .withSourceDefinitionId(source.getSourceDefinitionId()) + .withResourceRequirements(sourceResourceReqs)); + final ActorDefinitionResourceRequirements destResourceReqs = + new ActorDefinitionResourceRequirements().withDefault(new ResourceRequirements().withCpuRequest("2")); when(configRepository.getStandardDestinationDefinition(destination.getDestinationDefinitionId())) .thenReturn(new StandardDestinationDefinition() .withDockerRepository(DESTINATION_DOCKER_REPO) .withDockerImageTag(DESTINATION_DOCKER_TAG) - .withDestinationDefinitionId(destination.getDestinationDefinitionId())); + .withDestinationDefinitionId(destination.getDestinationDefinitionId()) + .withResourceRequirements(destResourceReqs)); when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync); when(configRepository.getSourceConnection(source.getSourceId())).thenReturn(source); when(configRepository.getDestinationConnection(destination.getDestinationId())).thenReturn(destination); @@ -520,7 +528,9 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot standardSync, SOURCE_DOCKER_IMAGE, 
DESTINATION_DOCKER_IMAGE, - operations)) + operations, + sourceResourceReqs, + destResourceReqs)) .thenReturn(completedJob); when(completedJob.getScope()).thenReturn("cat:12"); final JobConfig jobConfig = mock(JobConfig.class); @@ -539,7 +549,9 @@ void testSyncConnection() throws JsonValidationException, IOException, ConfigNot standardSync, SOURCE_DOCKER_IMAGE, DESTINATION_DOCKER_IMAGE, - operations); + operations, + sourceResourceReqs, + destResourceReqs); } @Test diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java index e9429384c0f5..b9c82a441abc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/TemporalClient.java @@ -147,7 +147,9 @@ public TemporalResponse submitSync(final long jobId, final i .withOperationSequence(config.getOperationSequence()) .withCatalog(config.getConfiguredAirbyteCatalog()) .withState(config.getState()) - .withResourceRequirements(config.getResourceRequirements()); + .withResourceRequirements(config.getResourceRequirements()) + .withSourceResourceRequirements(config.getSourceResourceRequirements()) + .withDestinationResourceRequirements(config.getDestinationResourceRequirements()); return execute(jobRunConfig, () -> getWorkflowStub(SyncWorkflow.class, TemporalJobType.SYNC).run( @@ -394,8 +396,8 @@ public ManualSyncSubmissionResult resetConnection(final UUID connectionId) { * The way to do so is to wait for the jobId to change, either to a new job id or the default id * that signal that a workflow is waiting to be submitted */ - public ManualSyncSubmissionResult synchronousResetConnection(UUID connectionId) { - ManualSyncSubmissionResult resetResult = resetConnection(connectionId); + public ManualSyncSubmissionResult synchronousResetConnection(final UUID connectionId) { + final ManualSyncSubmissionResult resetResult = 
resetConnection(connectionId); if (resetResult.getFailingReason().isPresent()) { return resetResult; } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java index 9ea24f34d727..30edaf108adc 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/scheduling/activities/GenerateInputActivityImpl.java @@ -65,7 +65,9 @@ public GeneratedJobInput getSyncWorkflowInput(final SyncInput input) { .withOperationSequence(config.getOperationSequence()) .withCatalog(config.getConfiguredAirbyteCatalog()) .withState(config.getState()) - .withResourceRequirements(config.getResourceRequirements()); + .withResourceRequirements(config.getResourceRequirements()) + .withSourceResourceRequirements(config.getSourceResourceRequirements()) + .withDestinationResourceRequirements(config.getDestinationResourceRequirements()); return new GeneratedJobInput(jobRunConfig, sourceLauncherConfig, destinationLauncherConfig, syncInput); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java index 6d50e79b77bf..656a30c79f77 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationActivityImpl.java @@ -11,6 +11,7 @@ import io.airbyte.config.ConfigSchema; import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ReplicationOutput; +import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; @@ 
-22,6 +23,7 @@ import io.airbyte.workers.DefaultReplicationWorker; import io.airbyte.workers.Worker; import io.airbyte.workers.WorkerApp; +import io.airbyte.workers.WorkerApp.ContainerOrchestratorConfig; import io.airbyte.workers.WorkerConfigs; import io.airbyte.workers.WorkerConstants; import io.airbyte.workers.process.AirbyteIntegrationLauncher; @@ -122,7 +124,8 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, containerOrchestratorConfig.get(), sourceLauncherConfig, destinationLauncherConfig, - jobRunConfig); + jobRunConfig, + syncInput.getResourceRequirements()); } else { workerFactory = getLegacyWorkerFactory(sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, syncInput); } @@ -169,8 +172,7 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu return standardSyncOutput; } - private CheckedSupplier, Exception> getLegacyWorkerFactory( - final IntegrationLauncherConfig sourceLauncherConfig, + private CheckedSupplier, Exception> getLegacyWorkerFactory(final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, final StandardSyncInput syncInput) { @@ -180,13 +182,13 @@ private CheckedSupplier, Exception> Math.toIntExact(sourceLauncherConfig.getAttemptId()), sourceLauncherConfig.getDockerImage(), processFactory, - syncInput.getResourceRequirements()); + syncInput.getSourceResourceRequirements()); final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher( destinationLauncherConfig.getJobId(), Math.toIntExact(destinationLauncherConfig.getAttemptId()), destinationLauncherConfig.getDockerImage(), processFactory, - syncInput.getResourceRequirements()); + syncInput.getDestinationResourceRequirements()); // reset jobs use an empty source to induce resetting all data in destination. 
final AirbyteSource airbyteSource = @@ -203,11 +205,11 @@ private CheckedSupplier, Exception> }; } - private CheckedSupplier, Exception> getContainerLauncherWorkerFactory( - final WorkerApp.ContainerOrchestratorConfig containerOrchestratorConfig, + private CheckedSupplier, Exception> getContainerLauncherWorkerFactory(final ContainerOrchestratorConfig containerOrchestratorConfig, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, - final JobRunConfig jobRunConfig) + final JobRunConfig jobRunConfig, + final ResourceRequirements resourceRequirements) throws IOException { final var jobScope = jobPersistence.getJob(Long.parseLong(jobRunConfig.getJobId())).getScope(); @@ -219,7 +221,7 @@ private CheckedSupplier, Exception> sourceLauncherConfig, destinationLauncherConfig, jobRunConfig, - workerConfigs); + resourceRequirements); } } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java index 99fa1811fa90..6cfb706f70df 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/ReplicationLauncherWorker.java @@ -6,11 +6,11 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.config.ReplicationOutput; +import io.airbyte.config.ResourceRequirements; import io.airbyte.config.StandardSyncInput; import io.airbyte.scheduler.models.IntegrationLauncherConfig; import io.airbyte.scheduler.models.JobRunConfig; import io.airbyte.workers.WorkerApp; -import io.airbyte.workers.WorkerConfigs; import java.util.Map; import java.util.UUID; @@ -31,7 +31,7 @@ public ReplicationLauncherWorker(final UUID connectionId, final IntegrationLauncherConfig sourceLauncherConfig, final IntegrationLauncherConfig destinationLauncherConfig, final JobRunConfig jobRunConfig, - final 
WorkerConfigs workerConfigs) { + final ResourceRequirements resourceRequirements) { super( connectionId, REPLICATION, @@ -41,7 +41,7 @@ public ReplicationLauncherWorker(final UUID connectionId, INIT_FILE_SOURCE_LAUNCHER_CONFIG, Jsons.serialize(sourceLauncherConfig), INIT_FILE_DESTINATION_LAUNCHER_CONFIG, Jsons.serialize(destinationLauncherConfig)), containerOrchestratorConfig, - workerConfigs.getResourceRequirements(), + resourceRequirements, ReplicationOutput.class); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java index 1af3f2880dc8..028ae6fded4d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/temporal/sync/SyncWorkflowImpl.java @@ -56,7 +56,7 @@ public StandardSyncOutput run(final JobRunConfig jobRunConfig, final NormalizationInput normalizationInput = new NormalizationInput() .withDestinationConfiguration(syncInput.getDestinationConfiguration()) .withCatalog(run.getOutputCatalog()) - .withResourceRequirements(syncInput.getResourceRequirements()); + .withResourceRequirements(syncInput.getDestinationResourceRequirements()); normalizationActivity.normalize(jobRunConfig, destinationLauncherConfig, normalizationInput); } else if (standardSyncOperation.getOperatorType() == OperatorType.DBT) { diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java b/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java index 792c448abd73..063a8286dd2d 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/worker_run/TemporalWorkerRunFactory.java @@ -96,7 +96,9 @@ public CheckedSupplier, Exception> createSupplier(fin 
.withDestinationConfiguration(resetConnection.getDestinationConfiguration()) .withConfiguredAirbyteCatalog(resetConnection.getConfiguredAirbyteCatalog()) .withOperationSequence(resetConnection.getOperationSequence()) - .withResourceRequirements(resetConnection.getResourceRequirements()); + .withResourceRequirements(resetConnection.getResourceRequirements()) + .withSourceResourceRequirements(resetConnection.getResourceRequirements()) + .withDestinationResourceRequirements(resetConnection.getResourceRequirements()); final TemporalResponse output = temporalClient.submitSync(job.getId(), attemptId, config, connectionId); return toOutputAndStatus(output);