Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean-room adapters #57

Merged
merged 4 commits into from
May 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions docs/flow-adapters.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# JDK Flow from/to Reactive Streams Adapters

The `rs-flow-adapters` library (Maven coordinates `io.smallrye.reactive:rs-flow-adapters`) provides a *clean room* implementation of adapters from and to the [JDK Flow interfaces](https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/util/concurrent/Flow.html) that match those from Reactive Streams.

## Why another adapter library?

The implementation of our adapters is similar in spirit to [those from the Reactive Streams library](https://github.com/reactive-streams/reactive-streams-jvm/tree/master/api/src/main/java9/org/reactivestreams), but they differ by:

- failing early rather than passing `null` through in some cases,
- shipping under a [proper open source license](https://www.apache.org/licenses/LICENSE-2.0) while the Reactive Streams library hasn't made any progress towards publishing a new release, see [#536](https://github.com/reactive-streams/reactive-streams-jvm/issues/536) and [#530](https://github.com/reactive-streams/reactive-streams-jvm/issues/530)
- having correct JPMS (Java modules) descriptors for those who might need modules rather than the classpath.

## How to use it?

The public API exposes 2 types:

- `AdaptersToFlow` to convert Reactive Streams types to `Flow` types, and
- `AdaptersToReactiveStreams` to convert `Flow` types to Reactive Streams types.

Each type offers factory methods to convert from one type to the other.
For instance here's how you can convert from a Reactive Streams `Publisher` to a `Flow.Publisher`:

```java
Publisher<String> rsPublisher = connect("foo"); // ... where 'connect' returns a Publisher<String>

Flow.Publisher<String> flowPublisher = AdaptersToFlow.publisher(rsPublisher);
```
6 changes: 3 additions & 3 deletions docs/quick-start.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Your main entry point in the Mutiny Zero API is the `mutiny.zero.ZeroPublisher`
If you already know the values to be emitted (or a failure), then you can use following factory methods:

```java linenums="1"
--8<-- "src/test/java/docsamples/FromKnownValues.java"
--8<-- "mutiny-zero/src/test/java/docsamples/FromKnownValues.java"
```

### Creating from `CompletionStage`
Expand All @@ -30,7 +30,7 @@ If you already know the values to be emitted (or a failure), then you can use fo
Mutiny Zero can create a `Publisher` from a `CompletionStage` that emits exactly 1 item or a failure, then a completion signal:

```java linenums="1"
--8<-- "src/test/java/docsamples/FromCompletionStage.java"
--8<-- "mutiny-zero/src/test/java/docsamples/FromCompletionStage.java"
```

### Creating using the general-purpose `Tube` API
Expand All @@ -44,7 +44,7 @@ A `Tube` is a good abstraction if you want to pass events from an existing async
Here is a not so fictional example where `SampleAsyncSource` (an asynchronous I/O API) has to be adapted to a `Publisher`:

```java linenums="1"
--8<-- "src/test/java/docsamples/FromTube.java"
--8<-- "mutiny-zero/src/test/java/docsamples/FromTube.java"
```

Since `SampleAsyncSource` does not support reactive streams but can be paused and resumed, the `Tube` API is used not just to send items but also to control `SampleAsyncSource`.
Expand Down
3 changes: 2 additions & 1 deletion mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ edit_uri: edit/main/docs/

nav:
- Overview: 'index.md'
- Quick start: 'quick-start.md'
- 'quick-start.md'
- Javadoc: './apidocs/index.html'
- 'flow-adapters.md'

copyright: >-
Sponsored by <a href="https://www.redhat.com"><img style="vertical-align: middle; height: 2.5em;" alt="Red Hat" src="./assets/redhat_reversed.svg"/></a> <br/>
Expand Down
30 changes: 30 additions & 0 deletions mutiny-zero-flow-adapters/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<artifactId>mutiny-zero-parent</artifactId>
<groupId>io.smallrye.reactive</groupId>
<version>0.3.0-SNAPSHOT</version>
</parent>

<artifactId>rs-flow-adapters</artifactId>
<name>SmallRye Mutiny Zero JDK Flow / Reactive Streams Adapters</name>
<packaging>jar</packaging>
<description>Adapters from/to JDK Flow and Reactive Streams</description>

<dependencies>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
46 changes: 46 additions & 0 deletions mutiny-zero-flow-adapters/revapi.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
[
{
"extension": "revapi.java",
"id": "java",
"configuration": {
"missing-classes": {
"behavior": "report",
"ignoreMissingAnnotations": false
}
}
},
{
"extension": "revapi.filter",
"configuration": {
"elements": {
"include": [
{
"matcher": "java-package",
"match": "mutiny.zero.flow.adapters"
}
]
}
}
},
{
"extension": "revapi.differences",
"id": "breaking-changes",
"configuration": {
"criticality": "highlight",
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"differences": []
}
},
{
"extension": "revapi.reporter.json",
"configuration": {
"minSeverity": "POTENTIALLY_BREAKING",
"minCriticality": "documented",
"output": "target/compatibility.json",
"indent": true,
"append": false,
"keepEmptyFile": true
}
}
]
4 changes: 4 additions & 0 deletions mutiny-zero-flow-adapters/src/main/java/module-info.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module mutiny.zero.flow.adapters {
requires transitive org.reactivestreams;
exports mutiny.zero.flow.adapters;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package mutiny.zero.flow.adapters;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Flow;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import mutiny.zero.flow.adapters.common.Wrapper;
import mutiny.zero.flow.adapters.toflow.ProcessorAdapterFromRs;
import mutiny.zero.flow.adapters.toflow.PublisherAdapterFromRs;
import mutiny.zero.flow.adapters.toflow.SubscriberAdapterFromRs;
import mutiny.zero.flow.adapters.toflow.SubscriptionAdapterFromRs;

/**
* Adapters from Reactive Streams types to {@link Flow} types.
*/
@SuppressWarnings("unchecked")
public interface AdaptersToFlow {

/**
* Convert a {@link Publisher} to a {@link Flow.Publisher}.
*
* @param publisher the publisher
* @param <T> the items type
* @return the wrapped publisher
*/
static <T> Flow.Publisher<T> publisher(Publisher<T> publisher) {
requireNonNull(publisher, "The publisher must not be null");
if (publisher instanceof Wrapper) {
return (Flow.Publisher<T>) ((Wrapper<?>) publisher).unwrap();
} else {
return new PublisherAdapterFromRs<>(publisher);
}
}

/**
* Convert a {@link Subscriber} to a {@link Flow.Subscriber}.
*
* @param subscriber the subscriber
* @param <T> the items type
* @return the wrapped subscriber
*/
static <T> Flow.Subscriber<T> subscriber(Subscriber<T> subscriber) {
requireNonNull(subscriber, "The subscriber must not be null");
if (subscriber instanceof Wrapper) {
return (Flow.Subscriber<T>) ((Wrapper<?>) subscriber).unwrap();
} else {
return new SubscriberAdapterFromRs<>(subscriber);
}
}

/**
* Convert a {@link Subscription} to a {@link Flow.Subscription}.
*
* @param subscription the subscription
* @return the wrapped subscription
*/
static Flow.Subscription subscription(Subscription subscription) {
requireNonNull(subscription, "The subscription must not be null");
if (subscription instanceof Wrapper) {
return (Flow.Subscription) ((Wrapper<?>) subscription).unwrap();
} else {
return new SubscriptionAdapterFromRs(subscription);
}
}

/**
* Convert a {@link Processor} to a {@link Flow.Processor}.
*
* @param processor the processor
* @param <T> the items type
* @param <R> the output items type
* @return the wrapped processor
*/
static <T, R> Flow.Processor<T, R> processor(Processor<T, R> processor) {
requireNonNull(processor, "The processor must not be null");
if (processor instanceof Wrapper) {
return (Flow.Processor<T, R>) ((Wrapper<?>) processor).unwrap();
}
return new ProcessorAdapterFromRs<>(processor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package mutiny.zero.flow.adapters;

import static java.util.Objects.requireNonNull;

import java.util.concurrent.Flow;

import org.reactivestreams.Processor;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import mutiny.zero.flow.adapters.common.Wrapper;
import mutiny.zero.flow.adapters.tors.ProcessorAdapterFromFlow;
import mutiny.zero.flow.adapters.tors.PublisherAdapterFromFlow;
import mutiny.zero.flow.adapters.tors.SubscriberAdapterFromFlow;
import mutiny.zero.flow.adapters.tors.SubscriptionAdapterFromFlow;

/**
* Adapters from {@link Flow} types to Reactive Streams types.
*/
@SuppressWarnings("unchecked")
public interface AdaptersToReactiveStreams {

/**
* Convert a {@link Flow.Publisher} to a {@link Publisher}.
*
* @param publisher the publisher
* @param <T> the items type
* @return the wrapped publisher
*/
static <T> Publisher<T> publisher(Flow.Publisher<T> publisher) {
requireNonNull(publisher, "The publisher must not be null");
if (publisher instanceof Wrapper) {
return (Publisher<T>) ((Wrapper<?>) publisher).unwrap();
} else {
return new PublisherAdapterFromFlow<>(publisher);
}
}

/**
* Convert a {@link Flow.Subscriber} to a {@link Subscriber}.
*
* @param subscriber the subscriber
* @param <T> the items type
* @return the wrapped subscriber
*/
static <T> Subscriber<T> subscriber(Flow.Subscriber<T> subscriber) {
requireNonNull(subscriber, "The subscriber must not be null");
if (subscriber instanceof Wrapper) {
return (Subscriber<T>) ((Wrapper<?>) subscriber).unwrap();
} else {
return new SubscriberAdapterFromFlow<>(subscriber);
}
}

/**
* Convert a {@link Flow.Subscription} to a {@link Subscription}.
*
* @param subscription the subscription
* @return the wrapped subscription
*/
static Subscription subscription(Flow.Subscription subscription) {
requireNonNull(subscription, "The subscription must not be null");
if (subscription instanceof Wrapper) {
return (Subscription) ((Wrapper<?>) subscription).unwrap();
} else {
return new SubscriptionAdapterFromFlow(subscription);
}
}

/**
* Convert a {@link Flow.Processor} to a {@link Processor}.
*
* @param processor the processor
* @param <T> the items type
* @param <R> the output items type
* @return the wrapped processor
*/
static <T, R> Processor<T, R> processor(Flow.Processor<T, R> processor) {
requireNonNull(processor, "The processor must not be null");
if (processor instanceof Wrapper) {
return (Processor<T, R>) ((Wrapper<?>) processor).unwrap();
} else {
return new ProcessorAdapterFromFlow<>(processor);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package mutiny.zero.flow.adapters.common;

public interface Wrapper<T> {

T unwrap();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/**
* A set of adapters from Reactive Streams to/from {@link java.util.concurrent.Flow}.
* <p>
* The methods found in {@link mutiny.zero.flow.adapters.AdaptersToFlow} and
* {@link mutiny.zero.flow.adapters.AdaptersToReactiveStreams}
* avoid excessive wrapping when possible.
* For instance wrapping a flow publisher {@code p1} to a reactive streams publisher {@code p2} and back to a flow
* publisher {@code p3 } yields the original flow publisher where {@code p1 == p3}.
*/
package mutiny.zero.flow.adapters;
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package mutiny.zero.flow.adapters.toflow;

import java.util.concurrent.Flow;

import org.reactivestreams.Processor;

import mutiny.zero.flow.adapters.AdaptersToReactiveStreams;
import mutiny.zero.flow.adapters.common.Wrapper;

public class ProcessorAdapterFromRs<T, R> implements Flow.Processor<T, R>, Wrapper<Processor<T, R>> {

private final Processor<T, R> processor;

public ProcessorAdapterFromRs(Processor<T, R> processor) {
this.processor = processor;
}

@Override
public void subscribe(Flow.Subscriber<? super R> subscriber) {
processor.subscribe(AdaptersToReactiveStreams.subscriber(subscriber));
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
processor.onSubscribe(AdaptersToReactiveStreams.subscription(subscription));
}

@Override
public void onNext(T item) {
processor.onNext(item);
}

@Override
public void onError(Throwable throwable) {
processor.onError(throwable);
}

@Override
public void onComplete() {
processor.onComplete();
}

@Override
public Processor<T, R> unwrap() {
return processor;
}
}
Loading