From e9300229283986e72d5433896885116ab6e9257e Mon Sep 17 00:00:00 2001 From: Scott Mitchell Date: Sun, 7 May 2023 09:42:01 -0700 Subject: [PATCH] Add AsyncRangePublisherTest which demonstrates TCK bug fix --- .../unicast/AsyncRangePublisherTest.java | 175 ++++++++++++++++++ 1 file changed, 175 insertions(+) create mode 100644 examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java new file mode 100644 index 00000000..6d8e9d1d --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncRangePublisherTest.java @@ -0,0 +1,175 @@ +package org.reactivestreams.example.unicast; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; + +@Test // Must be here for TestNG to find and run this, do not remove +public class AsyncRangePublisherTest extends PublisherVerification { + private static final int TERMINAL_DELAY_MS = 200; + private static final int DEFAULT_TIMEOUT_MS = TERMINAL_DELAY_MS * 4; + private static final int DEFAULT_POLL_INTERVAL_MS = TERMINAL_DELAY_MS / 2; + private ExecutorService e; + @BeforeClass + void before() { e = Executors.newCachedThreadPool(); } + @AfterClass + void after() { if (e != null) e.shutdown(); } + + public AsyncRangePublisherTest() { + super(new TestEnvironment(DEFAULT_TIMEOUT_MS, 50, DEFAULT_POLL_INTERVAL_MS)); + } + + @Override + public Publisher createPublisher(long elements) { + return new AsyncPublisher(new RangePublisher(1, (int)elements), e); + } + + @Override + public Publisher createFailedPublisher() { + return null; + } + + private static final class AsyncPublisher implements Publisher { + private final Publisher original; + private final Executor executor; + + private AsyncPublisher(Publisher original, Executor executor) { + this.original = requireNonNull(original); + this.executor = requireNonNull(executor); + } + + @Override + public void subscribe(Subscriber s) { + original.subscribe(new AsyncSubscriber(requireNonNull(s), executor)); + } + + private static final class AsyncSubscriber implements Subscriber { + private final BlockingQueue signalQueue = new LinkedBlockingQueue(); + + private AsyncSubscriber(final Subscriber original, final Executor executor) { + try { + executor.execute(new Runnable() { + @Override + public void run() { + for (; ; ) { + try { + final Object signal = signalQueue.take(); + if (signal instanceof Cancelled) { + return; + } else if (signal instanceof TerminalSignal) { + Thread.sleep(TERMINAL_DELAY_MS); + TerminalSignal terminalSignal = (TerminalSignal) signal; + if (terminalSignal.cause == null) { + original.onComplete(); + } else { + original.onError(terminalSignal.cause); + } + return; + } else if (signal instanceof OnSubscribeSignal) { + original.onSubscribe(((OnSubscribeSignal) signal).subscription); + } else { + @SuppressWarnings("unchecked") + final T onNextSignal = ((OnNextSignal) signal).onNext; + original.onNext(onNextSignal); + } + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + }); + } catch (Throwable cause) { + original.onSubscribe(new Subscription() { + @Override + public void request(long n) { + } + + @Override + public void cancel() { + } + }); + original.onError(new IllegalStateException("Executor rejected", cause)); + } + } + + @Override + public void onSubscribe(final Subscription s) { + signalQueue.add(new OnSubscribeSignal(new Subscription() { + @Override + public void request(long n) { + s.request(n); + } + + @Override + public void cancel() { + try { + s.cancel(); + } finally { + signalQueue.add(new Cancelled()); + } + } + })); + } + + @Override + public void onNext(T t) { + signalQueue.add(new OnNextSignal(t)); + } + + @Override + public void onError(Throwable t) { + signalQueue.add(new TerminalSignal(requireNonNull(t))); + } + + @Override + public void onComplete() { + signalQueue.add(new TerminalSignal(null)); + } + } + + private static final class TerminalSignal { + private final Throwable cause; + + private TerminalSignal(Throwable cause) { + this.cause = cause; + } + } + + private static final class OnSubscribeSignal { + private final Subscription subscription; + + private OnSubscribeSignal(Subscription subscription) { + this.subscription = subscription; + } + } + + private static final class OnNextSignal { + private final T onNext; + + private OnNextSignal(T onNext) { + this.onNext = onNext; + } + } + + private static final class Cancelled { + } + + private static T requireNonNull(T o) { + if (o == null) { + throw new NullPointerException(); + } + return o; + } + } +}