-
Notifications
You must be signed in to change notification settings - Fork 530
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
TestEnvironment: Fix timeout arithmetic
Motivation: The timeout calculation decrementing `totalTimeoutRemainingNs` is incorrect which results in premature timeouts and test failures. Signed-off-by: Scott Mitchell <scott_mitchell@apple.com>
- Loading branch information
1 parent
944163a
commit 1c426b4
Showing
2 changed files
with
176 additions
and
1 deletion.
There are no files selected for viewing
175 changes: 175 additions & 0 deletions
175
examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,175 @@ | ||
package org.reactivestreams.example.unicast; | ||
|
||
import org.reactivestreams.Publisher; | ||
import org.reactivestreams.Subscriber; | ||
import org.reactivestreams.Subscription; | ||
import org.reactivestreams.tck.PublisherVerification; | ||
import org.reactivestreams.tck.TestEnvironment; | ||
import org.testng.annotations.AfterClass; | ||
import org.testng.annotations.BeforeClass; | ||
import org.testng.annotations.Test; | ||
|
||
import java.util.concurrent.BlockingQueue; | ||
import java.util.concurrent.Executor; | ||
import java.util.concurrent.ExecutorService; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.LinkedBlockingQueue; | ||
|
||
@Test // Must be here for TestNG to find and run this, do not remove | ||
public class AsyncRangePublisherTest extends PublisherVerification<Integer> { | ||
private static final int TERMINAL_DELAY_MS = 200; | ||
private static final int DEFAULT_TIMEOUT_MS = TERMINAL_DELAY_MS * 4; | ||
private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2; | ||
private ExecutorService e; | ||
@BeforeClass | ||
void before() { e = Executors.newCachedThreadPool(); } | ||
@AfterClass | ||
void after() { if (e != null) e.shutdown(); } | ||
|
||
public AsyncRangePublisherTest() { | ||
super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS)); | ||
} | ||
|
||
@Override | ||
public Publisher<Integer> createPublisher(long elements) { | ||
return new AsyncPublisher<Integer>(new RangePublisher(1, (int)elements), e); | ||
} | ||
|
||
@Override | ||
public Publisher<Integer> createFailedPublisher() { | ||
return null; | ||
} | ||
|
||
private static final class AsyncPublisher<T> implements Publisher<T> { | ||
private final Publisher<T> original; | ||
private final Executor executor; | ||
|
||
private AsyncPublisher(Publisher<T> original, Executor executor) { | ||
this.original = requireNonNull(original); | ||
this.executor = requireNonNull(executor); | ||
} | ||
|
||
@Override | ||
public void subscribe(Subscriber<? super T> s) { | ||
original.subscribe(new AsyncSubscriber<T>(requireNonNull(s), executor)); | ||
} | ||
|
||
private static final class AsyncSubscriber<T> implements Subscriber<T> { | ||
private final BlockingQueue<Object> signalQueue = new LinkedBlockingQueue<Object>(); | ||
|
||
private AsyncSubscriber(final Subscriber<? super T> original, final Executor executor) { | ||
try { | ||
executor.execute(new Runnable() { | ||
@Override | ||
public void run() { | ||
for (; ; ) { | ||
try { | ||
final Object signal = signalQueue.take(); | ||
if (signal instanceof Cancelled) { | ||
return; | ||
} else if (signal instanceof TerminalSignal) { | ||
Thread.sleep(TERMINAL_DELAY_MS); | ||
TerminalSignal terminalSignal = (TerminalSignal) signal; | ||
if (terminalSignal.cause == null) { | ||
original.onComplete(); | ||
} else { | ||
original.onError(terminalSignal.cause); | ||
} | ||
return; | ||
} else if (signal instanceof OnSubscribeSignal) { | ||
original.onSubscribe(((OnSubscribeSignal) signal).subscription); | ||
} else { | ||
@SuppressWarnings("unchecked") | ||
final T onNextSignal = ((OnNextSignal<T>) signal).onNext; | ||
original.onNext(onNextSignal); | ||
} | ||
} catch (InterruptedException ex) { | ||
throw new RuntimeException(ex); | ||
} | ||
} | ||
} | ||
}); | ||
} catch (Throwable cause) { | ||
original.onSubscribe(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
} | ||
}); | ||
original.onError(new IllegalStateException("Executor rejected", cause)); | ||
} | ||
} | ||
|
||
@Override | ||
public void onSubscribe(final Subscription s) { | ||
signalQueue.add(new OnSubscribeSignal(new Subscription() { | ||
@Override | ||
public void request(long n) { | ||
s.request(n); | ||
} | ||
|
||
@Override | ||
public void cancel() { | ||
try { | ||
s.cancel(); | ||
} finally { | ||
signalQueue.add(new Cancelled()); | ||
} | ||
} | ||
})); | ||
} | ||
|
||
@Override | ||
public void onNext(T t) { | ||
signalQueue.add(new OnNextSignal<T>(t)); | ||
} | ||
|
||
@Override | ||
public void onError(Throwable t) { | ||
signalQueue.add(new TerminalSignal(requireNonNull(t))); | ||
} | ||
|
||
@Override | ||
public void onComplete() { | ||
signalQueue.add(new TerminalSignal(null)); | ||
} | ||
} | ||
|
||
private static final class TerminalSignal { | ||
private final Throwable cause; | ||
|
||
private TerminalSignal(Throwable cause) { | ||
this.cause = cause; | ||
} | ||
} | ||
|
||
private static final class OnSubscribeSignal { | ||
private final Subscription subscription; | ||
|
||
private OnSubscribeSignal(Subscription subscription) { | ||
this.subscription = subscription; | ||
} | ||
} | ||
|
||
private static final class OnNextSignal<T> { | ||
private final T onNext; | ||
|
||
private OnNextSignal(T onNext) { | ||
this.onNext = onNext; | ||
} | ||
} | ||
|
||
private static final class Cancelled { | ||
} | ||
|
||
private static <T> T requireNonNull(T o) { | ||
if (o == null) { | ||
throw new NullPointerException(); | ||
} | ||
return o; | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters