From f167bed41e79d1cd58e7b7fa51e1f2f6209a8e63 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 9 May 2022 21:44:38 +0200 Subject: [PATCH 1/4] Clean-room RS <-> Flow adapters --- mutiny-zero-flow-adapters/pom.xml | 30 ++++++ .../src/main/java/module-info.java | 4 + .../zero/flow/adapters/AdaptersToFlow.java | 55 +++++++++++ .../adapters/AdaptersToReactiveStreams.java | 56 +++++++++++ .../zero/flow/adapters/common/Wrapper.java | 6 ++ .../toflow/ProcessorAdapterFromRs.java | 47 +++++++++ .../toflow/PublisherAdapterFromRs.java | 27 ++++++ .../toflow/SubscriberAdapterFromRs.java | 42 ++++++++ .../toflow/SubscriptionAdapterFromRs.java | 31 ++++++ .../tors/ProcessorAdapterFromFlow.java | 49 ++++++++++ .../tors/PublisherAdapterFromFlow.java | 28 ++++++ .../tors/SubscriberAdapterFromFlow.java | 43 +++++++++ .../tors/SubscriptionAdapterFromFlow.java | 31 ++++++ .../zero/flow/adapters/AdaptersSmokeTest.java | 96 +++++++++++++++++++ mutiny-zero/pom.xml | 1 - pom.xml | 1 + 16 files changed, 546 insertions(+), 1 deletion(-) create mode 100644 mutiny-zero-flow-adapters/pom.xml create mode 100644 mutiny-zero-flow-adapters/src/main/java/module-info.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/common/Wrapper.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/ProcessorAdapterFromRs.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/PublisherAdapterFromRs.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriberAdapterFromRs.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriptionAdapterFromRs.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/ProcessorAdapterFromFlow.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/PublisherAdapterFromFlow.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriberAdapterFromFlow.java create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriptionAdapterFromFlow.java create mode 100644 mutiny-zero-flow-adapters/src/test/java/mutiny/zero/flow/adapters/AdaptersSmokeTest.java diff --git a/mutiny-zero-flow-adapters/pom.xml b/mutiny-zero-flow-adapters/pom.xml new file mode 100644 index 0000000..a5e8fd7 --- /dev/null +++ b/mutiny-zero-flow-adapters/pom.xml @@ -0,0 +1,30 @@ + + + 4.0.0 + + + mutiny-zero-parent + io.smallrye.reactive + 0.3.0-SNAPSHOT + + + rs-flow-adapters + SmallRye Mutiny Zero JDK Flow / Reactive Streams Adapters + jar + Adapters from/to JDK Flow and Reactive Streams + + + + org.reactivestreams + reactive-streams + + + org.junit.jupiter + junit-jupiter + test + + + + \ No newline at end of file diff --git a/mutiny-zero-flow-adapters/src/main/java/module-info.java b/mutiny-zero-flow-adapters/src/main/java/module-info.java new file mode 100644 index 0000000..40f7b81 --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/module-info.java @@ -0,0 +1,4 @@ +module mutiny.zero.flow.adapters { + requires transitive org.reactivestreams; + exports mutiny.zero.flow.adapters; +} \ No newline at end of file diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java new file mode 100644 index 0000000..1dfc69b --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java @@ -0,0 +1,55 @@ +package mutiny.zero.flow.adapters; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.common.Wrapper; +import mutiny.zero.flow.adapters.toflow.ProcessorAdapterFromRs; +import mutiny.zero.flow.adapters.toflow.PublisherAdapterFromRs; +import mutiny.zero.flow.adapters.toflow.SubscriberAdapterFromRs; +import mutiny.zero.flow.adapters.toflow.SubscriptionAdapterFromRs; + +@SuppressWarnings("unchecked") +public interface AdaptersToFlow { + + static Flow.Publisher publisher(Publisher publisher) { + requireNonNull(publisher, "The publisher must not be null"); + if (publisher instanceof Wrapper) { + return (Flow.Publisher) ((Wrapper) publisher).unwrap(); + } else { + return new PublisherAdapterFromRs<>(publisher); + } + } + + static Flow.Subscriber subscriber(Subscriber subscriber) { + requireNonNull(subscriber, "The subscriber must not be null"); + if (subscriber instanceof Wrapper) { + return (Flow.Subscriber) ((Wrapper) subscriber).unwrap(); + } else { + return new SubscriberAdapterFromRs<>(subscriber); + } + } + + static Flow.Subscription subscription(Subscription subscription) { + requireNonNull(subscription, "The subscription must not be null"); + if (subscription instanceof Wrapper) { + return (Flow.Subscription) ((Wrapper) subscription).unwrap(); + } else { + return new SubscriptionAdapterFromRs(subscription); + } + } + + static Flow.Processor processor(Processor processor) { + requireNonNull(processor, "The processor must not be null"); + if (processor instanceof Wrapper) { + return (Flow.Processor) ((Wrapper) processor).unwrap(); + } + return new ProcessorAdapterFromRs<>(processor); + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java new file mode 100644 index 0000000..29f95e4 --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java @@ -0,0 +1,56 @@ +package mutiny.zero.flow.adapters; + +import static java.util.Objects.requireNonNull; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Processor; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.common.Wrapper; +import mutiny.zero.flow.adapters.tors.ProcessorAdapterFromFlow; +import mutiny.zero.flow.adapters.tors.PublisherAdapterFromFlow; +import mutiny.zero.flow.adapters.tors.SubscriberAdapterFromFlow; +import mutiny.zero.flow.adapters.tors.SubscriptionAdapterFromFlow; + +@SuppressWarnings("unchecked") +public interface AdaptersToReactiveStreams { + + static Publisher publisher(Flow.Publisher publisher) { + requireNonNull(publisher, "The publisher must not be null"); + if (publisher instanceof Wrapper) { + return (Publisher) ((Wrapper) publisher).unwrap(); + } else { + return new PublisherAdapterFromFlow<>(publisher); + } + } + + static Subscriber subscriber(Flow.Subscriber subscriber) { + requireNonNull(subscriber, "The subscriber must not be null"); + if (subscriber instanceof Wrapper) { + return (Subscriber) ((Wrapper) subscriber).unwrap(); + } else { + return new SubscriberAdapterFromFlow<>(subscriber); + } + } + + static Subscription subscription(Flow.Subscription subscription) { + requireNonNull(subscription, "The subscription must not be null"); + if (subscription instanceof Wrapper) { + return (Subscription) ((Wrapper) subscription).unwrap(); + } else { + return new SubscriptionAdapterFromFlow(subscription); + } + } + + static Processor processor(Flow.Processor processor) { + requireNonNull(processor, "The processor must not be null"); + if (processor instanceof Wrapper) { + return (Processor) ((Wrapper) processor).unwrap(); + } else { + return new ProcessorAdapterFromFlow<>(processor); + } + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/common/Wrapper.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/common/Wrapper.java new file mode 100644 index 0000000..dfd5c0a --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/common/Wrapper.java @@ -0,0 +1,6 @@ +package mutiny.zero.flow.adapters.common; + +public interface Wrapper { + + T unwrap(); +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/ProcessorAdapterFromRs.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/ProcessorAdapterFromRs.java new file mode 100644 index 0000000..901e3ab --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/ProcessorAdapterFromRs.java @@ -0,0 +1,47 @@ +package mutiny.zero.flow.adapters.toflow; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Processor; + +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; +import mutiny.zero.flow.adapters.common.Wrapper; + +public class ProcessorAdapterFromRs implements Flow.Processor, Wrapper> { + + private final Processor processor; + + public ProcessorAdapterFromRs(Processor processor) { + this.processor = processor; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + processor.subscribe(AdaptersToReactiveStreams.subscriber(subscriber)); + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + processor.onSubscribe(AdaptersToReactiveStreams.subscription(subscription)); + } + + @Override + public void onNext(T item) { + processor.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + processor.onError(throwable); + } + + @Override + public void onComplete() { + processor.onComplete(); + } + + @Override + public Processor unwrap() { + return processor; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/PublisherAdapterFromRs.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/PublisherAdapterFromRs.java new file mode 100644 index 0000000..422f02c --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/PublisherAdapterFromRs.java @@ -0,0 +1,27 @@ +package mutiny.zero.flow.adapters.toflow; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Publisher; + +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; +import mutiny.zero.flow.adapters.common.Wrapper; + +public class PublisherAdapterFromRs implements Flow.Publisher, Wrapper> { + + private final Publisher publisher; + + public PublisherAdapterFromRs(Publisher publisher) { + this.publisher = publisher; + } + + @Override + public void subscribe(Flow.Subscriber subscriber) { + publisher.subscribe(AdaptersToReactiveStreams.subscriber(subscriber)); + } + + @Override + public Publisher unwrap() { + return publisher; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriberAdapterFromRs.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriberAdapterFromRs.java new file mode 100644 index 0000000..1566c1a --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriberAdapterFromRs.java @@ -0,0 +1,42 @@ +package mutiny.zero.flow.adapters.toflow; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Subscriber; + +import mutiny.zero.flow.adapters.AdaptersToReactiveStreams; +import mutiny.zero.flow.adapters.common.Wrapper; + +public class SubscriberAdapterFromRs implements Flow.Subscriber, Wrapper> { + + private final Subscriber subscriber; + + public SubscriberAdapterFromRs(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + subscriber.onSubscribe(AdaptersToReactiveStreams.subscription(subscription)); + } + + @Override + public void onNext(T item) { + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + subscriber.onError(throwable); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + + @Override + public Subscriber unwrap() { + return subscriber; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriptionAdapterFromRs.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriptionAdapterFromRs.java new file mode 100644 index 0000000..7028eac --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/toflow/SubscriptionAdapterFromRs.java @@ -0,0 +1,31 @@ +package mutiny.zero.flow.adapters.toflow; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.common.Wrapper; + +public class SubscriptionAdapterFromRs implements Flow.Subscription, Wrapper { + + private final Subscription subscription; + + public SubscriptionAdapterFromRs(Subscription subscription) { + this.subscription = subscription; + } + + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } + + @Override + public Subscription unwrap() { + return subscription; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/ProcessorAdapterFromFlow.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/ProcessorAdapterFromFlow.java new file mode 100644 index 0000000..e1e199a --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/ProcessorAdapterFromFlow.java @@ -0,0 +1,49 @@ +package mutiny.zero.flow.adapters.tors; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Processor; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.AdaptersToFlow; +import mutiny.zero.flow.adapters.common.Wrapper; + +public class ProcessorAdapterFromFlow implements Processor, Wrapper> { + + private final Flow.Processor processor; + + public ProcessorAdapterFromFlow(Flow.Processor processor) { + this.processor = processor; + } + + @Override + public void subscribe(Subscriber subscriber) { + processor.subscribe(AdaptersToFlow.subscriber(subscriber)); + } + + @Override + public void onSubscribe(Subscription subscription) { + processor.onSubscribe(AdaptersToFlow.subscription(subscription)); + } + + @Override + public void onNext(T item) { + processor.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + processor.onError(throwable); + } + + @Override + public void onComplete() { + processor.onComplete(); + } + + @Override + public Flow.Processor unwrap() { + return processor; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/PublisherAdapterFromFlow.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/PublisherAdapterFromFlow.java new file mode 100644 index 0000000..92d9e15 --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/PublisherAdapterFromFlow.java @@ -0,0 +1,28 @@ +package mutiny.zero.flow.adapters.tors; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; + +import mutiny.zero.flow.adapters.AdaptersToFlow; +import mutiny.zero.flow.adapters.common.Wrapper; + +public class PublisherAdapterFromFlow implements Publisher, Wrapper> { + + private final Flow.Publisher publisher; + + public PublisherAdapterFromFlow(Flow.Publisher publisher) { + this.publisher = publisher; + } + + @Override + public void subscribe(Subscriber subscriber) { + publisher.subscribe(AdaptersToFlow.subscriber(subscriber)); + } + + @Override + public Flow.Publisher unwrap() { + return publisher; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriberAdapterFromFlow.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriberAdapterFromFlow.java new file mode 100644 index 0000000..be6b443 --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriberAdapterFromFlow.java @@ -0,0 +1,43 @@ +package mutiny.zero.flow.adapters.tors; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.AdaptersToFlow; +import mutiny.zero.flow.adapters.common.Wrapper; + +public class SubscriberAdapterFromFlow implements Subscriber, Wrapper> { + + private final Flow.Subscriber subscriber; + + public SubscriberAdapterFromFlow(Flow.Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void onSubscribe(Subscription subscription) { + subscriber.onSubscribe(AdaptersToFlow.subscription(subscription)); + } + + @Override + public void onNext(T item) { + subscriber.onNext(item); + } + + @Override + public void onError(Throwable throwable) { + subscriber.onError(throwable); + } + + @Override + public void onComplete() { + subscriber.onComplete(); + } + + @Override + public Flow.Subscriber unwrap() { + return subscriber; + } +} diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriptionAdapterFromFlow.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriptionAdapterFromFlow.java new file mode 100644 index 0000000..1fa7541 --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/tors/SubscriptionAdapterFromFlow.java @@ -0,0 +1,31 @@ +package mutiny.zero.flow.adapters.tors; + +import java.util.concurrent.Flow; + +import org.reactivestreams.Subscription; + +import mutiny.zero.flow.adapters.common.Wrapper; + +public class SubscriptionAdapterFromFlow implements Subscription, Wrapper { + + private final Flow.Subscription subscription; + + public SubscriptionAdapterFromFlow(Flow.Subscription subscription) { + this.subscription = subscription; + } + + @Override + public void request(long n) { + subscription.request(n); + } + + @Override + public void cancel() { + subscription.cancel(); + } + + @Override + public Flow.Subscription unwrap() { + return subscription; + } +} diff --git a/mutiny-zero-flow-adapters/src/test/java/mutiny/zero/flow/adapters/AdaptersSmokeTest.java b/mutiny-zero-flow-adapters/src/test/java/mutiny/zero/flow/adapters/AdaptersSmokeTest.java new file mode 100644 index 0000000..e9ced9d --- /dev/null +++ b/mutiny-zero-flow-adapters/src/test/java/mutiny/zero/flow/adapters/AdaptersSmokeTest.java @@ -0,0 +1,96 @@ +package mutiny.zero.flow.adapters; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +class AdaptersSmokeTest { + + @Test + void rejectNulls() { + assertThrows(NullPointerException.class, () -> AdaptersToFlow.publisher(null)); + assertThrows(NullPointerException.class, () -> AdaptersToFlow.subscriber(null)); + assertThrows(NullPointerException.class, () -> AdaptersToFlow.processor(null)); + assertThrows(NullPointerException.class, () -> AdaptersToFlow.subscription(null)); + + assertThrows(NullPointerException.class, () -> AdaptersToReactiveStreams.publisher(null)); + assertThrows(NullPointerException.class, () -> AdaptersToReactiveStreams.subscriber(null)); + assertThrows(NullPointerException.class, () -> AdaptersToReactiveStreams.processor(null)); + assertThrows(NullPointerException.class, () -> AdaptersToReactiveStreams.subscription(null)); + } + + @Test + void flowToRs() throws InterruptedException { + SubmissionPublisher publisher = new SubmissionPublisher<>(); + + CountDownLatch latch = new CountDownLatch(1); + List items = new ArrayList<>(); + AtomicBoolean completed = new AtomicBoolean(); + AtomicReference failure = new AtomicReference<>(); + + Publisher rsPublisher = AdaptersToReactiveStreams.publisher(publisher); + rsPublisher.subscribe(new Subscriber() { + @Override + public void onSubscribe(Subscription s) { + s.request(Long.MAX_VALUE); + // Dirty, but works + publisher.submit("foo"); + publisher.submit("bar"); + publisher.submit("baz"); + publisher.close(); + } + + @Override + public void onNext(String s) { + items.add(s); + } + + @Override + public void onError(Throwable t) { + failure.set(t); + latch.countDown(); + } + + @Override + public void onComplete() { + completed.set(true); + latch.countDown(); + } + }); + + assertTrue(latch.await(5, TimeUnit.SECONDS)); + + assertTrue(completed.get()); + assertNull(failure.get()); + assertEquals(3, items.size()); + assertEquals("foo", items.get(0)); + assertEquals("bar", items.get(1)); + assertEquals("baz", items.get(2)); + } + + @Test + void avoidExcessiveWrapping() { + Publisher initial = new Publisher<>() { + @Override + public void subscribe(Subscriber s) { + // Nothing + } + }; + Flow.Publisher flowPublisher = AdaptersToFlow.publisher(initial); + Publisher rsPublisher = AdaptersToReactiveStreams.publisher(flowPublisher); + + assertSame(initial, rsPublisher); + } +} diff --git a/mutiny-zero/pom.xml b/mutiny-zero/pom.xml index 5019b58..f668847 100644 --- a/mutiny-zero/pom.xml +++ b/mutiny-zero/pom.xml @@ -11,7 +11,6 @@ mutiny-zero SmallRye Mutiny Zero - 0.3.0-SNAPSHOT jar Mutiny Zero is a minimal API for creating reactive-streams compliant publishers diff --git a/pom.xml b/pom.xml index bc22d0b..bbff695 100644 --- a/pom.xml +++ b/pom.xml @@ -19,6 +19,7 @@ mutiny-zero + mutiny-zero-flow-adapters 2021 From 917686ba1aafa0a598718a0570977a4e5da146c2 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 9 May 2022 22:05:55 +0200 Subject: [PATCH 2/4] Javadocs --- .../zero/flow/adapters/AdaptersToFlow.java | 31 +++++++++++++++++++ .../adapters/AdaptersToReactiveStreams.java | 31 +++++++++++++++++++ .../zero/flow/adapters/package-info.java | 10 ++++++ pom.xml | 8 ++--- 4 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/package-info.java diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java index 1dfc69b..5a49538 100644 --- a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToFlow.java @@ -15,9 +15,19 @@ import mutiny.zero.flow.adapters.toflow.SubscriberAdapterFromRs; import mutiny.zero.flow.adapters.toflow.SubscriptionAdapterFromRs; +/** + * Adapters from Reactive Streams types to {@link Flow} types. + */ @SuppressWarnings("unchecked") public interface AdaptersToFlow { + /** + * Convert a {@link Publisher} to a {@link Flow.Publisher}. + * + * @param publisher the publisher + * @param the items type + * @return the wrapped publisher + */ static Flow.Publisher publisher(Publisher publisher) { requireNonNull(publisher, "The publisher must not be null"); if (publisher instanceof Wrapper) { @@ -27,6 +37,13 @@ static Flow.Publisher publisher(Publisher publisher) { } } + /** + * Convert a {@link Subscriber} to a {@link Flow.Subscriber}. + * + * @param subscriber the subscriber + * @param the items type + * @return the wrapped subscriber + */ static Flow.Subscriber subscriber(Subscriber subscriber) { requireNonNull(subscriber, "The subscriber must not be null"); if (subscriber instanceof Wrapper) { @@ -36,6 +53,12 @@ static Flow.Subscriber subscriber(Subscriber subscriber) { } } + /** + * Convert a {@link Subscription} to a {@link Flow.Subscription}. + * + * @param subscription the subscription + * @return the wrapped subscription + */ static Flow.Subscription subscription(Subscription subscription) { requireNonNull(subscription, "The subscription must not be null"); if (subscription instanceof Wrapper) { @@ -45,6 +68,14 @@ static Flow.Subscription subscription(Subscription subscription) { } } + /** + * Convert a {@link Processor} to a {@link Flow.Processor}. + * + * @param processor the processor + * @param the items type + * @param the output items type + * @return the wrapped processor + */ static Flow.Processor processor(Processor processor) { requireNonNull(processor, "The processor must not be null"); if (processor instanceof Wrapper) { diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java index 29f95e4..1db7e62 100644 --- a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/AdaptersToReactiveStreams.java @@ -15,9 +15,19 @@ import mutiny.zero.flow.adapters.tors.SubscriberAdapterFromFlow; import mutiny.zero.flow.adapters.tors.SubscriptionAdapterFromFlow; +/** + * Adapters from {@link Flow} types to Reactive Streams types. + */ @SuppressWarnings("unchecked") public interface AdaptersToReactiveStreams { + /** + * Convert a {@link Flow.Publisher} to a {@link Publisher}. + * + * @param publisher the publisher + * @param the items type + * @return the wrapped publisher + */ static Publisher publisher(Flow.Publisher publisher) { requireNonNull(publisher, "The publisher must not be null"); if (publisher instanceof Wrapper) { @@ -27,6 +37,13 @@ static Publisher publisher(Flow.Publisher publisher) { } } + /** + * Convert a {@link Flow.Subscriber} to a {@link Subscriber}. + * + * @param subscriber the subscriber + * @param the items type + * @return the wrapped subscriber + */ static Subscriber subscriber(Flow.Subscriber subscriber) { requireNonNull(subscriber, "The subscriber must not be null"); if (subscriber instanceof Wrapper) { @@ -36,6 +53,12 @@ static Subscriber subscriber(Flow.Subscriber subscriber) { } } + /** + * Convert a {@link Flow.Subscription} to a {@link Subscription}. + * + * @param subscription the subscription + * @return the wrapped subscription + */ static Subscription subscription(Flow.Subscription subscription) { requireNonNull(subscription, "The subscription must not be null"); if (subscription instanceof Wrapper) { @@ -45,6 +68,14 @@ static Subscription subscription(Flow.Subscription subscription) { } } + /** + * Convert a {@link Flow.Processor} to a {@link Processor}. + * + * @param processor the processor + * @param the items type + * @param the output items type + * @return the wrapped processor + */ static Processor processor(Flow.Processor processor) { requireNonNull(processor, "The processor must not be null"); if (processor instanceof Wrapper) { diff --git a/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/package-info.java b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/package-info.java new file mode 100644 index 0000000..94afb4e --- /dev/null +++ b/mutiny-zero-flow-adapters/src/main/java/mutiny/zero/flow/adapters/package-info.java @@ -0,0 +1,10 @@ +/** + * A set of adapters from Reactive Streams to/from {@link java.util.concurrent.Flow}. + *

+ * The methods found in {@link mutiny.zero.flow.adapters.AdaptersToFlow} and + * {@link mutiny.zero.flow.adapters.AdaptersToReactiveStreams} + * avoid excessive wrapping when possible. + * For instance wrapping a flow publisher {@code p1} to a reactive streams publisher {@code p2} and back to a flow + * publisher {@code p3 } yields the original flow publisher where {@code p1 == p3}. + */ +package mutiny.zero.flow.adapters; \ No newline at end of file diff --git a/pom.xml b/pom.xml index bbff695..2a2aee3 100644 --- a/pom.xml +++ b/pom.xml @@ -229,9 +229,9 @@ - [9,) + [11,) - java-9+ + java-11+ @@ -247,8 +247,8 @@ - 8 - 8 + 11 + 11 false From 59afc32a0e2da4ac2f513551af9b804f6104a6a8 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Mon, 9 May 2022 22:08:14 +0200 Subject: [PATCH 3/4] RevAPI configuration --- mutiny-zero-flow-adapters/revapi.json | 46 +++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) create mode 100644 mutiny-zero-flow-adapters/revapi.json diff --git a/mutiny-zero-flow-adapters/revapi.json b/mutiny-zero-flow-adapters/revapi.json new file mode 100644 index 0000000..000ec75 --- /dev/null +++ b/mutiny-zero-flow-adapters/revapi.json @@ -0,0 +1,46 @@ +[ + { + "extension": "revapi.java", + "id": "java", + "configuration": { + "missing-classes": { + "behavior": "report", + "ignoreMissingAnnotations": false + } + } + }, + { + "extension": "revapi.filter", + "configuration": { + "elements": { + "include": [ + { + "matcher": "java-package", + "match": "mutiny.zero.flow.adapters" + } + ] + } + } + }, + { + "extension": "revapi.differences", + "id": "breaking-changes", + "configuration": { + "criticality": "highlight", + "minSeverity": "POTENTIALLY_BREAKING", + "minCriticality": "documented", + "differences": [] + } + }, + { + "extension": "revapi.reporter.json", + "configuration": { + "minSeverity": "POTENTIALLY_BREAKING", + "minCriticality": "documented", + "output": "target/compatibility.json", + "indent": true, + "append": false, + "keepEmptyFile": true + } + } +] \ No newline at end of file From 4b458fc2554335887996f7230f550f37196cd536 Mon Sep 17 00:00:00 2001 From: Julien Ponge Date: Tue, 10 May 2022 18:50:01 +0200 Subject: [PATCH 4/4] Documentation for the adapters --- docs/flow-adapters.md | 27 +++++++++++++++++++++++++++ docs/quick-start.md | 6 +++--- mkdocs.yml | 3 ++- 3 files changed, 32 insertions(+), 4 deletions(-) create mode 100644 docs/flow-adapters.md diff --git a/docs/flow-adapters.md b/docs/flow-adapters.md new file mode 100644 index 0000000..7f2a772 --- /dev/null +++ b/docs/flow-adapters.md @@ -0,0 +1,27 @@ +# JDK Flow from/to Reactive Streams Adapters + +The `rs-flow-adapters` library (Maven coordinates `io.smallrye.reactive:rs-flow-adapters`) provides a *clean room* implementation of adapters from and to the [JDK Flow interfaces](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.html) that match those from Reactive Streams. + +## Why another adapter library? + +The implementation of our adapters is similar in spirit to [those from the Reactive Streams library](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/api/src/main/java9/org/reactivestreams), but they differ by: + +- failing early rather than passing `null` through in some cases, +- shipping under a [proper open source license](https://www.apache.org/licenses/LICENSE-2.0) while the Reactive Streams library hasn't made any progress towards publishing a new release, see [#536](https://github.com/reactive-streams/reactive-streams-jvm/issues/536) and [#530](https://github.com/reactive-streams/reactive-streams-jvm/issues/530) +- having correct JPMS (Java modules) descriptors for those who might need modules rather than the classpath. + +## How to use it? + +The public API exposes 2 types: + +- `AdaptersToFlow` to convert Reactive Streams types to `Flow` types, and +- `AdaptersToReactiveStreams` to convert `Flow` types to Reactive Streams types. + +Each type offers factory methods to convert from one type to the other. +For instance here's how you can convert from a Reactive Streams `Publisher` to a `Flow.Publisher`: + +```java +Publisher rsPublisher = connect("foo"); // ... where 'connect' returns a Publisher + +Flow.Publisher flowPublisher = AdaptersToFlow.publisher(rsPublisher); +``` diff --git a/docs/quick-start.md b/docs/quick-start.md index ed7b101..9c9d300 100644 --- a/docs/quick-start.md +++ b/docs/quick-start.md @@ -20,7 +20,7 @@ Your main entry point in the Mutiny Zero API is the `mutiny.zero.ZeroPublisher` If you already know the values to be emitted (or a failure), then you can use following factory methods: ```java linenums="1" ---8<-- "src/test/java/docsamples/FromKnownValues.java" +--8<-- "mutiny-zero/src/test/java/docsamples/FromKnownValues.java" ``` ### Creating from `CompletionStage` @@ -30,7 +30,7 @@ If you already know the values to be emitted (or a failure), then you can use fo Mutiny Zero can create a `Publisher` from a `CompletionStage` that emits exactly 1 item or a failure, then a completion signal: ```java linenums="1" ---8<-- "src/test/java/docsamples/FromCompletionStage.java" +--8<-- "mutiny-zero/src/test/java/docsamples/FromCompletionStage.java" ``` ### Creating using the general-purpose `Tube` API @@ -44,7 +44,7 @@ A `Tube` is a good abstraction if you want to pass events from an existing async Here is a not so fictional example where `SampleAsyncSource` (an asynchronous I/O API) has to be adapted to a `Publisher`: ```java linenums="1" ---8<-- "src/test/java/docsamples/FromTube.java" +--8<-- "mutiny-zero/src/test/java/docsamples/FromTube.java" ``` Since `SampleAsyncSource` does not support reactive streams but can be paused and resumed, the `Tube` API is used not just to send items but also to control `SampleAsyncSource`. diff --git a/mkdocs.yml b/mkdocs.yml index 673b2bb..1232e9f 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -4,8 +4,9 @@ edit_uri: edit/main/docs/ nav: - Overview: 'index.md' - - Quick start: 'quick-start.md' + - 'quick-start.md' - Javadoc: './apidocs/index.html' + - 'flow-adapters.md' copyright: >- Sponsored by Red Hat