Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

default to no resource limits for OSS #10800

Merged
merged 1 commit into from
Mar 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,13 @@ public interface Configs {
*/
int getSyncJobMaxTimeoutDays();

/**
* Defines whether job creation uses connector-specific resource requirements when spawning jobs.
* Works on both Docker and Kubernetes. Defaults to false for ease of use in OSS trials of Airbyte
* but recommended for production deployments.
*/
boolean connectorSpecificResourceDefaultsEnabled();

/**
* Define the job container's minimum CPU usage. Units follow either Docker or Kubernetes, depending
* on the deployment. Defaults to none.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class EnvConfigs implements Configs {
public static final String JOB_KUBE_CURL_IMAGE = "JOB_KUBE_CURL_IMAGE";
public static final String SYNC_JOB_MAX_ATTEMPTS = "SYNC_JOB_MAX_ATTEMPTS";
public static final String SYNC_JOB_MAX_TIMEOUT_DAYS = "SYNC_JOB_MAX_TIMEOUT_DAYS";
private static final String CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED = "CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED";
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
Expand Down Expand Up @@ -419,6 +420,11 @@ public int getSyncJobMaxTimeoutDays() {
return Integer.parseInt(getEnvOrDefault(SYNC_JOB_MAX_TIMEOUT_DAYS, "3"));
}

@Override
public boolean connectorSpecificResourceDefaultsEnabled() {
return getEnvOrDefault(CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED, false);
}

/**
* Returns worker pod tolerations parsed from its own environment variable. The value of the env is
* a string that represents one or more tolerations.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ public class JobScheduler implements Runnable {
this.jobFactory = jobFactory;
}

public JobScheduler(final JobPersistence jobPersistence,
public JobScheduler(final boolean connectorSpecificResourceDefaultsEnabled,
final JobPersistence jobPersistence,
final ConfigRepository configRepository,
final TrackingClient trackingClient,
final WorkerConfigs workerConfigs) {
Expand All @@ -56,6 +57,7 @@ public JobScheduler(final JobPersistence jobPersistence,
configRepository,
new ScheduleJobPredicate(Instant::now),
new DefaultSyncJobFactory(
connectorSpecificResourceDefaultsEnabled,
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,12 @@ public void start() throws IOException {
featureFlags);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier, maxSyncJobAttempts);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository, trackingClient, workerConfigs);
final JobScheduler jobScheduler = new JobScheduler(
configs.connectorSpecificResourceDefaultsEnabled(),
jobPersistence,
configRepository,
trackingClient,
workerConfigs);
final JobSubmitter jobSubmitter = new JobSubmitter(
workerThreadPool,
jobPersistence,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ public class DefaultSchedulerJobClient implements SchedulerJobClient {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultSchedulerJobClient.class);

private final boolean connectorSpecificResourceDefaultsEnabled;
private final JobPersistence jobPersistence;
private final JobCreator jobCreator;

public DefaultSchedulerJobClient(final JobPersistence jobPersistence, final JobCreator jobCreator) {
public DefaultSchedulerJobClient(final boolean connectorSpecificResourceDefaultsEnabled,
final JobPersistence jobPersistence,
final JobCreator jobCreator) {
this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled;
this.jobPersistence = jobPersistence;
this.jobCreator = jobCreator;
}
Expand All @@ -38,9 +42,19 @@ public Job createOrGetActiveSyncJob(final SourceConnection source,
final String sourceDockerImage,
final String destinationDockerImage,
final List<StandardSyncOperation> standardSyncOperations,
@Nullable final ActorDefinitionResourceRequirements sourceResourceRequirements,
@Nullable final ActorDefinitionResourceRequirements destinationResourceRequirements)
@Nullable final ActorDefinitionResourceRequirements ignorableSourceResourceRequirements,
@Nullable final ActorDefinitionResourceRequirements ignorableDestinationResourceRequirements)
throws IOException {

ActorDefinitionResourceRequirements sourceResourceRequirements = ignorableSourceResourceRequirements;
ActorDefinitionResourceRequirements destinationResourceRequirements = ignorableDestinationResourceRequirements;

// for OSS users, make it possible to ignore default actor-level resource requirements
if (!connectorSpecificResourceDefaultsEnabled) {
sourceResourceRequirements = null;
destinationResourceRequirements = null;
}

final Optional<Long> jobIdOptional = jobCreator.createSyncJob(
source,
destination,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ void setup() {
jobPersistence = mock(JobPersistence.class);
jobCreator = mock(JobCreator.class);
job = mock(Job.class);
client = spy(new DefaultSchedulerJobClient(jobPersistence, jobCreator));
client = spy(new DefaultSchedulerJobClient(true, jobPersistence, jobCreator));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.Lists;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.config.ActorDefinitionResourceRequirements;
import io.airbyte.config.DestinationConnection;
import io.airbyte.config.SourceConnection;
import io.airbyte.config.StandardDestinationDefinition;
Expand All @@ -23,13 +24,16 @@

public class DefaultSyncJobFactory implements SyncJobFactory {

private final boolean connectorSpecificResourceDefaultsEnabled;
private final DefaultJobCreator jobCreator;
private final ConfigRepository configRepository;
private final OAuthConfigSupplier oAuthConfigSupplier;

public DefaultSyncJobFactory(final DefaultJobCreator jobCreator,
public DefaultSyncJobFactory(final boolean connectorSpecificResourceDefaultsEnabled,
final DefaultJobCreator jobCreator,
final ConfigRepository configRepository,
final OAuthConfigSupplier oAuthConfigSupplier) {
this.connectorSpecificResourceDefaultsEnabled = connectorSpecificResourceDefaultsEnabled;
this.jobCreator = jobCreator;
this.configRepository = configRepository;
this.oAuthConfigSupplier = oAuthConfigSupplier;
Expand Down Expand Up @@ -65,15 +69,24 @@ public Long create(final UUID connectionId) {
standardSyncOperations.add(standardSyncOperation);
}

ActorDefinitionResourceRequirements sourceResourceRequirements = sourceDefinition.getResourceRequirements();
ActorDefinitionResourceRequirements destinationResourceRequirements = destinationDefinition.getResourceRequirements();

// for OSS users, make it possible to ignore default actor-level resource requirements
if (!connectorSpecificResourceDefaultsEnabled) {
sourceResourceRequirements = null;
destinationResourceRequirements = null;
}
Comment on lines +72 to +79
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic could be moved into the JobCreator.createSyncJob() method so that it doesn't need to be repeated


return jobCreator.createSyncJob(
sourceConnection,
destinationConnection,
standardSync,
sourceImageName,
destinationImageName,
standardSyncOperations,
sourceDefinition.getResourceRequirements(),
destinationDefinition.getResourceRequirements())
sourceResourceRequirements,
destinationResourceRequirements)
.orElseThrow(() -> new IllegalStateException("We shouldn't be trying to create a new sync job if there is one running already."));

} catch (final IOException | JsonValidationException | ConfigNotFoundException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ void createSyncJobFromConnectionId() throws JsonValidationException, ConfigNotFo
.thenReturn(new StandardDestinationDefinition().withDestinationDefinitionId(destinationDefinitionId).withDockerRepository(dstDockerRepo)
.withDockerImageTag(dstDockerTag));

final SyncJobFactory factory = new DefaultSyncJobFactory(jobCreator, configRepository, mock(OAuthConfigSupplier.class));
final SyncJobFactory factory = new DefaultSyncJobFactory(true, jobCreator, configRepository, mock(OAuthConfigSupplier.class));
final long actualJobId = factory.create(connectionId);
assertEquals(jobId, actualJobId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ public static ServerRunnable getServer(final ServerFactory apiFactory, final Con
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot(), configs);
final OAuthConfigSupplier oAuthConfigSupplier = new OAuthConfigSupplier(configRepository, trackingClient);
final SchedulerJobClient schedulerJobClient =
new DefaultSchedulerJobClient(jobPersistence,
new DefaultSchedulerJobClient(
configs.connectorSpecificResourceDefaultsEnabled(),
jobPersistence,
new DefaultJobCreator(jobPersistence, configRepository, workerConfigs.getResourceRequirements()));
final DefaultSynchronousSchedulerClient syncSchedulerClient =
new DefaultSynchronousSchedulerClient(temporalClient, jobTracker, oAuthConfigSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ private static void launchWorkerApp() throws IOException {
configRepository);
final TrackingClient trackingClient = TrackingClientSingleton.get();
final SyncJobFactory jobFactory = new DefaultSyncJobFactory(
configs.connectorSpecificResourceDefaultsEnabled(),
new DefaultJobCreator(jobPersistence, configRepository, defaultWorkerConfigs.getResourceRequirements()),
configRepository,
new OAuthConfigSupplier(configRepository, trackingClient));
Expand Down
1 change: 1 addition & 0 deletions kube/overlays/stable-with-resource-limits/.env
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,4 @@ JOB_KUBE_MAIN_CONTAINER_IMAGE_PULL_POLICY=
# Launch a separate pod to orchestrate sync steps
CONTAINER_ORCHESTRATOR_ENABLED=false

CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED=true
18 changes: 18 additions & 0 deletions kube/overlays/stable-with-resource-limits/set-resource-limits.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ spec:
spec:
containers:
- name: airbyte-scheduler-container
env:
- name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
resources:
limits:
cpu: 2
Expand All @@ -35,6 +41,12 @@ spec:
spec:
containers:
- name: airbyte-worker-container
env:
- name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
resources:
limits:
cpu: 2
Expand All @@ -49,6 +61,12 @@ spec:
spec:
containers:
- name: airbyte-server-container
env:
- name: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
valueFrom:
configMapKeyRef:
name: airbyte-env
key: CONNECTOR_SPECIFIC_RESOURCE_DEFAULTS_ENABLED
resources:
limits:
cpu: 1
Expand Down