Skip to content

Commit

Permalink
Set resource limits for connector definitions: expose in worker (#10483)
Browse files Browse the repository at this point in the history
* pipe through to worker

* wip

* pass source and dest def resource reqs to job client

* fix test

* use resource requirements utils to get resource reqs for legacy and new impls

* undo changes to pass sync input to container launcher worker factory

* remove import

* fix hierarchy order of resource requirements

* add nullable annotations

* undo change to test

* format

* use destination resource reqs for normalization and make resource req utils more flexible

* format

* refactor resource requirements utils and add tests

* switch to storing source/dest resource requirements directly on job sync config

* fix tests and javadocs

* use sync input resource requirements for container orchestrator pod

* do not set connection resource reqs to worker reqs

* add overrident requirement utils method + test + comment

Co-authored-by: lmossman <lake@airbyte.io>
  • Loading branch information
cgardens and lmossman authored Feb 25, 2022
1 parent b045a9e commit 1242aa8
Show file tree
Hide file tree
Showing 23 changed files with 526 additions and 72 deletions.
9 changes: 9 additions & 0 deletions airbyte-config/models/src/main/resources/types/ActorType.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/ActorType.yaml
title: ActorType
description: enum that describes different types of actors
type: string
enum:
- source
- destination
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ properties:
destinationDockerImage:
description: Image name of the destination with tag.
type: string
sourceResourceRequirements:
type: object
description: optional resource requirements to use in source container - this is used instead of `resourceRequirements` for the source container
existingJavaType: io.airbyte.config.ResourceRequirements
destinationResourceRequirements:
type: object
description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container
existingJavaType: io.airbyte.config.ResourceRequirements
operationSequence:
description: Sequence of configurations of operations to apply as part of the sync
type: array
Expand All @@ -49,5 +57,5 @@ properties:
"$ref": State.yaml
resourceRequirements:
type: object
description: optional resource requirements to run sync workers
description: optional resource requirements to run sync workers - this is used for containers other than the source/dest containers
existingJavaType: io.airbyte.config.ResourceRequirements
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,20 @@ properties:
catalog:
description: the configured airbyte catalog
type: object
# necessary because the configuration declaration is in a separate package.
existingJavaType: io.airbyte.protocol.models.ConfiguredAirbyteCatalog
state:
description: optional state of the previous run. this object is defined per integration.
"$ref": State.yaml
resourceRequirements:
description: optional resource requirements to run sync workers
description: optional resource requirements to run sync workers - this is used for containers other than the source/dest containers
type: object
existingJavaType: io.airbyte.config.ResourceRequirements
"$ref": ResourceRequirements.yaml
sourceResourceRequirements:
description: optional resource requirements to use in source container - this is used instead of `resourceRequirements` for the source container
type: object
"$ref": ResourceRequirements.yaml
destinationResourceRequirements:
description: optional resource requirements to use in dest container - this is used instead of `resourceRequirements` for the dest container
type: object
"$ref": ResourceRequirements.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,15 @@ public Optional<String> runJob() throws Exception {
Math.toIntExact(sourceLauncherConfig.getAttemptId()),
sourceLauncherConfig.getDockerImage(),
processFactory,
syncInput.getResourceRequirements());
syncInput.getSourceResourceRequirements());

log.info("Setting up destination launcher...");
final IntegrationLauncher destinationLauncher = new AirbyteIntegrationLauncher(
destinationLauncherConfig.getJobId(),
Math.toIntExact(destinationLauncherConfig.getAttemptId()),
destinationLauncherConfig.getDockerImage(),
processFactory,
syncInput.getResourceRequirements());
syncInput.getDestinationResourceRequirements());

log.info("Setting up source...");
// reset jobs use an empty source to induce resetting all data in destination.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.scheduler.client;

import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
Expand All @@ -14,6 +15,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -35,15 +37,19 @@ public Job createOrGetActiveSyncJob(final SourceConnection source,
final StandardSync standardSync,
final String sourceDockerImage,
final String destinationDockerImage,
final List<StandardSyncOperation> standardSyncOperations)
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final ActorDefinitionResourceRequirements sourceResourceRequirements,
@Nullable final ActorDefinitionResourceRequirements destinationResourceRequirements)
throws IOException {
final Optional<Long> jobIdOptional = jobCreator.createSyncJob(
source,
destination,
standardSync,
sourceDockerImage,
destinationDockerImage,
standardSyncOperations);
standardSyncOperations,
sourceResourceRequirements,
destinationResourceRequirements);

final long jobId = jobIdOptional.isEmpty()
? jobPersistence.getLastReplicationJob(standardSync.getConnectionId()).orElseThrow(() -> new RuntimeException("No job available")).getId()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

package io.airbyte.scheduler.client;

import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import io.airbyte.scheduler.models.Job;
import java.io.IOException;
import java.util.List;
import javax.annotation.Nullable;

