From ce0e13e334e622e91c326bc9241a607601fd8aba Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 08:35:54 +0200 Subject: [PATCH 1/7] Asynchronous Stream Publisher - Example Based on Supplier, for the case you don't know the count of elements (for example streaming from a file, etc). Publisher is controlled by the null value - enf of stream. --- .../example/unicast/AsyncStreamPublisher.java | 285 ++++++++++++++++++ .../unicast/AsyncStreamPublisherTest.java | 73 +++++ 2 files changed, 358 insertions(+) create mode 100644 examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java create mode 100644 examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java new file mode 100644 index 00000000..18c60176 --- /dev/null +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java @@ -0,0 +1,285 @@ +/************************************************************************ +* Licensed under Public Domain (CC0) * +* * +* To the extent possible under law, the person who associated CC0 with * +* this code has waived all copyright and related or neighboring * +* rights to this code. * +* * +* You should have received a copy of the CC0 legalcode along with this * +* work. If not, see .* +************************************************************************/ + +package org.reactivestreams.example.unicast; + +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** + * AsyncStreamPublisher is an implementation of Reactive Streams `Publisher` + * which executes asynchronously, using a provided `Executor` and produces elements + * from a given `Supplier` in a "unicast" configuration to its `Subscribers`. + *

+ * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. + */ +class AsyncStreamPublisher implements Publisher { + + private final static int DEFAULT_BATCHSIZE = 1024; + + private final Supplier supplier; // This is our data source / generator + private final Executor executor; // This is our thread pool, which will make sure that our Publisher runs asynchronously to its Subscribers + private final int batchSize; // In general, if one uses an `Executor`, one should be nice nad not hog a thread for too long, this is the cap for that, in elements + + public AsyncStreamPublisher(final Supplier supplier, final Executor executor) { + this(supplier, DEFAULT_BATCHSIZE, executor); + } + + public AsyncStreamPublisher(final Supplier supplier, final int batchSize, final Executor executor) { + if (supplier == null) { + throw null; + } + if (executor == null) { + throw null; + } + if (batchSize < 1) { + throw new IllegalArgumentException("batchSize must be greater than zero!"); + } + this.supplier = supplier; + this.executor = executor; + this.batchSize = batchSize; + } + + @Override + public void subscribe(final Subscriber s) { + // As per rule 1.11, we have decided to support multiple subscribers in a unicast configuration + // for this `Publisher` implementation. + // As per 2.13, this method must return normally (i.e. not throw) + new SubscriptionImpl(s).init(); + } + + // These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls + static interface Signal { + } + + enum Cancel implements Signal {Instance} + + enum Subscribe implements Signal {Instance} + + enum Send implements Signal {Instance} + + static final class Request implements Signal { + final long n; + + Request(final long n) { + this.n = n; + } + } + + // This is our implementation of the Reactive Streams `Subscription`, + // which represents the association between a `Publisher` and a `Subscriber`. + final class SubscriptionImpl implements Subscription, Runnable { + final Subscriber subscriber; // We need a reference to the `Subscriber` so we can talk to it + private boolean cancelled = false; // This flag will track whether this `Subscription` is to be considered cancelled or not + private long demand = 0; // Here we track the current demand, i.e. what has been requested but not yet delivered + private T nextElementToBeSent = null; // we need to fetch an element by the subscription initialization to ensure the stream is not empty + + SubscriptionImpl(final Subscriber subscriber) { + // As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null` + if (subscriber == null) { + throw null; + } + this.subscriber = subscriber; + } + + // This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscription`, like `request` and `cancel` + private final ConcurrentLinkedQueue inboundSignals = new ConcurrentLinkedQueue(); + + // We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself, + // which would violate rule 1.3 among others (no concurrent notifications). + private final AtomicBoolean on = new AtomicBoolean(false); + + // This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17 + private void doRequest(final long n) { + if (n < 1) { + terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.")); + } else if (demand + n < 1) { + // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded" + demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded" + doSend(); // Then we proceed with sending data downstream + } else { + demand += n; // Here we record the downstream demand + doSend(); // Then we can proceed with sending data downstream + } + } + + // This handles cancellation requests, and is idempotent, thread-safe and not synchronously performing heavy computations as specified in rule 3.5 + private void doCancel() { + cancelled = true; + } + + // Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe` + // we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread. + // It also makes it easier to follow rule 1.9 + private void doSubscribe() { + if (!cancelled) { + // Deal with setting up the subscription with the subscriber + try { + subscriber.onSubscribe(this); + } catch (final Throwable t) { // Due diligence to obey 2.13 + terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); + } + + // Deal with already complete iterators promptly + boolean hasElements = false; + try { + // Try to fetch an element from a stream to ensure the stream is not empty, + // this will be sent by the first calling of doSend + nextElementToBeSent = supplier.get(); + hasElements = nextElementToBeSent != null; + } catch (final Throwable t) { + terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 + return; + } + + // If we don't have anything to deliver, we're already done, so lets do the right thing and + // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 + if (!hasElements) { + try { + doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled + subscriber.onComplete(); + } catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. + (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); + } + } + } + } + + // This is our behavior for producing elements downstream + private void doSend() { + try { + // In order to play nice with the `Executor` we will only send at-most `batchSize` before + // rescheduing ourselves and relinquishing the current thread. + int leftInBatch = batchSize; + do { + T next; + boolean hasNext; + try { + next = supplier.get(); // We have already checked `hasNext` when subscribing, so we can fall back to testing -after- `next` is called. + hasNext = next != null; // Need to keep track of End-of-Stream + } catch (final Throwable t) { + terminateDueTo(t); // If `next` or `hasNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4 + return; + } finally { + subscriber.onNext(nextElementToBeSent); // Then we signal the next element downstream to the `Subscriber` + } + nextElementToBeSent = next; // The next element is the actually read element + if (!hasNext) { // If we are at End-of-Stream + doCancel(); // We need to consider this `Subscription` as cancelled as per rule 1.6 + subscriber.onComplete(); // Then we signal `onComplete` as per rule 1.2 and 1.5 + } + } while (!cancelled // This makes sure that rule 1.8 is upheld, i.e. we need to stop signalling "eventually" + && --leftInBatch > 0 // This makes sure that we only send `batchSize` number of elements in one go (so we can yield to other Runnables) + && --demand > 0); // This makes sure that rule 1.1 is upheld (sending more than was demanded) + + if (!cancelled && demand > 0) { // If the `Subscription` is still alive and well, and we have demand to satisfy, we signal ourselves to send more data + signal(Send.Instance); + } + } catch (final Throwable t) { + // We can only getNextResult here if `onNext` or `onComplete` threw, and they are not allowed to according to 2.13, so we can only cancel and log here. + doCancel(); // Make sure that we are cancelled, since we cannot do anything else since the `Subscriber` is faulty. + (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)).printStackTrace(System.err); + } + } + + // This is a helper method to ensure that we always `cancel` when we signal `onError` as per rule 1.6 + private void terminateDueTo(final Throwable t) { + cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 + try { + subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber` + } catch (final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it. + (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); + } + } + + // What `signal` does is that it sends signals to the `Subscription` asynchronously + private void signal(final Signal signal) { + if (inboundSignals.offer(signal)) { // No need to null-check here as ConcurrentLinkedQueue does this for us + tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already + } + } + + // This is the main "event loop" if you so will + @Override + public final void run() { + if (on.get()) { // establishes a happens-before relationship with the end of the previous run + try { + final Signal s = inboundSignals.poll(); // We take a signal off the queue + if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7 + // Below we simply unpack the `Signal`s and invoke the corresponding methods + if (s instanceof Request) { + doRequest(((Request) s).n); + } else if (s == Send.Instance) { + doSend(); + } else if (s == Cancel.Instance) { + doCancel(); + } else if (s == Subscribe.Instance) { + doSubscribe(); + } + } + } finally { + on.set(false); // establishes a happens-before relationship with the beginning of the next run + if (!inboundSignals.isEmpty()) { // If we still have signals to process + tryScheduleToExecute(); // Then we try to schedule ourselves to execute again + } + } + } + } + + // This method makes sure that this `Subscription` is only running on one Thread at a time, + // this is important to make sure that we follow rule 1.3 + private final void tryScheduleToExecute() { + if (on.compareAndSet(false, true)) { + try { + executor.execute(this); + } catch (Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully + if (!cancelled) { + doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6 + try { + terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t)); + } finally { + inboundSignals.clear(); // We're not going to need these anymore + // This subscription is cancelled by now, but letting it become schedulable again means + // that we can drain the inboundSignals queue if anything arrives after clearing + on.set(false); + } + } + } + } + } + + // Our implementation of `Subscription.request` sends a signal to the Subscription that more elements are in demand + @Override + public void request(final long n) { + signal(new Request(n)); + } + + // Our implementation of `Subscription.cancel` sends a signal to the Subscription that the `Subscriber` is not interested in any more elements + @Override + public void cancel() { + signal(Cancel.Instance); + } + + // The reason for the `executeQuery` method is that we want to ensure the `SubscriptionImpl` + // is completely constructed before it is exposed to the thread pool, therefor this + // method is only intended to be invoked once, and immediately after the constructor has + // finished. + void init() { + signal(Subscribe.Instance); + } + } +} \ No newline at end of file diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java new file mode 100644 index 00000000..c17b1fdc --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java @@ -0,0 +1,73 @@ +/************************************************************************ +* Licensed under Public Domain (CC0) * +* * +* To the extent possible under law, the person who associated CC0 with * +* this code has waived all copyright and related or neighboring * +* rights to this code. * +* * +* You should have received a copy of the CC0 legalcode along with this * +* work. If not, see .* +************************************************************************/ + +package org.reactivestreams.example.unicast; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.Supplier; + +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test // Must be here for TestNG to find and run this, do not remove +public class AsyncStreamPublisherTest extends PublisherVerification { + + private ExecutorService e; + + @BeforeClass + void before() { + e = Executors.newFixedThreadPool(4); + } + + @AfterClass + void after() { + if (e != null) { + e.shutdown(); + } + } + + public AsyncStreamPublisherTest() { + super(new TestEnvironment()); + } + + @SuppressWarnings("unchecked") + @Override + public Publisher createPublisher(final long elements) { + assert (elements <= maxElementsFromPublisher()); + return new AsyncStreamPublisher(new Supplier() { + private int at; + @Override + public Integer get() { + return at < elements ? at++ : null; + } + }, e); + } + + @Override + public Publisher createFailedPublisher() { + return new AsyncStreamPublisher(new Supplier() { + @Override + public Integer get() { + throw new RuntimeException("Error state signal!"); + } + }, e); + } + + @Override + public long maxElementsFromPublisher() { + return Integer.MAX_VALUE; + } +} \ No newline at end of file From b3eff1d688eb20e91d77f6054833dcf9f4894802 Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 08:51:50 +0200 Subject: [PATCH 2/7] Adapting to Java 1.6 --- .../example/unicast/AsyncStreamPublisher.java | 13 ++++++++----- .../example/unicast/AsyncStreamPublisherTest.java | 7 ++++--- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java index 18c60176..6eb4c4cb 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java @@ -14,7 +14,6 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.Supplier; import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; @@ -23,7 +22,7 @@ /** * AsyncStreamPublisher is an implementation of Reactive Streams `Publisher` * which executes asynchronously, using a provided `Executor` and produces elements - * from a given `Supplier` in a "unicast" configuration to its `Subscribers`. + * from a given `StreamSupplier` in a "unicast" configuration to its `Subscribers`. *

* NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. */ @@ -31,15 +30,15 @@ class AsyncStreamPublisher implements Publisher { private final static int DEFAULT_BATCHSIZE = 1024; - private final Supplier supplier; // This is our data source / generator + private final StreamSupplier supplier; // This is our data source / generator private final Executor executor; // This is our thread pool, which will make sure that our Publisher runs asynchronously to its Subscribers private final int batchSize; // In general, if one uses an `Executor`, one should be nice nad not hog a thread for too long, this is the cap for that, in elements - public AsyncStreamPublisher(final Supplier supplier, final Executor executor) { + public AsyncStreamPublisher(final StreamSupplier supplier, final Executor executor) { this(supplier, DEFAULT_BATCHSIZE, executor); } - public AsyncStreamPublisher(final Supplier supplier, final int batchSize, final Executor executor) { + public AsyncStreamPublisher(final StreamSupplier supplier, final int batchSize, final Executor executor) { if (supplier == null) { throw null; } @@ -61,6 +60,10 @@ public void subscribe(final Subscriber s) { // As per 2.13, this method must return normally (i.e. not throw) new SubscriptionImpl(s).init(); } + + static interface StreamSupplier { + T get(); + } // These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls static interface Signal { diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java index c17b1fdc..dc68a488 100644 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java +++ b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java @@ -13,7 +13,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.function.Supplier; import org.reactivestreams.Publisher; import org.reactivestreams.tck.PublisherVerification; @@ -22,6 +21,8 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.reactivestreams.example.unicast.AsyncStreamPublisher.StreamSupplier; + @Test // Must be here for TestNG to find and run this, do not remove public class AsyncStreamPublisherTest extends PublisherVerification { @@ -47,7 +48,7 @@ public AsyncStreamPublisherTest() { @Override public Publisher createPublisher(final long elements) { assert (elements <= maxElementsFromPublisher()); - return new AsyncStreamPublisher(new Supplier() { + return new AsyncStreamPublisher(new StreamSupplier() { private int at; @Override public Integer get() { @@ -58,7 +59,7 @@ public Integer get() { @Override public Publisher createFailedPublisher() { - return new AsyncStreamPublisher(new Supplier() { + return new AsyncStreamPublisher(new StreamSupplier() { @Override public Integer get() { throw new RuntimeException("Error state signal!"); From fb3a7d61a42b93a7182980bbbdf81d39952fb069 Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 11:35:25 +0200 Subject: [PATCH 3/7] review #1 cancelled being true here is impossible because the only way to get that true is by submitting a Subscription to the Subscriber that happens once and under this condition. --- .../example/unicast/AsyncStreamPublisher.java | 54 +++++++++---------- 1 file changed, 26 insertions(+), 28 deletions(-) diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java index 6eb4c4cb..7e185a2f 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java @@ -126,38 +126,36 @@ private void doCancel() { } // Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe` - // we execute it asynchronously, this is to avoid executing the user code (`Iterable.iterator`) on the calling thread. + // we execute it asynchronously, this is to avoid executing the user code (`supplier`) on the calling thread. // It also makes it easier to follow rule 1.9 private void doSubscribe() { - if (!cancelled) { - // Deal with setting up the subscription with the subscriber - try { - subscriber.onSubscribe(this); - } catch (final Throwable t) { // Due diligence to obey 2.13 - terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); - } + // Deal with setting up the subscription with the subscriber + try { + subscriber.onSubscribe(this); + } catch (final Throwable t) { // Due diligence to obey 2.13 + terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); + } - // Deal with already complete iterators promptly - boolean hasElements = false; - try { - // Try to fetch an element from a stream to ensure the stream is not empty, - // this will be sent by the first calling of doSend - nextElementToBeSent = supplier.get(); - hasElements = nextElementToBeSent != null; - } catch (final Throwable t) { - terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 - return; - } + // Deal with already complete iterators promptly + boolean hasElements = false; + try { + // Try to fetch an element from a stream to ensure the stream is not empty, + // this will be sent by the first calling of doSend + nextElementToBeSent = supplier.get(); + hasElements = nextElementToBeSent != null; + } catch (final Throwable t) { + terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 + return; + } - // If we don't have anything to deliver, we're already done, so lets do the right thing and - // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 - if (!hasElements) { - try { - doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled - subscriber.onComplete(); - } catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. - (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); - } + // If we don't have anything to deliver, we're already done, so lets do the right thing and + // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 + if (!hasElements) { + try { + doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled + subscriber.onComplete(); + } catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. + (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); } } } From 1355ef0029d610a2dabaa72ec91aa9ef44e70b1d Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 11:44:02 +0200 Subject: [PATCH 4/7] review #2 The problem with crashing onXXX is that you may end up in an undefined state where even calling onError fails. --- .../example/unicast/AsyncStreamPublisher.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java index 7e185a2f..ab30ab63 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java @@ -135,27 +135,28 @@ private void doSubscribe() { } catch (final Throwable t) { // Due diligence to obey 2.13 terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); } - - // Deal with already complete iterators promptly - boolean hasElements = false; - try { - // Try to fetch an element from a stream to ensure the stream is not empty, - // this will be sent by the first calling of doSend - nextElementToBeSent = supplier.get(); - hasElements = nextElementToBeSent != null; - } catch (final Throwable t) { - terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 - return; - } - - // If we don't have anything to deliver, we're already done, so lets do the right thing and - // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 - if (!hasElements) { + if (!cancelled) { + // Deal with already complete iterators promptly + boolean hasElements = false; try { - doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled - subscriber.onComplete(); - } catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. - (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); + // Try to fetch an element from a stream to ensure the stream is not empty, + // this will be sent by the first calling of doSend + nextElementToBeSent = supplier.get(); + hasElements = nextElementToBeSent != null; + } catch (final Throwable t) { + terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 + return; + } + + // If we don't have anything to deliver, we're already done, so lets do the right thing and + // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 + if (!hasElements) { + try { + doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled + subscriber.onComplete(); + } catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. + (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); + } } } } From cc968012ac9ac4aea55dce512515485be9aa39a6 Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 11:48:44 +0200 Subject: [PATCH 5/7] comment text adapted --- .../reactivestreams/example/unicast/AsyncStreamPublisher.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java index ab30ab63..f099bb49 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java @@ -136,7 +136,7 @@ private void doSubscribe() { terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); } if (!cancelled) { - // Deal with already complete iterators promptly + // Deal with already complete suppliers promptly boolean hasElements = false; try { // Try to fetch an element from a stream to ensure the stream is not empty, From 083fbca16b827dd4c9ddf69491f369b802ca751b Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 20:57:42 +0200 Subject: [PATCH 6/7] supplier publisher baked on the interable publisher --- .../example/unicast/AsyncStreamPublisher.java | 287 ------------------ .../example/unicast/SupplierPublisher.java | 46 +++ .../unicast/AsyncStreamPublisherTest.java | 74 ----- .../unicast/SupplierPublisherTest.java | 59 ++++ 4 files changed, 105 insertions(+), 361 deletions(-) delete mode 100644 examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java create mode 100644 examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java delete mode 100644 examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java create mode 100644 examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java deleted file mode 100644 index f099bb49..00000000 --- a/examples/src/main/java/org/reactivestreams/example/unicast/AsyncStreamPublisher.java +++ /dev/null @@ -1,287 +0,0 @@ -/************************************************************************ -* Licensed under Public Domain (CC0) * -* * -* To the extent possible under law, the person who associated CC0 with * -* this code has waived all copyright and related or neighboring * -* rights to this code. * -* * -* You should have received a copy of the CC0 legalcode along with this * -* work. If not, see .* -************************************************************************/ - -package org.reactivestreams.example.unicast; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.atomic.AtomicBoolean; - -import org.reactivestreams.Publisher; -import org.reactivestreams.Subscriber; -import org.reactivestreams.Subscription; - -/** - * AsyncStreamPublisher is an implementation of Reactive Streams `Publisher` - * which executes asynchronously, using a provided `Executor` and produces elements - * from a given `StreamSupplier` in a "unicast" configuration to its `Subscribers`. - *

- * NOTE: The code below uses a lot of try-catches to show the reader where exceptions can be expected, and where they are forbidden. - */ -class AsyncStreamPublisher implements Publisher { - - private final static int DEFAULT_BATCHSIZE = 1024; - - private final StreamSupplier supplier; // This is our data source / generator - private final Executor executor; // This is our thread pool, which will make sure that our Publisher runs asynchronously to its Subscribers - private final int batchSize; // In general, if one uses an `Executor`, one should be nice nad not hog a thread for too long, this is the cap for that, in elements - - public AsyncStreamPublisher(final StreamSupplier supplier, final Executor executor) { - this(supplier, DEFAULT_BATCHSIZE, executor); - } - - public AsyncStreamPublisher(final StreamSupplier supplier, final int batchSize, final Executor executor) { - if (supplier == null) { - throw null; - } - if (executor == null) { - throw null; - } - if (batchSize < 1) { - throw new IllegalArgumentException("batchSize must be greater than zero!"); - } - this.supplier = supplier; - this.executor = executor; - this.batchSize = batchSize; - } - - @Override - public void subscribe(final Subscriber s) { - // As per rule 1.11, we have decided to support multiple subscribers in a unicast configuration - // for this `Publisher` implementation. - // As per 2.13, this method must return normally (i.e. not throw) - new SubscriptionImpl(s).init(); - } - - static interface StreamSupplier { - T get(); - } - - // These represent the protocol of the `AsyncIterablePublishers` SubscriptionImpls - static interface Signal { - } - - enum Cancel implements Signal {Instance} - - enum Subscribe implements Signal {Instance} - - enum Send implements Signal {Instance} - - static final class Request implements Signal { - final long n; - - Request(final long n) { - this.n = n; - } - } - - // This is our implementation of the Reactive Streams `Subscription`, - // which represents the association between a `Publisher` and a `Subscriber`. - final class SubscriptionImpl implements Subscription, Runnable { - final Subscriber subscriber; // We need a reference to the `Subscriber` so we can talk to it - private boolean cancelled = false; // This flag will track whether this `Subscription` is to be considered cancelled or not - private long demand = 0; // Here we track the current demand, i.e. what has been requested but not yet delivered - private T nextElementToBeSent = null; // we need to fetch an element by the subscription initialization to ensure the stream is not empty - - SubscriptionImpl(final Subscriber subscriber) { - // As per rule 1.09, we need to throw a `java.lang.NullPointerException` if the `Subscriber` is `null` - if (subscriber == null) { - throw null; - } - this.subscriber = subscriber; - } - - // This `ConcurrentLinkedQueue` will track signals that are sent to this `Subscription`, like `request` and `cancel` - private final ConcurrentLinkedQueue inboundSignals = new ConcurrentLinkedQueue(); - - // We are using this `AtomicBoolean` to make sure that this `Subscription` doesn't run concurrently with itself, - // which would violate rule 1.3 among others (no concurrent notifications). - private final AtomicBoolean on = new AtomicBoolean(false); - - // This method will register inbound demand from our `Subscriber` and validate it against rule 3.9 and rule 3.17 - private void doRequest(final long n) { - if (n < 1) { - terminateDueTo(new IllegalArgumentException(subscriber + " violated the Reactive Streams rule 3.9 by requesting a non-positive number of elements.")); - } else if (demand + n < 1) { - // As governed by rule 3.17, when demand overflows `Long.MAX_VALUE` we treat the signalled demand as "effectively unbounded" - demand = Long.MAX_VALUE; // Here we protect from the overflow and treat it as "effectively unbounded" - doSend(); // Then we proceed with sending data downstream - } else { - demand += n; // Here we record the downstream demand - doSend(); // Then we can proceed with sending data downstream - } - } - - // This handles cancellation requests, and is idempotent, thread-safe and not synchronously performing heavy computations as specified in rule 3.5 - private void doCancel() { - cancelled = true; - } - - // Instead of executing `subscriber.onSubscribe` synchronously from within `Publisher.subscribe` - // we execute it asynchronously, this is to avoid executing the user code (`supplier`) on the calling thread. - // It also makes it easier to follow rule 1.9 - private void doSubscribe() { - // Deal with setting up the subscription with the subscriber - try { - subscriber.onSubscribe(this); - } catch (final Throwable t) { // Due diligence to obey 2.13 - terminateDueTo(new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onSubscribe.", t)); - } - if (!cancelled) { - // Deal with already complete suppliers promptly - boolean hasElements = false; - try { - // Try to fetch an element from a stream to ensure the stream is not empty, - // this will be sent by the first calling of doSend - nextElementToBeSent = supplier.get(); - hasElements = nextElementToBeSent != null; - } catch (final Throwable t) { - terminateDueTo(t); // If hasNext throws, there's something wrong and we need to signal onError as per 1.2, 1.4 - return; - } - - // If we don't have anything to deliver, we're already done, so lets do the right thing and - // not wait for demand to deliver `onComplete` as per rule 1.2 and 1.3 - if (!hasElements) { - try { - doCancel(); // Rule 1.6 says we need to consider the `Subscription` cancelled when `onComplete` is signalled - subscriber.onComplete(); - } catch (final Throwable t) { // As per rule 2.13, `onComplete` is not allowed to throw exceptions, so we do what we can, and log this. - (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onComplete.", t)).printStackTrace(System.err); - } - } - } - } - - // This is our behavior for producing elements downstream - private void doSend() { - try { - // In order to play nice with the `Executor` we will only send at-most `batchSize` before - // rescheduing ourselves and relinquishing the current thread. - int leftInBatch = batchSize; - do { - T next; - boolean hasNext; - try { - next = supplier.get(); // We have already checked `hasNext` when subscribing, so we can fall back to testing -after- `next` is called. - hasNext = next != null; // Need to keep track of End-of-Stream - } catch (final Throwable t) { - terminateDueTo(t); // If `next` or `hasNext` throws (they can, since it is user-provided), we need to treat the stream as errored as per rule 1.4 - return; - } finally { - subscriber.onNext(nextElementToBeSent); // Then we signal the next element downstream to the `Subscriber` - } - nextElementToBeSent = next; // The next element is the actually read element - if (!hasNext) { // If we are at End-of-Stream - doCancel(); // We need to consider this `Subscription` as cancelled as per rule 1.6 - subscriber.onComplete(); // Then we signal `onComplete` as per rule 1.2 and 1.5 - } - } while (!cancelled // This makes sure that rule 1.8 is upheld, i.e. we need to stop signalling "eventually" - && --leftInBatch > 0 // This makes sure that we only send `batchSize` number of elements in one go (so we can yield to other Runnables) - && --demand > 0); // This makes sure that rule 1.1 is upheld (sending more than was demanded) - - if (!cancelled && demand > 0) { // If the `Subscription` is still alive and well, and we have demand to satisfy, we signal ourselves to send more data - signal(Send.Instance); - } - } catch (final Throwable t) { - // We can only getNextResult here if `onNext` or `onComplete` threw, and they are not allowed to according to 2.13, so we can only cancel and log here. - doCancel(); // Make sure that we are cancelled, since we cannot do anything else since the `Subscriber` is faulty. - (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onNext or onComplete.", t)).printStackTrace(System.err); - } - } - - // This is a helper method to ensure that we always `cancel` when we signal `onError` as per rule 1.6 - private void terminateDueTo(final Throwable t) { - cancelled = true; // When we signal onError, the subscription must be considered as cancelled, as per rule 1.6 - try { - subscriber.onError(t); // Then we signal the error downstream, to the `Subscriber` - } catch (final Throwable t2) { // If `onError` throws an exception, this is a spec violation according to rule 1.9, and all we can do is to log it. - (new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 by throwing an exception from onError.", t2)).printStackTrace(System.err); - } - } - - // What `signal` does is that it sends signals to the `Subscription` asynchronously - private void signal(final Signal signal) { - if (inboundSignals.offer(signal)) { // No need to null-check here as ConcurrentLinkedQueue does this for us - tryScheduleToExecute(); // Then we try to schedule it for execution, if it isn't already - } - } - - // This is the main "event loop" if you so will - @Override - public final void run() { - if (on.get()) { // establishes a happens-before relationship with the end of the previous run - try { - final Signal s = inboundSignals.poll(); // We take a signal off the queue - if (!cancelled) { // to make sure that we follow rule 1.8, 3.6 and 3.7 - // Below we simply unpack the `Signal`s and invoke the corresponding methods - if (s instanceof Request) { - doRequest(((Request) s).n); - } else if (s == Send.Instance) { - doSend(); - } else if (s == Cancel.Instance) { - doCancel(); - } else if (s == Subscribe.Instance) { - doSubscribe(); - } - } - } finally { - on.set(false); // establishes a happens-before relationship with the beginning of the next run - if (!inboundSignals.isEmpty()) { // If we still have signals to process - tryScheduleToExecute(); // Then we try to schedule ourselves to execute again - } - } - } - } - - // This method makes sure that this `Subscription` is only running on one Thread at a time, - // this is important to make sure that we follow rule 1.3 - private final void tryScheduleToExecute() { - if (on.compareAndSet(false, true)) { - try { - executor.execute(this); - } catch (Throwable t) { // If we can't run on the `Executor`, we need to fail gracefully - if (!cancelled) { - doCancel(); // First of all, this failure is not recoverable, so we need to follow rule 1.4 and 1.6 - try { - terminateDueTo(new IllegalStateException("Publisher terminated due to unavailable Executor.", t)); - } finally { - inboundSignals.clear(); // We're not going to need these anymore - // This subscription is cancelled by now, but letting it become schedulable again means - // that we can drain the inboundSignals queue if anything arrives after clearing - on.set(false); - } - } - } - } - } - - // Our implementation of `Subscription.request` sends a signal to the Subscription that more elements are in demand - @Override - public void request(final long n) { - signal(new Request(n)); - } - - // Our implementation of `Subscription.cancel` sends a signal to the Subscription that the `Subscriber` is not interested in any more elements - @Override - public void cancel() { - signal(Cancel.Instance); - } - - // The reason for the `executeQuery` method is that we want to ensure the `SubscriptionImpl` - // is completely constructed before it is exposed to the thread pool, therefor this - // method is only intended to be invoked once, and immediately after the constructor has - // finished. - void init() { - signal(Subscribe.Instance); - } - } -} \ No newline at end of file diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java new file mode 100644 index 00000000..fd07cccc --- /dev/null +++ b/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java @@ -0,0 +1,46 @@ +/************************************************************************ +* Licensed under Public Domain (CC0) * +* * +* To the extent possible under law, the person who associated CC0 with * +* this code has waived all copyright and related or neighboring * +* rights to this code. * +* * +* You should have received a copy of the CC0 legalcode along with this * +* work. If not, see .* +************************************************************************/ + +package org.reactivestreams.example.unicast; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.Executor; + +/** + * SupplierPublisher uses a supplier with only one 'get()' method for supplying values + * from unknown streams like files or sockets. + */ +public class SupplierPublisher extends AsyncIterablePublisher { + public SupplierPublisher(final Supplier supplier, final Executor executor) { + super(new Iterable() { + @Override public Iterator iterator() { + return new Iterator() { + private T elem = supplier.get(); + @Override public boolean hasNext() { return elem != null; } + @Override public T next() { + if (!hasNext()) return Collections.emptyList().iterator().next(); + else { + T prev = elem; + elem = supplier.get(); + return prev; + } + } + @Override public void remove() { throw new UnsupportedOperationException(); } + }; + } + }, executor); + } + + static interface Supplier { + T get(); + } +} \ No newline at end of file diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java deleted file mode 100644 index dc68a488..00000000 --- a/examples/src/test/java/org/reactivestreams/example/unicast/AsyncStreamPublisherTest.java +++ /dev/null @@ -1,74 +0,0 @@ -/************************************************************************ -* Licensed under Public Domain (CC0) * -* * -* To the extent possible under law, the person who associated CC0 with * -* this code has waived all copyright and related or neighboring * -* rights to this code. * -* * -* You should have received a copy of the CC0 legalcode along with this * -* work. If not, see .* -************************************************************************/ - -package org.reactivestreams.example.unicast; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; - -import org.reactivestreams.Publisher; -import org.reactivestreams.tck.PublisherVerification; -import org.reactivestreams.tck.TestEnvironment; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import static org.reactivestreams.example.unicast.AsyncStreamPublisher.StreamSupplier; - -@Test // Must be here for TestNG to find and run this, do not remove -public class AsyncStreamPublisherTest extends PublisherVerification { - - private ExecutorService e; - - @BeforeClass - void before() { - e = Executors.newFixedThreadPool(4); - } - - @AfterClass - void after() { - if (e != null) { - e.shutdown(); - } - } - - public AsyncStreamPublisherTest() { - super(new TestEnvironment()); - } - - @SuppressWarnings("unchecked") - @Override - public Publisher createPublisher(final long elements) { - assert (elements <= maxElementsFromPublisher()); - return new AsyncStreamPublisher(new StreamSupplier() { - private int at; - @Override - public Integer get() { - return at < elements ? at++ : null; - } - }, e); - } - - @Override - public Publisher createFailedPublisher() { - return new AsyncStreamPublisher(new StreamSupplier() { - @Override - public Integer get() { - throw new RuntimeException("Error state signal!"); - } - }, e); - } - - @Override - public long maxElementsFromPublisher() { - return Integer.MAX_VALUE; - } -} \ No newline at end of file diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java new file mode 100644 index 00000000..f6438271 --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java @@ -0,0 +1,59 @@ +/************************************************************************ +* Licensed under Public Domain (CC0) * +* * +* To the extent possible under law, the person who associated CC0 with * +* this code has waived all copyright and related or neighboring * +* rights to this code. * +* * +* You should have received a copy of the CC0 legalcode along with this * +* work. If not, see .* +************************************************************************/ + +package org.reactivestreams.example.unicast; + +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SupplierPublisherTest extends PublisherVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SupplierPublisherTest() { + super(new TestEnvironment()); + } + + @SuppressWarnings("unchecked") + @Override public Publisher createPublisher(final long elements) { + assert(elements <= maxElementsFromPublisher()); + return new SupplierPublisher(new SupplierPublisher.Supplier() { + private int at; + @Override + public Integer get() { + return at < elements ? at++ : null; + } + }, e); + } + + @Override public Publisher createFailedPublisher() { + return new AsyncIterablePublisher(new Iterable() { + @Override public Iterator iterator() { + throw new RuntimeException("Error state signal!"); + } + }, e); + } + + @Override public long maxElementsFromPublisher() { + return Integer.MAX_VALUE; + } +} From 9725ef2e4705a5a5a90468f4c3ab3177700a0213 Mon Sep 17 00:00:00 2001 From: Tomas Tulka Date: Mon, 12 Jun 2017 21:03:38 +0200 Subject: [PATCH 7/7] comment about end of stream added --- .../org/reactivestreams/example/unicast/SupplierPublisher.java | 1 + 1 file changed, 1 insertion(+) diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java index fd07cccc..9cfe2da6 100644 --- a/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java +++ b/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java @@ -18,6 +18,7 @@ /** * SupplierPublisher uses a supplier with only one 'get()' method for supplying values * from unknown streams like files or sockets. + * The supplier informs about the end of stream by returning a 'null' value. */ public class SupplierPublisher extends AsyncIterablePublisher { public SupplierPublisher(final Supplier supplier, final Executor executor) {