diff --git a/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java b/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java new file mode 100644 index 00000000..9cfe2da6 --- /dev/null +++ b/examples/src/main/java/org/reactivestreams/example/unicast/SupplierPublisher.java @@ -0,0 +1,47 @@ +/************************************************************************ +* Licensed under Public Domain (CC0) * +* * +* To the extent possible under law, the person who associated CC0 with * +* this code has waived all copyright and related or neighboring * +* rights to this code. * +* * +* You should have received a copy of the CC0 legalcode along with this * +* work. If not, see .* +************************************************************************/ + +package org.reactivestreams.example.unicast; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.Executor; + +/** + * SupplierPublisher uses a supplier with only one 'get()' method for supplying values + * from unknown streams like files or sockets. + * The supplier informs about the end of stream by returning a 'null' value. + */ +public class SupplierPublisher extends AsyncIterablePublisher { + public SupplierPublisher(final Supplier supplier, final Executor executor) { + super(new Iterable() { + @Override public Iterator iterator() { + return new Iterator() { + private T elem = supplier.get(); + @Override public boolean hasNext() { return elem != null; } + @Override public T next() { + if (!hasNext()) return Collections.emptyList().iterator().next(); + else { + T prev = elem; + elem = supplier.get(); + return prev; + } + } + @Override public void remove() { throw new UnsupportedOperationException(); } + }; + } + }, executor); + } + + static interface Supplier { + T get(); + } +} \ No newline at end of file diff --git a/examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java b/examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java new file mode 100644 index 00000000..f6438271 --- /dev/null +++ b/examples/src/test/java/org/reactivestreams/example/unicast/SupplierPublisherTest.java @@ -0,0 +1,59 @@ +/************************************************************************ +* Licensed under Public Domain (CC0) * +* * +* To the extent possible under law, the person who associated CC0 with * +* this code has waived all copyright and related or neighboring * +* rights to this code. * +* * +* You should have received a copy of the CC0 legalcode along with this * +* work. If not, see .* +************************************************************************/ + +package org.reactivestreams.example.unicast; + +import java.util.Iterator; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import org.reactivestreams.Publisher; +import org.reactivestreams.tck.PublisherVerification; +import org.reactivestreams.tck.TestEnvironment; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +@Test // Must be here for TestNG to find and run this, do not remove +public class SupplierPublisherTest extends PublisherVerification { + + private ExecutorService e; + @BeforeClass void before() { e = Executors.newFixedThreadPool(4); } + @AfterClass void after() { if (e != null) e.shutdown(); } + + public SupplierPublisherTest() { + super(new TestEnvironment()); + } + + @SuppressWarnings("unchecked") + @Override public Publisher createPublisher(final long elements) { + assert(elements <= maxElementsFromPublisher()); + return new SupplierPublisher(new SupplierPublisher.Supplier() { + private int at; + @Override + public Integer get() { + return at < elements ? at++ : null; + } + }, e); + } + + @Override public Publisher createFailedPublisher() { + return new AsyncIterablePublisher(new Iterable() { + @Override public Iterator iterator() { + throw new RuntimeException("Error state signal!"); + } + }, e); + } + + @Override public long maxElementsFromPublisher() { + return Integer.MAX_VALUE; + } +}