/**
* Exposes a way of executing short-lived jobs as RPC calls. If it returns successfully, it
Expand All @@ -24,7 +26,9 @@ Job createOrGetActiveSyncJob(SourceConnection source,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage,
List<StandardSyncOperation> standardSyncOperations)
List<StandardSyncOperation> standardSyncOperations,
@Nullable ActorDefinitionResourceRequirements sourceResourceRequirements,
@Nullable ActorDefinitionResourceRequirements destinationResourceRequirements)
throws IOException;

Job createOrGetActiveResetConnectionJob(DestinationConnection destination,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ void testCreateSyncJob() throws IOException {
final DestinationConnection destination = mock(DestinationConnection.class);
final StandardSync standardSync = mock(StandardSync.class);
final String destinationDockerImage = "airbyte/spaceport";
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of()))
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null))
.thenReturn(Optional.of(JOB_ID));
when(jobPersistence.getJob(JOB_ID)).thenReturn(job);

assertEquals(job, client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of()));
assertEquals(
job,
client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null));
}

@Test
Expand All @@ -61,14 +63,23 @@ void testCreateSyncJobAlreadyExist() throws IOException {
final UUID connectionUuid = UUID.randomUUID();
when(standardSync.getConnectionId()).thenReturn(connectionUuid);
final String destinationDockerImage = "airbyte/spaceport";
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of())).thenReturn(Optional.empty());
when(jobCreator.createSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of(), null, null))
.thenReturn(Optional.empty());

final Job currentJob = mock(Job.class);
when(currentJob.getId()).thenReturn(42L);
when(jobPersistence.getLastReplicationJob(connectionUuid)).thenReturn(Optional.of(currentJob));
when(jobPersistence.getJob(42L)).thenReturn(currentJob);

assertEquals(currentJob, client.createOrGetActiveSyncJob(source, destination, standardSync, DOCKER_IMAGE, destinationDockerImage, List.of()));
assertEquals(currentJob, client.createOrGetActiveSyncJob(
source,
destination,
standardSync,
DOCKER_IMAGE,
destinationDockerImage,
List.of(),
null,
null));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@

package io.airbyte.scheduler.persistence;

import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.JobConfig;
import io.airbyte.config.JobConfig.ConfigType;
import io.airbyte.config.JobResetConnectionConfig;
import io.airbyte.config.JobSyncConfig;
import io.airbyte.config.JobTypeResourceLimit.JobType;
import io.airbyte.config.ResourceRequirements;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
Expand All @@ -20,6 +22,7 @@
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;

public class DefaultJobCreator implements JobCreator {

Expand All @@ -41,7 +44,9 @@ public Optional<Long> createSyncJob(final SourceConnection source,
final StandardSync standardSync,
final String sourceDockerImageName,
final String destinationDockerImageName,
final List<StandardSyncOperation> standardSyncOperations)
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final ActorDefinitionResourceRequirements sourceResourceReqs,
@Nullable final ActorDefinitionResourceRequirements destinationResourceReqs)
throws IOException {
// reusing this isn't going to quite work.
final JobSyncConfig jobSyncConfig = new JobSyncConfig()
Expand All @@ -55,7 +60,19 @@ public Optional<Long> createSyncJob(final SourceConnection source,
.withOperationSequence(standardSyncOperations)
.withConfiguredAirbyteCatalog(standardSync.getCatalog())
.withState(null)
.withResourceRequirements(getJobResourceRequirements(standardSync));
.withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
workerResourceRequirements))
.withSourceResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
sourceResourceReqs,
workerResourceRequirements,
JobType.SYNC))
.withDestinationResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
destinationResourceReqs,
workerResourceRequirements,
JobType.SYNC));

configRepository.getConnectionState(standardSync.getConnectionId()).ifPresent(jobSyncConfig::withState);

Expand Down Expand Up @@ -91,34 +108,14 @@ public Optional<Long> createResetConnectionJob(final DestinationConnection desti
.withDestinationConfiguration(destination.getConfiguration())
.withOperationSequence(standardSyncOperations)
.withConfiguredAirbyteCatalog(configuredAirbyteCatalog)
.withResourceRequirements(getJobResourceRequirements(standardSync));
.withResourceRequirements(ResourceRequirementsUtils.getResourceRequirements(
standardSync.getResourceRequirements(),
workerResourceRequirements));

final JobConfig jobConfig = new JobConfig()
.withConfigType(ConfigType.RESET_CONNECTION)
.withResetConnection(resetConnectionConfig);
return jobPersistence.enqueueJob(standardSync.getConnectionId().toString(), jobConfig);
}

private ResourceRequirements getJobResourceRequirements(final StandardSync standardSync) {
if (standardSync.getResourceRequirements() == null) {
return workerResourceRequirements;
}

final ResourceRequirements jobResourceRequirements = standardSync.getResourceRequirements();
if (jobResourceRequirements.getCpuRequest() == null) {
jobResourceRequirements.setCpuRequest(workerResourceRequirements.getCpuRequest());
}
if (jobResourceRequirements.getCpuLimit() == null) {
jobResourceRequirements.setCpuLimit(workerResourceRequirements.getCpuLimit());
}
if (jobResourceRequirements.getMemoryRequest() == null) {
jobResourceRequirements.setMemoryRequest(workerResourceRequirements.getMemoryRequest());
}
if (jobResourceRequirements.getMemoryLimit() == null) {
jobResourceRequirements.setMemoryLimit(workerResourceRequirements.getMemoryLimit());
}

return jobResourceRequirements;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

package io.airbyte.scheduler.persistence;

import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardSync;
import io.airbyte.config.StandardSyncOperation;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import javax.annotation.Nullable;

public interface JobCreator {

Expand All @@ -29,7 +31,9 @@ Optional<Long> createSyncJob(SourceConnection source,
StandardSync standardSync,
String sourceDockerImage,
String destinationDockerImage,
List<StandardSyncOperation> standardSyncOperations)
List<StandardSyncOperation> standardSyncOperations,
@Nullable ActorDefinitionResourceRequirements sourceResourceReqs,
@Nullable ActorDefinitionResourceRequirements destinationResourceReqs)
throws IOException;

/**
Expand Down
Loading

0 comments on commit 1242aa8

Please sign in to comment.