Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Stress Test #6440

Merged
merged 2 commits into from
Sep 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,23 @@ public void run() {

private void scheduleSyncJobs() throws IOException {
int jobsScheduled = 0;
var start = System.currentTimeMillis();
final List<StandardSync> activeConnections = getAllActiveConnections();
var queryEnd = System.currentTimeMillis();
LOGGER.debug("Total active connections: {}", activeConnections.size());
LOGGER.debug("Time to retrieve all connections: {} ms", queryEnd - start);

for (StandardSync connection : activeConnections) {
final Optional<Job> previousJobOptional = jobPersistence.getLastReplicationJob(connection.getConnectionId());

if (scheduleJobPredicate.test(previousJobOptional, connection)) {
jobFactory.create(connection.getConnectionId());
jobsScheduled++;
SchedulerApp.PENDING_JOBS.getAndIncrement();
}
}
var end = System.currentTimeMillis();
LOGGER.debug("Time taken to schedule jobs: {} ms", end - start);

if (jobsScheduled > 0) {
LOGGER.info("Job-Scheduler Summary. Active connections: {}, Jobs scheduled this cycle: {}", activeConnections.size(), jobsScheduled);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ public JobSubmitter(final ExecutorService threadPool,
public void run() {
try {
LOGGER.debug("Running job-submitter...");
var start = System.currentTimeMillis();

final Optional<Job> nextJob = persistence.getNextJob();

nextJob.ifPresent(attemptJobSubmit());

LOGGER.debug("Completed Job-Submitter...");
var end = System.currentTimeMillis();
LOGGER.debug("Completed Job-Submitter. Time taken: {} ms", end - start);
} catch (Throwable e) {
LOGGER.error("Job Submitter Error", e);
}
Expand All @@ -107,9 +109,12 @@ synchronized private Consumer<Job> attemptJobSubmit() {
runningJobs.add(job.getId());
trackSubmission(job);
submitJob(job);
var pending = SchedulerApp.PENDING_JOBS.decrementAndGet();
LOGGER.info("Job-Submitter Summary. Submitted job with scope {}", job.getScope());
LOGGER.debug("Pending jobs: {}", pending);
} else {
LOGGER.info("Attempting to submit already running job {}. There are probably too many queued jobs.", job.getId());
LOGGER.debug("Pending jobs: {}", SchedulerApp.PENDING_JOBS.get());
}
};
}
Expand All @@ -132,6 +137,7 @@ void submitJob(Job job) {
LogClientSingleton.setJobMdc(workerRun.getJobRoot());
})
.setOnSuccess(output -> {
LOGGER.debug("Job id {} succeeded", job.getId());
if (output.getOutput().isPresent()) {
persistence.writeOutput(job.getId(), attemptNumber, output.getOutput().get());
}
Expand All @@ -153,6 +159,7 @@ void submitJob(Job job) {
})
.setOnFinish(() -> {
runningJobs.remove(job.getId());
LOGGER.debug("Job id {} cleared", job.getId());
MDC.clear();
})
.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
Expand All @@ -77,6 +78,8 @@
*/
public class SchedulerApp {

public static AtomicInteger PENDING_JOBS = new AtomicInteger();

private static final Logger LOGGER = LoggerFactory.getLogger(SchedulerApp.class);

private static final long GRACEFUL_SHUTDOWN_SECONDS = 30;
Expand Down Expand Up @@ -108,7 +111,9 @@ public SchedulerApp(Path workspaceRoot,

public void start() throws IOException {
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(SUBMITTER_NUM_THREADS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService scheduleJobsPool = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService executeJobsPool = Executors.newSingleThreadScheduledExecutor();
final ScheduledExecutorService cleanupJobsPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now, jobNotifier);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
Expand All @@ -125,7 +130,7 @@ public void start() throws IOException {
// anymore.
cleanupZombies(jobPersistence, jobNotifier);

scheduledPool.scheduleWithFixedDelay(
scheduleJobsPool.scheduleWithFixedDelay(
() -> {
MDC.setContextMap(mdc);
jobRetrier.run();
Expand All @@ -136,7 +141,16 @@ public void start() throws IOException {
SCHEDULING_DELAY.toSeconds(),
TimeUnit.SECONDS);

scheduledPool.scheduleWithFixedDelay(
executeJobsPool.scheduleWithFixedDelay(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi @jrhizor @cgardens splitting all of these into separate thread pools as per the result of my stress test

() -> {
MDC.setContextMap(mdc);
jobSubmitter.run();
},
0L,
SCHEDULING_DELAY.toSeconds(),
TimeUnit.SECONDS);

cleanupJobsPool.scheduleWithFixedDelay(
() -> {
MDC.setContextMap(mdc);
jobCleaner.run();
Expand All @@ -146,7 +160,8 @@ public void start() throws IOException {
CLEANING_DELAY.toSeconds(),
TimeUnit.SECONDS);

Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool, scheduledPool));
Runtime.getRuntime().addShutdownHook(new GracefulShutdownHandler(Duration.ofSeconds(GRACEFUL_SHUTDOWN_SECONDS), workerThreadPool,
scheduleJobsPool, executeJobsPool, cleanupJobsPool));
}

private void cleanupZombies(JobPersistence jobPersistence, JobNotifier jobNotifier) throws IOException {
Expand Down