diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java index 661954e5227c..b57cffbc7a82 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobScheduler.java @@ -72,7 +72,11 @@ public void run() { private void scheduleSyncJobs() throws IOException { int jobsScheduled = 0; + var start = System.currentTimeMillis(); final List activeConnections = getAllActiveConnections(); + var queryEnd = System.currentTimeMillis(); + LOGGER.debug("Total active connections: {}", activeConnections.size()); + LOGGER.debug("Time to retrieve all connections: {} ms", queryEnd - start); for (StandardSync connection : activeConnections) { final Optional previousJobOptional = jobPersistence.getLastReplicationJob(connection.getConnectionId()); @@ -80,8 +84,11 @@ private void scheduleSyncJobs() throws IOException { if (scheduleJobPredicate.test(previousJobOptional, connection)) { jobFactory.create(connection.getConnectionId()); jobsScheduled++; + SchedulerApp.PENDING_JOBS.getAndIncrement(); } } + var end = System.currentTimeMillis(); + LOGGER.debug("Time taken to schedule jobs: {} ms", end - start); if (jobsScheduled > 0) { LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduled this cycle: {}", activeConnections.size(), jobsScheduled); diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java index ca7c3d610ca3..f538712dedcf 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/JobSubmitter.java @@ -55,12 +55,14 @@ public JobSubmitter(final ExecutorService threadPool, public void run() { try { LOGGER.debug("Running job-submitter..."); + var start = System.currentTimeMillis(); final Optional nextJob = persistence.getNextJob(); nextJob.ifPresent(attemptJobSubmit()); - LOGGER.debug("Completed Job-Submitter..."); + var end = System.currentTimeMillis(); + LOGGER.debug("Completed Job-Submitter. Time taken: {} ms", end - start); } catch (Throwable e) { LOGGER.error("Job Submitter Error", e); } @@ -87,9 +89,12 @@ synchronized private Consumer attemptJobSubmit() { runningJobs.add(job.getId()); trackSubmission(job); submitJob(job); + var pending = SchedulerApp.PENDING_JOBS.decrementAndGet(); LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope()); + LOGGER.debug("Pending jobs: {}", pending); } else { LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId()); + LOGGER.debug("Pending jobs: {}", SchedulerApp.PENDING_JOBS.get()); } }; } @@ -112,6 +117,7 @@ void submitJob(Job job) { LogClientSingleton.setJobMdc(workerRun.getJobRoot()); }) .setOnSuccess(output -> { + LOGGER.debug("Job id {} succeeded", job.getId()); if (output.getOutput().isPresent()) { persistence.writeOutput(job.getId(), attemptNumber, output.getOutput().get()); } @@ -133,6 +139,7 @@ void submitJob(Job job) { }) .setOnFinish(() -> { runningJobs.remove(job.getId()); + LOGGER.debug("Job id {} cleared", job.getId()); MDC.clear(); }) .build()); diff --git a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java index 689e26a8327d..52cc8df10fd2 100644 --- a/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java +++ b/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java @@ -41,6 +41,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; @@ -57,6 +58,8 @@ */ public class SchedulerApp { + public static AtomicInteger PENDING_JOBS = new AtomicInteger(); + private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class); private static final long GRACEFUL_SHUTDOWN_SECONDS = 30; @@ -88,7 +91,9 @@ public SchedulerApp(Path workspaceRoot, public void start() throws IOException { final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY); - final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService scheduleJobsPool = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService executeJobsPool = Executors.newSingleThreadScheduledExecutor(); + final ScheduledExecutorService cleanupJobsPool = Executors.newSingleThreadScheduledExecutor(); final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot); final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier); final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository); @@ -105,7 +110,7 @@ public void start() throws IOException { // anymore. cleanupZombies(jobPersistence, jobNotifier); - scheduledPool.scheduleWithFixedDelay( + scheduleJobsPool.scheduleWithFixedDelay( () -> { MDC.setContextMap(mdc); jobRetrier.run(); @@ -116,7 +121,16 @@ public void start() throws IOException { SCHEDULING_DELAY.toSeconds(), TimeUnit.SECONDS); - scheduledPool.scheduleWithFixedDelay( + executeJobsPool.scheduleWithFixedDelay( + () -> { + MDC.setContextMap(mdc); + jobSubmitter.run(); + }, + 0L, + SCHEDULING_DELAY.toSeconds(), + TimeUnit.SECONDS); + + cleanupJobsPool.scheduleWithFixedDelay( () -> { MDC.setContextMap(mdc); jobCleaner.run(); @@ -126,7 +140,8 @@ public void start() throws IOException { CLEANING_DELAY.toSeconds(), TimeUnit.SECONDS); - Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool, scheduledPool)); + Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool, + scheduleJobsPool, executeJobsPool, cleanupJobsPool)); } private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifier) throws IOException {