From 8f240352557761fc7df2490456c9f5bf6dbfcecd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dariusz=20J=C4=99drzejczyk?= Date: Thu, 2 Nov 2023 14:40:46 +0100 Subject: [PATCH] JCStress: Await Scheduler dispose and increase timeouts (#3630) Resolves #3626 --- .../scheduler/BasicSchedulersStressTest.java | 24 +++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/reactor-core/src/jcstress/java/reactor/core/scheduler/BasicSchedulersStressTest.java b/reactor-core/src/jcstress/java/reactor/core/scheduler/BasicSchedulersStressTest.java index aad18a61ef..8c0dac28f2 100644 --- a/reactor-core/src/jcstress/java/reactor/core/scheduler/BasicSchedulersStressTest.java +++ b/reactor-core/src/jcstress/java/reactor/core/scheduler/BasicSchedulersStressTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved. + * Copyright (c) 2022-2023 VMware Inc. or its affiliates, All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ import java.time.Duration; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.openjdk.jcstress.annotations.Actor; import org.openjdk.jcstress.annotations.Arbiter; @@ -29,6 +28,7 @@ import org.openjdk.jcstress.annotations.State; import org.openjdk.jcstress.infra.results.IIZ_Result; import org.openjdk.jcstress.infra.results.Z_Result; +import reactor.core.Disposable; public abstract class BasicSchedulersStressTest { @@ -43,11 +43,15 @@ private static boolean canScheduleTask(Scheduler scheduler) { if (scheduler.isDisposed()) { return false; } - scheduler.schedule(latch::countDown); + Disposable disposable = scheduler.schedule(latch::countDown); boolean taskDone = false; try { - taskDone = latch.await(100, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { + taskDone = latch.await(1, TimeUnit.SECONDS); + } catch (InterruptedException ie) { + throw new RuntimeException(ie); + } + if (((SchedulerTask) disposable).future.isCancelled()) { + throw new RuntimeException("Future cancelled " + disposable); } return taskDone; } @@ -78,7 +82,7 @@ public void arbiter(Z_Result r) { // At this stage, at least one actor called scheduler.start(), // so we should be able to execute a task. r.r1 = canScheduleTask(scheduler); - scheduler.dispose(); + scheduler.disposeGracefully().block(Duration.ofMillis(500)); } } @@ -88,7 +92,7 @@ public void arbiter(Z_Result r) { public static class ParallelSchedulerStartDisposeStressTest { private final ParallelScheduler scheduler = - new ParallelScheduler(4, Thread::new); + new ParallelScheduler(2, Thread::new); { scheduler.init(); @@ -109,7 +113,7 @@ public void arbiter(Z_Result r) { // At this stage, at least one actor called scheduler.start(), // so we should be able to execute a task. r.r1 = canScheduleTask(scheduler); - scheduler.dispose(); + scheduler.disposeGracefully().block(Duration.ofMillis(500));; } } @@ -169,7 +173,7 @@ public static class ParallelSchedulerDisposeGracefullyStressTest { private final CountDownLatch latch = new CountDownLatch(2); private final ParallelScheduler scheduler = - new ParallelScheduler(10, Thread::new); + new ParallelScheduler(2, Thread::new); { scheduler.init(); @@ -263,7 +267,7 @@ public void arbiter(IIZ_Result r) { public static class ParallelSchedulerDisposeGracefullyAndDisposeStressTest { private final ParallelScheduler scheduler = - new ParallelScheduler(10, Thread::new); + new ParallelScheduler(2, Thread::new); { scheduler.init();