Skip to content

Commit

Permalink
2.x Add concatMapCompletable() to Observable (#5649)
Browse files Browse the repository at this point in the history
* Initial implementation of Observable.concatMapCompletable()

* Update serialVersionUID

* Fix javadoc and verify prefetch is positive

* Put back auto-changed whitespace

* Put back more whitespace intellij is removing

* More javadoc fixes

* switch from testng to junit

* Add experimental annotation and change prefetch to capacityHint
  • Loading branch information
dweebo authored and akarnokd committed Oct 9, 2017
1 parent bb1e313 commit e1cb606
Show file tree
Hide file tree
Showing 3 changed files with 548 additions and 0 deletions.
46 changes: 46 additions & 0 deletions src/main/java/io/reactivex/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -6251,6 +6251,52 @@ public final <R> Observable<R> concatMapEagerDelayError(Function<? super T, ? ex
return RxJavaPlugins.onAssembly(new ObservableConcatMapEager<T, R>(this, mapper, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY, maxConcurrency, prefetch));
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them one at a time in
* order and waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a CompletableSource
* @return a Completable that signals {@code onComplete} when the upstream and all CompletableSources complete
* @since 2.1.6 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper) {
return concatMapCompletable(mapper, 2);
}

/**
* Maps each element of the upstream Observable into CompletableSources, subscribes to them one at a time in
* order and waits until the upstream and all CompletableSources complete.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code concatMapCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
* </dl>
*
* @param mapper
* a function that, when applied to an item emitted by the source ObservableSource, returns a CompletableSource
*
* @param capacityHint
* the number of upstream items expected to be buffered until the current CompletableSource, mapped from
* the current item, completes.
* @return a Completable that signals {@code onComplete} when the upstream and all CompletableSources complete
* @since 2.1.6 - experimental
*/
@Experimental
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Completable concatMapCompletable(Function<? super T, ? extends CompletableSource> mapper, int capacityHint) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
ObjectHelper.verifyPositive(capacityHint, "capacityHint");
return RxJavaPlugins.onAssembly(new ObservableConcatMapCompletable<T>(this, mapper, capacityHint));
}

/**
* Returns an Observable that concatenate each item emitted by the source ObservableSource with the values in an
* Iterable corresponding to that item that is generated by a selector.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
/**
* Copyright (c) 2016-present, RxJava Contributors.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
* the License for the specific language governing permissions and limitations under the License.
*/
package io.reactivex.internal.operators.observable;

import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.Function;
import io.reactivex.internal.disposables.DisposableHelper;
import io.reactivex.internal.disposables.SequentialDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.QueueDisposable;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;
import io.reactivex.plugins.RxJavaPlugins;

import java.util.concurrent.atomic.AtomicInteger;

public final class ObservableConcatMapCompletable<T> extends Completable {

final ObservableSource<T> source;
final Function<? super T, ? extends CompletableSource> mapper;
final int bufferSize;

public ObservableConcatMapCompletable(ObservableSource<T> source,
Function<? super T, ? extends CompletableSource> mapper, int bufferSize) {
this.source = source;
this.mapper = mapper;
this.bufferSize = Math.max(8, bufferSize);
}
@Override
public void subscribeActual(CompletableObserver observer) {
source.subscribe(new SourceObserver<T>(observer, mapper, bufferSize));
}

static final class SourceObserver<T> extends AtomicInteger implements Observer<T>, Disposable {

private static final long serialVersionUID = 6893587405571511048L;
final CompletableObserver actual;
final SequentialDisposable sa;
final Function<? super T, ? extends CompletableSource> mapper;
final CompletableObserver inner;
final int bufferSize;

SimpleQueue<T> queue;

Disposable s;

volatile boolean active;

volatile boolean disposed;

volatile boolean done;

int sourceMode;

SourceObserver(CompletableObserver actual,
Function<? super T, ? extends CompletableSource> mapper, int bufferSize) {
this.actual = actual;
this.mapper = mapper;
this.bufferSize = bufferSize;
this.inner = new InnerObserver(actual, this);
this.sa = new SequentialDisposable();
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;

int m = qd.requestFusion(QueueDisposable.ANY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;

actual.onSubscribe(this);

drain();
return;
}

if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;

actual.onSubscribe(this);

return;
}
}

queue = new SpscLinkedArrayQueue<T>(bufferSize);

actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode == QueueDisposable.NONE) {
queue.offer(t);
}
drain();
}
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
done = true;
dispose();
actual.onError(t);
}
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
drain();
}

void innerComplete() {
active = false;
drain();
}

@Override
public boolean isDisposed() {
return disposed;
}

@Override
public void dispose() {
disposed = true;
sa.dispose();
s.dispose();

if (getAndIncrement() == 0) {
queue.clear();
}
}

void innerSubscribe(Disposable s) {
sa.update(s);
}

void drain() {
if (getAndIncrement() != 0) {
return;
}

for (;;) {
if (disposed) {
queue.clear();
return;
}
if (!active) {

boolean d = done;

T t;

try {
t = queue.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
actual.onError(ex);
return;
}

boolean empty = t == null;

if (d && empty) {
disposed = true;
actual.onComplete();
return;
}

if (!empty) {
CompletableSource c;

try {
c = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null CompletableSource");
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
dispose();
queue.clear();
actual.onError(ex);
return;
}

active = true;
c.subscribe(inner);
}
}

if (decrementAndGet() == 0) {
break;
}
}
}

static final class InnerObserver implements CompletableObserver {
final CompletableObserver actual;
final SourceObserver<?> parent;

InnerObserver(CompletableObserver actual, SourceObserver<?> parent) {
this.actual = actual;
this.parent = parent;
}

@Override
public void onSubscribe(Disposable s) {
parent.innerSubscribe(s);
}

@Override
public void onError(Throwable t) {
parent.dispose();
actual.onError(t);
}
@Override
public void onComplete() {
parent.innerComplete();
}
}
}
}
Loading

0 comments on commit e1cb606

Please sign in to comment.