Skip to content

Commit

Permalink
default to no resource limits for OSS (#10800)
Browse files Browse the repository at this point in the history
  • Loading branch information
jrhizor authored Mar 2, 2022
1 parent c81ceb2 commit 5aecbc3
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ public interface Configs {
*/
int getSyncJobMaxTimeoutDays();

/**
* Defines whether job creation uses connector-specific resource requirements when spawning jobs.
* Works on both Docker and Kubernetes. Defaults to false for ease of use in OSS trials of Airbyte
* but recommended for production deployments.
*/
boolean connectorSpecificResourceDefaultsEnabled();

/**
* Define the job container's minimum CPU usage. Units follow either Docker or Kubernetes, depending
* on the deployment. Defaults to none.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class EnvConfigs implements Configs {
public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE";
public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS";
public static final String SYNC_JOB_MAX_TIMEOUT_DAYS = "SYNC_JOB_MAX_TIMEOUT_DAYS";
private static final String CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED = "CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
Expand Down Expand Up @@ -419,6 +420,11 @@ public int getSyncJobMaxTimeoutDays() {
return Integer.parseInt(getEnvOrDefault(SYNC_JOB_MAX_TIMEOUT_DAYS, "3"));
}

@Override
public boolean connectorSpecificResourceDefaultsEnabled() {
return getEnvOrDefault(CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED, false);
}

/**
* Returns worker pod tolerations parsed from its own environment variable. The value of the env is
* a string that represents one or more tolerations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class JobScheduler implements Runnable {
this.jobFactory = jobFactory;
}

public JobScheduler(final JobPersistence jobPersistence,
public JobScheduler(final boolean connectorSpecificResourceDefaultsEnabled,
final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final TrackingClient trackingClient,
final WorkerConfigs workerConfigs) {
Expand All @@ -56,6 +57,7 @@ public JobScheduler(final JobPersistence jobPersistence,
configRepository,
new ScheduleJobPredicate(Instant::now),
new DefaultSyncJobFactory(
connectorSpecificResourceDefaultsEnabled,
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ public void start() throws IOException {
featureFlags);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient, workerConfigs);
final JobScheduler jobScheduler = new JobScheduler(
configs.connectorSpecificResourceDefaultsEnabled(),
jobPersistence,
configRepository,
trackingClient,
workerConfigs);
final JobSubmitter jobSubmitter = new JobSubmitter(
workerThreadPool,
jobPersistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ public class DefaultSchedulerJobClient implements SchedulerJobClient {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSchedulerJobClient.class);

private final boolean connectorSpecificResourceDefaultsEnabled;
private final JobPersistence jobPersistence;
private final JobCreator jobCreator;

public DefaultSchedulerJobClient(final JobPersistence jobPersistence, final JobCreator jobCreator) {
public DefaultSchedulerJobClient(final boolean connectorSpecificResourceDefaultsEnabled,
final JobPersistence jobPersistence,
final JobCreator jobCreator) {
this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled;
this.jobPersistence = jobPersistence;
this.jobCreator = jobCreator;
}
Expand All @@ -38,9 +42,19 @@ public Job createOrGetActiveSyncJob(final SourceConnection source,
final String sourceDockerImage,
final String destinationDockerImage,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final ActorDefinitionResourceRequirements sourceResourceRequirements,
@Nullable final ActorDefinitionResourceRequirements destinationResourceRequirements)
@Nullable final ActorDefinitionResourceRequirements ignorableSourceResourceRequirements,
@Nullable final ActorDefinitionResourceRequirements ignorableDestinationResourceRequirements)
throws IOException {

ActorDefinitionResourceRequirements sourceResourceRequirements = ignorableSourceResourceRequirements;
ActorDefinitionResourceRequirements destinationResourceRequirements = ignorableDestinationResourceRequirements;

// for OSS users, make it possible to ignore default actor-level resource requirements
if (!connectorSpecificResourceDefaultsEnabled) {
sourceResourceRequirements = null;
destinationResourceRequirements = null;
}

final Optional<Long> jobIdOptional = jobCreator.createSyncJob(
source,
destination,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void setup() {
jobPersistence = mock(JobPersistence.class);
jobCreator = mock(JobCreator.class);
job = mock(Job.class);
client = spy(new DefaultSchedulerJobClient(jobPersistence, jobCreator));
client = spy(new DefaultSchedulerJobClient(true, jobPersistence, jobCreator));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
Expand All @@ -23,13 +24,16 @@

public class DefaultSyncJobFactory implements SyncJobFactory {

private final boolean connectorSpecificResourceDefaultsEnabled;
private final DefaultJobCreator jobCreator;
private final ConfigRepository configRepository;
private final OAuthConfigSupplier oAuthConfigSupplier;

public DefaultSyncJobFactory(final DefaultJobCreator jobCreator,
public DefaultSyncJobFactory(final boolean connectorSpecificResourceDefaultsEnabled,
final DefaultJobCreator jobCreator,
final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier) {
this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled;
this.jobCreator = jobCreator;
this.configRepository = configRepository;
this.oAuthConfigSupplier = oAuthConfigSupplier;
Expand Down Expand Up @@ -65,15 +69,24 @@ public Long create(final UUID connectionId) {
standardSyncOperations.add(standardSyncOperation);
}

ActorDefinitionResourceRequirements sourceResourceRequirements = sourceDefinition.getResourceRequirements();
ActorDefinitionResourceRequirements destinationResourceRequirements = destinationDefinition.getResourceRequirements();

// for OSS users, make it possible to ignore default actor-level resource requirements
if (!connectorSpecificResourceDefaultsEnabled) {
sourceResourceRequirements = null;
destinationResourceRequirements = null;
}

return jobCreator.createSyncJob(
sourceConnection,
destinationConnection,
standardSync,
sourceImageName,
destinationImageName,
standardSyncOperations,
sourceDefinition.getResourceRequirements(),
destinationDefinition.getResourceRequirements())
sourceResourceRequirements,
destinationResourceRequirements)
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinitionId).withDockerRepository(dstDockerRepo)
.withDockerImageTag(dstDockerTag));

final SyncJobFactory factory = new DefaultSyncJobFactory(jobCreator, configRepository, mock(OAuthConfigSupplier.class));
final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class));
final long actualJobId = factory.create(connectionId);
assertEquals(jobId, actualJobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,9 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs);
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient);
final SchedulerJobClient schedulerJobClient =
new DefaultSchedulerJobClient(jobPersistence,
new DefaultSchedulerJobClient(
configs.connectorSpecificResourceDefaultsEnabled(),
jobPersistence,
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()));
final DefaultSynchronousSchedulerClient syncSchedulerClient =
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ private static void launchWorkerApp() throws IOException {
configRepository);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
configs.connectorSpecificResourceDefaultsEnabled(),
new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient));
Expand Down
1 change: 1 addition & 0 deletions kube/overlays/stable-with-resource-limits/.env
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=
# Launch a separate pod to orchestrate sync steps
CONTAINER_ORCHESTRATOR_ENABLED=false

CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED=true
18 changes: 18 additions & 0 deletions kube/overlays/stable-with-resource-limits/set-resource-limits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ spec:
spec:
containers:
- name: airbyte-scheduler-container
env:
- name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
resources:
limits:
cpu: 2
Expand All @@ -35,6 +41,12 @@ spec:
spec:
containers:
- name: airbyte-worker-container
env:
- name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
resources:
limits:
cpu: 2
Expand All @@ -49,6 +61,12 @@ spec:
spec:
containers:
- name: airbyte-server-container
env:
- name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
resources:
limits:
cpu: 1
Expand Down

0 comments on commit 5aecbc3

Please sign in to comment.