Skip to content

Commit

Permalink
Make number of Concurrent Jobs configurable. (#4687)
Browse files Browse the repository at this point in the history
  • Loading branch information
davinchia authored Jul 19, 2021
1 parent 24be682 commit 8034aa5
Show file tree
Hide file tree
Showing 9 changed files with 31 additions and 5 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ LOCAL_DOCKER_MOUNT=/tmp/airbyte_local
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=segment
WEBAPP_URL=http://localhost:8000/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public interface Configs {

String getKubeNamespace();

String getSubmitterNumThreads();

// Resources
String getCpuRequest();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class EnvConfigs implements Configs {
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final String KUBE_NAMESPACE = "KUBE_NAMESPACE";
private static final String SUBMITTER_NUM_THREADS = "SUBMITTER_NUM_THREADS";
private static final String RESOURCE_CPU_REQUEST = "RESOURCE_CPU_REQUEST";
private static final String RESOURCE_CPU_LIMIT = "RESOURCE_CPU_LIMIT";
private static final String RESOURCE_MEMORY_REQUEST = "RESOURCE_MEMORY_REQUEST";
Expand Down Expand Up @@ -211,6 +212,11 @@ public String getKubeNamespace() {
return getEnvOrDefault(KUBE_NAMESPACE, DEFAULT_KUBE_NAMESPACE);
}

/**
 * Returns the configured number of job submitter threads as a string.
 *
 * The value is looked up through {@code getEnvOrDefault} under the {@code SUBMITTER_NUM_THREADS}
 * key, with {@code "5"} supplied as the default argument. The result is not parsed or validated
 * here; callers are responsible for converting it to a number.
 */
@Override
public String getSubmitterNumThreads() {
  final String fallbackThreadCount = "5";
  return getEnvOrDefault(SUBMITTER_NUM_THREADS, fallbackThreadCount);
}

@Override
public String getCpuRequest() {
return getEnvOrDefault(RESOURCE_CPU_REQUEST, DEFAULT_RESOURCE_REQUIREMENT_CPU);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,20 @@

/**
* The SchedulerApp is responsible for finding new scheduled jobs that need to be run and to launch
* them. The current implementation uses a thread pool on the scheduler's machine to launch the
* jobs. One thread is reserved for the job submitter, which is responsible for finding and
* launching new jobs.
* them. The current implementation uses two thread pools to do so. One pool is responsible for all
* job launching operations. The other pool is responsible for clean up operations.
*
* Operations can have thread pools under the hood. An important thread pool to note is the job
* submitter thread pool. This pool does the work of submitting jobs to Temporal - the size of this
* pool determines the number of concurrent jobs that can be run. This is controlled via the
* {@link #SUBMITTER_NUM_THREADS} variable.
*/
public class SchedulerApp {

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);

private static final long GRACEFUL_SHUTDOWN_SECONDS = 30;
private static final int MAX_WORKERS = 4;
private static final int SUBMITTER_NUM_THREADS = Integer.parseInt(new EnvConfigs().getSubmitterNumThreads());
private static final Duration SCHEDULING_DELAY = Duration.ofSeconds(5);
private static final Duration CLEANING_DELAY = Duration.ofHours(2);
private static final ThreadFactory THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("worker-%d").build();
Expand Down Expand Up @@ -121,7 +125,7 @@ public void start() throws IOException {
final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, processFactory);
temporalPool.run();

final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier);
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ services:
- AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
- GCP_STORAGE_BUCKET=${GCP_STORAGE_BUCKET}
- LOG_LEVEL=${LOG_LEVEL}
- SUBMITTER_NUM_THREADS=${SUBMITTER_NUM_THREADS}
- RESOURCE_CPU_REQUEST=${RESOURCE_CPU_REQUEST}
- RESOURCE_CPU_LIMIT=${RESOURCE_CPU_LIMIT}
- RESOURCE_MEMORY_REQUEST=${RESOURCE_MEMORY_REQUEST}
Expand Down
2 changes: 2 additions & 0 deletions kube/overlays/dev/.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace

LOCAL_ROOT=/tmp/airbyte_local

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=logging
WEBAPP_URL=airbyte-webapp-svc:80
Expand Down
2 changes: 2 additions & 0 deletions kube/overlays/stable-with-resource-limits/.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace

LOCAL_ROOT=/tmp/airbyte_local

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=segment
WEBAPP_URL=airbyte-webapp-svc:80
Expand Down
2 changes: 2 additions & 0 deletions kube/overlays/stable/.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ WORKSPACE_DOCKER_MOUNT=airbyte_workspace

LOCAL_ROOT=/tmp/airbyte_local

SUBMITTER_NUM_THREADS=10

# Miscellaneous
TRACKING_STRATEGY=segment
WEBAPP_URL=airbyte-webapp-svc:80
Expand Down
5 changes: 5 additions & 0 deletions kube/resources/scheduler.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SUBMITTER_NUM_THREADS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: SUBMITTER_NUM_THREADS
- name: RESOURCE_CPU_REQUEST
valueFrom:
configMapKeyRef:
Expand Down

0 comments on commit 8034aa5

Please sign in to comment.