Skip to content

Commit

Permalink
Merge edab212 into 912193f
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Mar 13, 2023
2 parents 912193f + edab212 commit df34777
Show file tree
Hide file tree
Showing 28 changed files with 80 additions and 77 deletions.
1 change: 0 additions & 1 deletion documentation/docs/guides/shortcut-methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ The following table lists the available shortcuts available by the `Uni` class:
| `uni.map(x -> y)` | `uni.onItem().transform(x -> y)` |
| `uni.flatMap(x -> uni2)` | `uni.onItem().transformToUni(x -> uni2)` |
| `uni.chain(x -> uni2)` | `uni.onItem().transformToUni(x -> uni2)` |
| `uni.then(() -> uni2)` | `uni.onItem().transformToUni(ignored -> uni2)` |
| `uni.invoke(x -> System.out.println(x))` | `uni.onItem().invoke(x -> System.out.println(x))` |
| `uni.call(x -> uni2)` | `uni.onItem().call(x -> uni2)` |
| `uni.eventually(() -> System.out.println("eventually"))` | `uni.onItemOrFailure().invoke((ignoredItem, ignoredException) -> System.out.println("eventually"))` |
Expand Down
2 changes: 1 addition & 1 deletion implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,7 @@ default Multi<ItemWithContext<T>> attachContext() {
* Cap all downstream subscriber requests to a maximum value.
* <p>
* This is a shortcut for:
*
*
* <pre>
* multi.capDemandsUsing(outstanding -&gt; Math.min(outstanding, actual))
* </pre>
Expand Down
2 changes: 1 addition & 1 deletion implementation/src/main/java/io/smallrye/mutiny/Uni.java
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ default <O> Uni<O> chain(Supplier<Uni<? extends O>> supplier) {
* Session session = getSomeSession();
* session.find(Fruit.class, id)
* .chain(fruit -> session.remove(fruit)
* .then(() -> session.flush())
* .chain(ignored -> session.flush())
* .eventually(() -> session.close());
* }
* </pre>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ public interface GeneratorEmitter<T> {

/**
* Emit an item.
*
*
* @param item the item
*/
void emit(T item);

/**
* Emit a failure and terminate the stream.
*
*
* @param failure the failure
*/
void fail(Throwable failure);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* retry ({@link #retry()}). Maybe, you just want to look at the failure ({@link #invoke(Consumer)}).
* <p>
* You can configure the type of failure on which your handler is called using:
*
*
* <pre>
* {@code
* multi.onFailure(IOException.class).recoverWithItem("boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public MultiOnRequest(Multi<T> upstream) {
* Action when items are being requested.
* The request is propagated upstream when the action has completed.
* An error is forwarded downstream if the action throws an exception.
*
*
* @param consumer the action
* @return the new {@link Multi}
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public Multi<T> invoke(Runnable action) {
/**
* Attaches an action that is executed when the {@link Multi} emits a completion or a failure or when the subscriber
* cancels the subscription.
*
*
* @param mapper the function to execute where the first argument is a non-{@code null} exception on failure, and
* the second argument is a boolean which is {@code true} when the subscriber cancels the subscription.
* The function returns a {@link Uni} and must not be {@code null}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public <T> Uni<T> completionStage(Supplier<? extends CompletionStage<? extends T
* immediately after subscription. If it's not the case the callbacks of the subscriber are called on the thread used to
* wait the result (a thread from the Mutiny infrastructure default executor).
* <p>
*
*
* @param future the future, must not be {@code null}
* @param <T> the type of item
* @return the produced {@link Uni}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public boolean getAsBoolean() {

/**
* Memoize the received item or failure indefinitely.
*
*
* @return a new {@link Uni}
* @apiNote This is an experimental API
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* ({@link #invoke(Consumer)}).
* <p>
* You can configure the type of failure on which your handler is called using:
*
*
* <pre>
* {@code
* uni.onFailure(IOException.class).recoverWithItem("boom")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* uni.onSubscription().invoke(sub -> System.out.println("subscribed"));
* // Delay the subscription by 1 second (or until an asynchronous action completes)
* uni.onSubscription().call(sub -> Uni.createFrom(1).onItem().delayIt().by(Duration.ofSecond(1)));
*}
* }
* </pre>
*
* @param <T> the type of item
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
* <li>3.9: negative requests should emit an onError(IllegalArgumentException)</li>
* <li>2.12: onSubscribe must be called at most once (subscription cancelled and onError called)</li>
* </ul>
*
*
* @param <T> the type of item
*/
public class StrictMultiSubscriber<T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public static <T> Supplier<Queue<T>> get(int bufferSize) {
/**
* Returns an unbounded Queue.
* The queue is array-backed. Each array has the given size. If the queue is full, new arrays can be allocated.
*
*
* @param size the size of the array
* @param <T> the type of item
* @return the unbound queue supplier
Expand All @@ -90,7 +90,7 @@ public static <T> Supplier<Queue<T>> unbounded(int size) {

/**
* Creates a new multi-producer single consumer unbounded queue.
*
*
* @param <T> the type of item
* @return the queue
*/
Expand All @@ -100,7 +100,7 @@ public static <T> Queue<T> createMpscQueue() {

/**
* Create a queue of a strict fixed size.
*
*
* @param size the queue size
* @param <T> the elements type
* @return a new queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public interface Spy {

/**
* Spy {@link Uni#onSubscription()} events.
*
*
* @param upstream the upstream
* @param <T> the item type
* @return a new {@link Uni}
Expand Down Expand Up @@ -195,7 +195,7 @@ static <T> MultiOnItemSpy<T> onItem(Multi<T> upstream) {

/**
* Spy {@link Multi#onItem()} events and optionally keep track of the items.
*
*
* @param upstream the upstream
* @param trackItems {@code true} when items shall be tracked, {@code false} otherwise
* @param <T> the items type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ static final class ConcatMapInner<O> extends SwitchableSubscriptionSubscriber<O>
/**
* Downstream passed as {@code null} to {@link SwitchableSubscriptionSubscriber} as accessors are not reachable.
* Effective downstream is {@code parent}.
*
*
* @param parent parent as downstream
*/
ConcatMapInner(ConcatMapMainSubscriber<?, O> parent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

/**
* Scan operator accumulating items of the same type as the upstream.
*
*
* @param <T> the type of item
*/
public final class MultiScanOp<T> extends AbstractMultiOperator<T, T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public MultiSkipFirstOp(Multi<? extends T> upstream, long numberOfItems) {
@Override
public void subscribe(MultiSubscriber<? super T> actual) {
if (numberOfItems == 0) {
// Pass-through
// Pass-through
upstream.subscribe(actual);
} else {
upstream.subscribe(new SkipFirstProcessor<>(actual, numberOfItems));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public SafeSubscriber(Subscriber<? super T> downstream) {

/**
* For testing purpose only.
*
*
* @return whether the subscriber is in a terminal state.
*/
boolean isDone() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void testCreationFromSupplier() {
throw new IllegalStateException("boom");
})
.await().indefinitely()).isInstanceOf(IllegalStateException.class)
.hasMessageContaining("boom");
.hasMessageContaining("boom");

AtomicInteger counter = new AtomicInteger();
Uni<Integer> uni = Uni.createFrom().item(counter::incrementAndGet);
Expand Down Expand Up @@ -76,7 +76,7 @@ public void testCreationFromFailureSupplier() {
throw new IllegalStateException("boom");
})
.await().indefinitely()).isInstanceOf(IllegalStateException.class)
.hasMessageContaining("boom");
.hasMessageContaining("boom");

AtomicInteger counter = new AtomicInteger();
Uni<Integer> uni = Uni.createFrom().failure(() -> new IllegalStateException("boom-" + counter.incrementAndGet()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,12 +148,12 @@ public void testAwaitWithTimeout() {
assertThatThrownBy(() -> await()
.pollDelay(Duration.ofMillis(1))
.atMost(Duration.ofMillis(2)).untilAsserted(() -> subscriber.awaitFailure(DEFAULT_TIMEOUT)))
.isInstanceOf(ConditionTimeoutException.class);
.isInstanceOf(ConditionTimeoutException.class);

assertThatThrownBy(() -> await()
.pollDelay(Duration.ofMillis(1))
.atMost(Duration.ofMillis(2)).untilAsserted(subscriber::awaitCompletion))
.isInstanceOf(ConditionTimeoutException.class);
.isInstanceOf(ConditionTimeoutException.class);
}

@Test
Expand Down
Loading

0 comments on commit df34777

Please sign in to comment.