Skip to content

Commit

Permalink
Merge pull request #57 from smallrye/feature/flow-adapters
Browse files Browse the repository at this point in the history
Clean-room adapters
  • Loading branch information
jponge authored May 16, 2022
2 parents 79e2486 + 4b458fc commit 2cfe48f
Show file tree
Hide file tree
Showing 21 changed files with 700 additions and 9 deletions.
27 changes: 27 additions & 0 deletions docs/flow-adapters.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# JDK Flow from/to Reactive Streams Adapters

The `rs-flow-adapters` library (Maven coordinates `io.smallrye.reactive:rs-flow-adapters`) provides a *clean room* implementation of adapters from and to the [JDK Flow interfaces](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.html) that match those from Reactive Streams.

## Why another adapter library?

The implementation of our adapters is similar in spirit to [those from the Reactive Streams library](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/api/src/main/java9/org/reactivestreams), but they differ by:

- failing early rather than passing `null` through in some cases,
- shipping under a [proper open source license](https://www.apache.org/licenses/LICENSE-2.0) while the Reactive Streams library hasn't made any progress towards publishing a new release, see [#536](https://github.com/reactive-streams/reactive-streams-jvm/issues/536) and [#530](https://github.com/reactive-streams/reactive-streams-jvm/issues/530)
- having correct JPMS (Java modules) descriptors for those who might need modules rather than the classpath.

## How to use it?

The public API exposes 2 types:

- `AdaptersToFlow` to convert Reactive Streams types to `Flow` types, and
- `AdaptersToReactiveStreams` to convert `Flow` types to Reactive Streams types.

Each type offers factory methods to convert from one type to the other.
For instance here's how you can convert from a Reactive Streams `Publisher` to a `Flow.Publisher`:

```java
Publisher<String> rsPublisher = connect("foo"); // ... where 'connect' returns a Publisher<String>

Flow.Publisher<String> flowPublisher = AdaptersToFlow.publisher(rsPublisher);
```
6 changes: 3 additions & 3 deletions docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Your main entry point in the Mutiny Zero API is the `mutiny.zero.ZeroPublisher`
If you already know the values to be emitted (or a failure), then you can use following factory methods:

```java linenums="1"
--8<-- "src/test/java/docsamples/FromKnownValues.java"
--8<-- "mutiny-zero/src/test/java/docsamples/FromKnownValues.java"
```

### Creating from `CompletionStage`
Expand All @@ -30,7 +30,7 @@ If you already know the values to be emitted (or a failure), then you can use fo
Mutiny Zero can create a `Publisher` from a `CompletionStage` that emits exactly 1 item or a failure, then a completion signal:

```java linenums="1"
--8<-- "src/test/java/docsamples/FromCompletionStage.java"
--8<-- "mutiny-zero/src/test/java/docsamples/FromCompletionStage.java"
```

### Creating using the general-purpose `Tube` API
Expand All @@ -44,7 +44,7 @@ A `Tube` is a good abstraction if you want to pass events from an existing async
Here is a not so fictional example where `SampleAsyncSource` (an asynchronous I/O API) has to be adapted to a `Publisher`:

```java linenums="1"
--8<-- "src/test/java/docsamples/FromTube.java"
--8<-- "mutiny-zero/src/test/java/docsamples/FromTube.java"
```

Since `SampleAsyncSource` does not support reactive streams but can be paused and resumed, the `Tube` API is used not just to send items but also to control `SampleAsyncSource`.
Expand Down
3 changes: 2 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ edit_uri: edit/main/docs/

nav:
- Overview: 'index.md'
- Quick start: 'quick-start.md'
- 'quick-start.md'
- Javadoc: './apidocs/index.html'
- 'flow-adapters.md'

theme:
name: material
Expand Down
30 changes: 30 additions & 0 deletions mutiny-zero-flow-adapters/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>mutiny-zero-parent</artifactId>
<groupId>io.smallrye.reactive</groupId>
<version>0.3.0-SNAPSHOT</version>
</parent>

<artifactId>rs-flow-adapters</artifactId>
<name>SmallRye Mutiny Zero JDK Flow / Reactive Streams Adapters</name>
<packaging>jar</packaging>
<description>Adapters from/to JDK Flow and Reactive Streams</description>

<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
46 changes: 46 additions & 0 deletions mutiny-zero-flow-adapters/revapi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[
{
"extension": "revapi.java",
"id": "java",
"configuration": {
"missing-classes": {
"behavior": "report",
"ignoreMissingAnnotations": false
}
}
},
{
"extension": "revapi.filter",
"configuration": {
"elements": {
"include": [
{
"matcher": "java-package",
"match": "mutiny.zero.flow.adapters"
}
]
}
}
},
{
"extension": "revapi.differences",
"id": "breaking-changes",
"configuration": {
"criticality": "highlight",
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"differences": []
}
},
{
"extension": "revapi.reporter.json",
"configuration": {
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"output": "target/compatibility.json",
"indent": true,
"append": false,
"keepEmptyFile": true
}
}
]
4 changes: 4 additions & 0 deletions mutiny-zero-flow-adapters/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module mutiny.zero.flow.adapters {
requires transitive org.reactivestreams;
exports mutiny.zero.flow.adapters;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package mutiny.zero.flow.adapters;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Flow;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import mutiny.zero.flow.adapters.common.Wrapper;
import mutiny.zero.flow.adapters.toflow.ProcessorAdapterFromRs;
import mutiny.zero.flow.adapters.toflow.PublisherAdapterFromRs;
import mutiny.zero.flow.adapters.toflow.SubscriberAdapterFromRs;
import mutiny.zero.flow.adapters.toflow.SubscriptionAdapterFromRs;

/**
* Adapters from Reactive Streams types to {@link Flow} types.
*/
@SuppressWarnings("unchecked")
public interface AdaptersToFlow {

/**
* Convert a {@link Publisher} to a {@link Flow.Publisher}.
*
* @param publisher the publisher
* @param <T> the items type
* @return the wrapped publisher
*/
static <T> Flow.Publisher<T> publisher(Publisher<T> publisher) {
requireNonNull(publisher, "The publisher must not be null");
if (publisher instanceof Wrapper) {
return (Flow.Publisher<T>) ((Wrapper<?>) publisher).unwrap();
} else {
return new PublisherAdapterFromRs<>(publisher);
}
}

/**
* Convert a {@link Subscriber} to a {@link Flow.Subscriber}.
*
* @param subscriber the subscriber
* @param <T> the items type
* @return the wrapped subscriber
*/
static <T> Flow.Subscriber<T> subscriber(Subscriber<T> subscriber) {
requireNonNull(subscriber, "The subscriber must not be null");
if (subscriber instanceof Wrapper) {
return (Flow.Subscriber<T>) ((Wrapper<?>) subscriber).unwrap();
} else {
return new SubscriberAdapterFromRs<>(subscriber);
}
}

/**
* Convert a {@link Subscription} to a {@link Flow.Subscription}.
*
* @param subscription the subscription
* @return the wrapped subscription
*/
static Flow.Subscription subscription(Subscription subscription) {
requireNonNull(subscription, "The subscription must not be null");
if (subscription instanceof Wrapper) {
return (Flow.Subscription) ((Wrapper<?>) subscription).unwrap();
} else {
return new SubscriptionAdapterFromRs(subscription);
}
}

/**
* Convert a {@link Processor} to a {@link Flow.Processor}.
*
* @param processor the processor
* @param <T> the items type
* @param <R> the output items type
* @return the wrapped processor
*/
static <T, R> Flow.Processor<T, R> processor(Processor<T, R> processor) {
requireNonNull(processor, "The processor must not be null");
if (processor instanceof Wrapper) {
return (Flow.Processor<T, R>) ((Wrapper<?>) processor).unwrap();
}
return new ProcessorAdapterFromRs<>(processor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package mutiny.zero.flow.adapters;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Flow;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import mutiny.zero.flow.adapters.common.Wrapper;
import mutiny.zero.flow.adapters.tors.ProcessorAdapterFromFlow;
import mutiny.zero.flow.adapters.tors.PublisherAdapterFromFlow;
import mutiny.zero.flow.adapters.tors.SubscriberAdapterFromFlow;
import mutiny.zero.flow.adapters.tors.SubscriptionAdapterFromFlow;

/**
* Adapters from {@link Flow} types to Reactive Streams types.
*/
@SuppressWarnings("unchecked")
public interface AdaptersToReactiveStreams {

/**
* Convert a {@link Flow.Publisher} to a {@link Publisher}.
*
* @param publisher the publisher
* @param <T> the items type
* @return the wrapped publisher
*/
static <T> Publisher<T> publisher(Flow.Publisher<T> publisher) {
requireNonNull(publisher, "The publisher must not be null");
if (publisher instanceof Wrapper) {
return (Publisher<T>) ((Wrapper<?>) publisher).unwrap();
} else {
return new PublisherAdapterFromFlow<>(publisher);
}
}

/**
* Convert a {@link Flow.Subscriber} to a {@link Subscriber}.
*
* @param subscriber the subscriber
* @param <T> the items type
* @return the wrapped subscriber
*/
static <T> Subscriber<T> subscriber(Flow.Subscriber<T> subscriber) {
requireNonNull(subscriber, "The subscriber must not be null");
if (subscriber instanceof Wrapper) {
return (Subscriber<T>) ((Wrapper<?>) subscriber).unwrap();
} else {
return new SubscriberAdapterFromFlow<>(subscriber);
}
}

/**
* Convert a {@link Flow.Subscription} to a {@link Subscription}.
*
* @param subscription the subscription
* @return the wrapped subscription
*/
static Subscription subscription(Flow.Subscription subscription) {
requireNonNull(subscription, "The subscription must not be null");
if (subscription instanceof Wrapper) {
return (Subscription) ((Wrapper<?>) subscription).unwrap();
} else {
return new SubscriptionAdapterFromFlow(subscription);
}
}

/**
* Convert a {@link Flow.Processor} to a {@link Processor}.
*
* @param processor the processor
* @param <T> the items type
* @param <R> the output items type
* @return the wrapped processor
*/
static <T, R> Processor<T, R> processor(Flow.Processor<T, R> processor) {
requireNonNull(processor, "The processor must not be null");
if (processor instanceof Wrapper) {
return (Processor<T, R>) ((Wrapper<?>) processor).unwrap();
} else {
return new ProcessorAdapterFromFlow<>(processor);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package mutiny.zero.flow.adapters.common;

public interface Wrapper<T> {

T unwrap();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* A set of adapters from Reactive Streams to/from {@link java.util.concurrent.Flow}.
* <p>
* The methods found in {@link mutiny.zero.flow.adapters.AdaptersToFlow} and
* {@link mutiny.zero.flow.adapters.AdaptersToReactiveStreams}
* avoid excessive wrapping when possible.
* For instance wrapping a flow publisher {@code p1} to a reactive streams publisher {@code p2} and back to a flow
* publisher {@code p3 } yields the original flow publisher where {@code p1 == p3}.
*/
package mutiny.zero.flow.adapters;
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mutiny.zero.flow.adapters.toflow;

import java.util.concurrent.Flow;

import org.reactivestreams.Processor;

import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import mutiny.zero.flow.adapters.common.Wrapper;

public class ProcessorAdapterFromRs<T, R> implements Flow.Processor<T, R>, Wrapper<Processor<T, R>> {

private final Processor<T, R> processor;

public ProcessorAdapterFromRs(Processor<T, R> processor) {
this.processor = processor;
}

@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) {
processor.subscribe(AdaptersToReactiveStreams.subscriber(subscriber));
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
processor.onSubscribe(AdaptersToReactiveStreams.subscription(subscription));
}

@Override
public void onNext(T item) {
processor.onNext(item);
}

@Override
public void onError(Throwable throwable) {
processor.onError(throwable);
}

@Override
public void onComplete() {
processor.onComplete();
}

@Override
public Processor<T, R> unwrap() {
return processor;
}
}
Loading

0 comments on commit 2cfe48f

Please sign in to comment.