Skip to content

Commit

Permalink
Stress Test Improvements (#6440)
Browse files Browse the repository at this point in the history
* Debug logging. Split thread pool.

* Format.
  • Loading branch information
davinchia authored Sep 28, 2021
1 parent 3369237 commit 96aedd3
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,23 @@ public void run() {

private void scheduleSyncJobs() throws IOException {
int jobsScheduled = 0;
var start = System.currentTimeMillis();
final List<StandardSync> activeConnections = getAllActiveConnections();
var queryEnd = System.currentTimeMillis();
LOGGER.debug("Total active connections: {}", activeConnections.size());
LOGGER.debug("Time to retrieve all connections: {} ms", queryEnd - start);

for (StandardSync connection : activeConnections) {
final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connection.getConnectionId());

if (scheduleJobPredicate.test(previousJobOptional, connection)) {
jobFactory.create(connection.getConnectionId());
jobsScheduled++;
SchedulerApp.PENDING_JOBS.getAndIncrement();
}
}
var end = System.currentTimeMillis();
LOGGER.debug("Time taken to schedule jobs: {} ms", end - start);

if (jobsScheduled > 0) {
LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduled this cycle: {}", activeConnections.size(), jobsScheduled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,14 @@ public JobSubmitter(final ExecutorService threadPool,
public void run() {
try {
LOGGER.debug("Running job-submitter...");
var start = System.currentTimeMillis();

final Optional<Job> nextJob = persistence.getNextJob();

nextJob.ifPresent(attemptJobSubmit());

LOGGER.debug("Completed Job-Submitter...");
var end = System.currentTimeMillis();
LOGGER.debug("Completed Job-Submitter. Time taken: {} ms", end - start);
} catch (Throwable e) {
LOGGER.error("Job Submitter Error", e);
}
Expand All @@ -87,9 +89,12 @@ synchronized private Consumer<Job> attemptJobSubmit() {
runningJobs.add(job.getId());
trackSubmission(job);
submitJob(job);
var pending = SchedulerApp.PENDING_JOBS.decrementAndGet();
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
LOGGER.debug("Pending jobs: {}", pending);
} else {
LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId());
LOGGER.debug("Pending jobs: {}", SchedulerApp.PENDING_JOBS.get());
}
};
}
Expand All @@ -112,6 +117,7 @@ void submitJob(Job job) {
LogClientSingleton.setJobMdc(workerRun.getJobRoot());
})
.setOnSuccess(output -> {
LOGGER.debug("Job id {} succeeded", job.getId());
if (output.getOutput().isPresent()) {
persistence.writeOutput(job.getId(), attemptNumber, output.getOutput().get());
}
Expand All @@ -133,6 +139,7 @@ void submitJob(Job job) {
})
.setOnFinish(() -> {
runningJobs.remove(job.getId());
LOGGER.debug("Job id {} cleared", job.getId());
MDC.clear();
})
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -57,6 +58,8 @@
*/
public class SchedulerApp {

public static AtomicInteger PENDING_JOBS = new AtomicInteger();

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);

private static final long GRACEFUL_SHUTDOWN_SECONDS = 30;
Expand Down Expand Up @@ -88,7 +91,9 @@ public SchedulerApp(Path workspaceRoot,

public void start() throws IOException {
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService scheduleJobsPool = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService executeJobsPool = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService cleanupJobsPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
Expand All @@ -105,7 +110,7 @@ public void start() throws IOException {
// anymore.
cleanupZombies(jobPersistence, jobNotifier);

scheduledPool.scheduleWithFixedDelay(
scheduleJobsPool.scheduleWithFixedDelay(
() -> {
MDC.setContextMap(mdc);
jobRetrier.run();
Expand All @@ -116,7 +121,16 @@ public void start() throws IOException {
SCHEDULING_DELAY.toSeconds(),
TimeUnit.SECONDS);

scheduledPool.scheduleWithFixedDelay(
executeJobsPool.scheduleWithFixedDelay(
() -> {
MDC.setContextMap(mdc);
jobSubmitter.run();
},
0L,
SCHEDULING_DELAY.toSeconds(),
TimeUnit.SECONDS);

cleanupJobsPool.scheduleWithFixedDelay(
() -> {
MDC.setContextMap(mdc);
jobCleaner.run();
Expand All @@ -126,7 +140,8 @@ public void start() throws IOException {
CLEANING_DELAY.toSeconds(),
TimeUnit.SECONDS);

Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool, scheduledPool));
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool,
scheduleJobsPool, executeJobsPool, cleanupJobsPool));
}

private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifier) throws IOException {
Expand Down

0 comments on commit 96aedd3

Please sign in to comment.