Skip to content

Commit

Permalink
Merge pull request #1761 from jponge/feat/emitOn-bufferSize
Browse files Browse the repository at this point in the history
  • Loading branch information
cescoffier authored Dec 10, 2024
2 parents fbfda01 + 2d225a8 commit 4136156
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 15 deletions.
16 changes: 15 additions & 1 deletion implementation/revapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,21 @@
"criticality" : "highlight",
"minSeverity" : "POTENTIALLY_BREAKING",
"minCriticality" : "documented",
"differences" : [ ]
"differences" : [
{
"ignore": true,
"code": "java.method.addedToInterface",
"new": "method io.smallrye.mutiny.Multi<T> io.smallrye.mutiny.Multi<T>::emitOn(java.util.concurrent.Executor, int)",
"justification": "The emitOn() internal buffer size must be configurable"
},
{
"ignore": true,
"code": "java.method.numberOfParametersChanged",
"old": "method void io.smallrye.mutiny.operators.multi.MultiEmitOnOp<T>::<init>(io.smallrye.mutiny.Multi<? extends T>, java.util.concurrent.Executor)",
"new": "method void io.smallrye.mutiny.operators.multi.MultiEmitOnOp<T>::<init>(io.smallrye.mutiny.Multi<? extends T>, java.util.concurrent.Executor, int)",
"justification": "The emitOn() internal buffer size must be configurable"
}
]
}
}, {
"extension" : "revapi.reporter.json",
Expand Down
31 changes: 31 additions & 0 deletions implementation/src/main/java/io/smallrye/mutiny/Multi.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,9 @@ default <O> O stage(Function<Multi<T>, O> stage) {
* Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods
* on the supplied {@link Executor}.
* <p>
* This operator delegates to {@link #emitOn(Executor, int)} with a default buffer size of
* {@link Infrastructure#getBufferSizeS()} items.
* <p>
* Instead of receiving the {@code item} event on the thread firing the event, this method influences the
* threading context to switch to a thread from the given executor. Same behavior for failure and completion.
* <p>
Expand All @@ -223,10 +226,38 @@ default <O> O stage(Function<Multi<T>, O> stage) {
*
* @param executor the executor to use, must not be {@code null}
* @return a new {@link Multi}
* @see #emitOn(Executor, int)
*/
@CheckReturnValue
Multi<T> emitOn(Executor executor);

/**
* Produces a new {@link Multi} invoking the {@code onItem}, {@code onFailure} and {@code onCompletion} methods
* on the supplied {@link Executor}.
* <p>
* This operator uses a queue of capacity {@code bufferSize} to emit items from a running executor thread,
* reducing the need for thread context switches when items are emitted fast from the upstream.
* <p>
* This operator tracks {@link Subscription#request(long)} demand, but it does not forward requests to the upstream.
* It instead requests {@code bufferSize} elements at subscription time and whenever {@code bufferSize} items have
* been emitted, allowing for efficient batching.
* <p>
* Instead of receiving the {@code item} event on the thread firing the event, this method influences the
* threading context to switch to a thread from the given executor. Same behavior for failure and completion.
* <p>
* Note that the subscriber is guaranteed to never be called concurrently.
* <p>
* <strong>Be careful as this operator can lead to concurrency problems with non thread-safe objects such as
* CDI request-scoped beans.
* It might also break reactive-streams semantics with items being emitted concurrently.</strong>
*
* @param executor the executor to use, must not be {@code null}
* @param bufferSize the buffer size, must be strictly positive
* @return a new {@link Multi}
*/
@CheckReturnValue
Multi<T> emitOn(Executor executor, int bufferSize);

/**
* When a subscriber subscribes to this {@link Multi}, execute the subscription to the upstream {@link Multi} on a
* thread from the given executor. As a result, the {@link Subscriber#onSubscribe(Subscription)} method will be called
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.smallrye.mutiny.groups.MultiSelect;
import io.smallrye.mutiny.groups.MultiSkip;
import io.smallrye.mutiny.groups.MultiSubscribe;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.operators.multi.MultiCacheOp;
import io.smallrye.mutiny.operators.multi.MultiDemandCapping;
Expand Down Expand Up @@ -102,7 +103,16 @@ public Multi<T> cache() {

@Override
public Multi<T> emitOn(Executor executor) {
return Infrastructure.onMultiCreation(new MultiEmitOnOp<>(this, nonNull(executor, "executor")));
return emitOn(executor, Infrastructure.getBufferSizeS());
}

@Override
public Multi<T> emitOn(Executor executor, int bufferSize) {
return Infrastructure.onMultiCreation(
new MultiEmitOnOp<>(
this,
nonNull(executor, "executor"),
ParameterValidation.positive(bufferSize, "bufferSize")));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.ParameterValidation;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.helpers.queues.Queues;
import io.smallrye.mutiny.infrastructure.Infrastructure;
import io.smallrye.mutiny.subscription.BackPressureFailure;
import io.smallrye.mutiny.subscription.MultiSubscriber;

Expand All @@ -27,24 +25,25 @@
public class MultiEmitOnOp<T> extends AbstractMultiOperator<T, T> {

private final Executor executor;
private final Supplier<? extends Queue<T>> queueSupplier = Queues.get(Infrastructure.getBufferSizeS());
private final int bufferSize;

public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor) {
public MultiEmitOnOp(Multi<? extends T> upstream, Executor executor, int bufferSize) {
super(upstream);
this.executor = ParameterValidation.nonNull(executor, "executor");
this.executor = executor;
this.bufferSize = bufferSize;
}

@Override
public void subscribe(MultiSubscriber<? super T> downstream) {
ParameterValidation.nonNullNpe(downstream, "subscriber");
upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<>(downstream, executor, queueSupplier));
upstream.subscribe().withSubscriber(new MultiEmitOnProcessor<>(downstream, executor, bufferSize));
}

static final class MultiEmitOnProcessor<T> extends MultiOperatorProcessor<T, T> implements Runnable {

private final Executor executor;

private final int limit;
private final int bufferSize;

// State variables

Expand Down Expand Up @@ -75,18 +74,18 @@ static final class MultiEmitOnProcessor<T> extends MultiOperatorProcessor<T, T>

MultiEmitOnProcessor(MultiSubscriber<? super T> downstream,
Executor executor,
Supplier<? extends Queue<T>> queueSupplier) {
int bufferSize) {
super(downstream);
this.executor = executor;
this.limit = 16;
this.queue = queueSupplier.get();
this.bufferSize = bufferSize;
this.queue = Queues.createMpscArrayQueue(bufferSize);
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
if (compareAndSetUpstreamSubscription(null, subscription)) {
downstream.onSubscribe(this);
subscription.request(16);
subscription.request(bufferSize);
} else {
subscription.cancel();
}
Expand Down Expand Up @@ -200,7 +199,7 @@ public void run() {

// updating the number of emitted items.
emitted++;
if (emitted == limit) {
if (emitted == bufferSize) {
if (requests != Long.MAX_VALUE) {
requests = requested.addAndGet(-emitted);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package io.smallrye.mutiny.operators;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
Expand Down Expand Up @@ -46,6 +47,20 @@ public void testWithSequenceOfItems() {
.assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
}

@RepeatedTest(10)
public void testWithSequenceOfItemsAndBufferSize() {
AtomicInteger requestSignalsCount = new AtomicInteger();
AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.onRequest().invoke(requestSignalsCount::incrementAndGet)
.emitOn(executor, 3)
.subscribe().withSubscriber(AssertSubscriber.create());

subscriber.request(Long.MAX_VALUE)
.awaitCompletion()
.assertItems(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
assertThat(requestSignalsCount.get()).isEqualTo(4);
}

@Test
public void testWithRequest0() {
AssertSubscriber<Integer> subscriber = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
Expand Down Expand Up @@ -92,4 +107,18 @@ public void subscribe(MultiSubscriber<? super Integer> subscriber) {
subscriber.assertFailedWith(BackPressureFailure.class, "");
}

@Test
public void testBufferSizeValidation() {
Multi<Integer> multi = Multi.createFrom().items(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

assertThatThrownBy(() -> multi.emitOn(executor, -58))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("bufferSize");

assertThatThrownBy(() -> multi.emitOn(executor, 0))
.isInstanceOf(IllegalArgumentException.class)
.hasMessageContaining("bufferSize");

assertThatCode(() -> multi.emitOn(executor, 58)).doesNotThrowAnyException();
}
}

0 comments on commit 4136156

Please sign in to comment.