From 8c6888858c7a3978340b54e478c031336a2ec377 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 4 May 2013 14:35:35 +0200 Subject: [PATCH 1/7] trying to generalize scan, however drop is still missing... --- .../main/java/rx/operators/OperationScan.java | 72 ++++++++++--------- 1 file changed, 39 insertions(+), 33 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index 91d2078e730..a1b17ed4c1f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -26,6 +26,7 @@ import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; +import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -43,8 +44,8 @@ public final class OperationScan { * @return An observable sequence whose elements are the result of accumulating the output from the list of Observables. * @see http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx */ - public static Func1, Subscription> scan(Observable sequence, T initialValue, Func2 accumulator) { - return new Accumulator(sequence, initialValue, accumulator); + public static Func1, Subscription> scan(Observable sequence, R initialValue, Func2 accumulator) { + return new Accumulator(sequence, initialValue, accumulator); } /** @@ -59,26 +60,49 @@ public static Func1, Subscription> scan(Observable sequence, * @see http://msdn.microsoft.com/en-us/library/hh211665(v=vs.103).aspx */ public static Func1, Subscription> scan(Observable sequence, Func2 accumulator) { - return new Accumulator(sequence, null, accumulator); + return new AccuWithoutInitialValue(sequence, accumulator); } - private static class Accumulator implements Func1, Subscription> { + private static class AccuWithoutInitialValue implements Func1, Subscription> { private final Observable sequence; - private final T initialValue; - private Func2 accumlatorFunction; + private final Func2 accumlatorFunction; + private T initialValue; + + private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { + this.sequence = sequence; + this.accumlatorFunction = accumulator; + } + + @Override + public Subscription call(final Observer observer) { + sequence.take(1).subscribe(new Action1() { + @Override + public void call(T value) { + initialValue = value; + observer.onNext(value); + } + }); + Accumulator scan = new Accumulator(sequence /* FIXME .drop(1) */, initialValue, accumlatorFunction); + return scan.call(observer); + } + } + + private static class Accumulator implements Func1, Subscription> { + private final Observable sequence; + private final R initialValue; + private final Func2 accumlatorFunction; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - private Accumulator(Observable sequence, T initialValue, Func2 accumulator) { + private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { this.sequence = sequence; this.initialValue = initialValue; this.accumlatorFunction = accumulator; } - public Subscription call(final Observer observer) { - + @Override + public Subscription call(final Observer observer) { return subscription.wrap(sequence.subscribe(new Observer() { - private T acc = initialValue; - private boolean hasSentInitialValue = false; + private R acc = initialValue; /** * We must synchronize this because we can't allow @@ -87,26 +111,11 @@ public Subscription call(final Observer observer) { * * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded */ + @Override public synchronized void onNext(T value) { - if (acc == null) { - // we assume that acc is not allowed to be returned from accumulatorValue - // so it's okay to check null as being the state we initialize on - acc = value; - // this is all we do for this first value if we didn't have an initialValue - return; - } - if (!hasSentInitialValue) { - hasSentInitialValue = true; - observer.onNext(acc); - } - try { acc = accumlatorFunction.call(acc, value); - if (acc == null) { - onError(new IllegalArgumentException("Null is an unsupported return value for an accumulator.")); - return; - } observer.onNext(acc); } catch (Exception ex) { observer.onError(ex); @@ -115,16 +124,13 @@ public synchronized void onNext(T value) { } } + @Override public void onError(Exception ex) { observer.onError(ex); } - // synchronized because we access 'hasSentInitialValue' - public synchronized void onCompleted() { - // if only one sequence value existed, we send it without any accumulation - if (!hasSentInitialValue) { - observer.onNext(acc); - } + @Override + public void onCompleted() { observer.onCompleted(); } })); From a643ba71dceb8678515621fff998e2aa396af917 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 4 May 2013 14:49:14 +0200 Subject: [PATCH 2/7] found drop (it's called skip here...) - finished with generalizing scan --- rxjava-core/src/main/java/rx/Observable.java | 49 ++++++++++++++++--- .../main/java/rx/operators/OperationScan.java | 34 ++++++------- 2 files changed, 58 insertions(+), 25 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 36215718ce2..55ab8c84fd7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -252,6 +252,7 @@ public Subscription subscribe(final Map callbacks) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); if (onComplete != null) { @@ -259,6 +260,7 @@ public void onCompleted() { } } + @Override public void onError(Exception e) { handleError(e); Object onError = callbacks.get("onError"); @@ -267,6 +269,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNext.call(args); } @@ -298,15 +301,18 @@ public Subscription subscribe(final Object o) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); // no callback defined } + @Override public void onNext(Object args) { onNext.call(args); } @@ -327,15 +333,18 @@ public Subscription subscribe(final Action1 onNext) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); // no callback defined } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -365,10 +374,12 @@ public Subscription subscribe(final Object onNext, final Object onError) { */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -376,6 +387,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNextFunction.call(args); } @@ -396,10 +408,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { // do nothing } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -407,6 +421,7 @@ public void onError(Exception e) { } } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -436,12 +451,14 @@ public Subscription subscribe(final Object onNext, final Object onError, final O */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { if (onComplete != null) { Functions.from(onComplete).call(); } } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -449,6 +466,7 @@ public void onError(Exception e) { } } + @Override public void onNext(Object args) { onNextFunction.call(args); } @@ -469,10 +487,12 @@ public Subscription subscribe(final Action1 onNext, final Action1 */ return protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { onComplete.call(); } + @Override public void onError(Exception e) { handleError(e); if (onError != null) { @@ -480,6 +500,7 @@ public void onError(Exception e) { } } + @Override public void onNext(T args) { if (onNext == null) { throw new RuntimeException("onNext must be implemented"); @@ -516,10 +537,12 @@ public void forEach(final Action1 onNext) { * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" */ protectivelyWrapAndSubscribe(new Observer() { + @Override public void onCompleted() { latch.countDown(); } + @Override public void onError(Exception e) { /* * If we receive an onError event we set the reference on the outer thread @@ -531,6 +554,7 @@ public void onError(Exception e) { latch.countDown(); } + @Override public void onNext(T args) { onNext.call(args); } @@ -582,6 +606,7 @@ public void forEach(final Object o) { forEach(new Action1() { + @Override public void call(Object args) { onNext.call(args); } @@ -1846,6 +1871,8 @@ public T call(T t1, T t2) { * * @param * the type item emitted by the source Observable + * @param + * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue @@ -1857,7 +1884,7 @@ public T call(T t1, T t2) { * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ - public static Observable scan(Observable sequence, T initialValue, Func2 accumulator) { + public static Observable scan(Observable sequence, R initialValue, Func2 accumulator) { return create(OperationScan.scan(sequence, initialValue, accumulator)); } @@ -1871,6 +1898,8 @@ public static Observable scan(Observable sequence, T initialValue, Fun * * @param * the type item emitted by the source Observable + * @param + * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue @@ -1882,17 +1911,16 @@ public static Observable scan(Observable sequence, T initialValue, Fun * output from the sequence emitted by the source Observable * @see MSDN: Observable.Scan */ - public static Observable scan(final Observable sequence, final T initialValue, final Object accumulator) { + public static Observable scan(final Observable sequence, final R initialValue, final Object accumulator) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(accumulator); - return scan(sequence, initialValue, new Func2() { + return scan(sequence, initialValue, new Func2() { @SuppressWarnings("unchecked") @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); + public R call(R r, T t) { + return (R) _f.call(r, t); } - }); } @@ -2743,6 +2771,7 @@ public Observable filter(final Object callback) { final FuncN _f = Functions.from(callback); return filter(this, new Func1() { + @Override public Boolean call(T t1) { return (Boolean) _f.call(t1); } @@ -2913,6 +2942,7 @@ public Observable map(final Object callback) { final FuncN _f = Functions.from(callback); return map(this, new Func1() { + @Override @SuppressWarnings("unchecked") public R call(T t1) { return (R) _f.call(t1); @@ -2963,6 +2993,7 @@ public Observable mapMany(final Object callback) { final FuncN _f = Functions.from(callback); return mapMany(this, new Func1>() { + @Override @SuppressWarnings("unchecked") public Observable call(T t1) { return (Observable) _f.call(t1); @@ -3071,6 +3102,7 @@ public Observable onErrorResumeNext(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorResumeNext(this, new Func1>() { + @Override @SuppressWarnings("unchecked") public Observable call(Exception e) { return (Observable) _f.call(e); @@ -3152,6 +3184,7 @@ public Observable onErrorReturn(final Object resumeFunction) { final FuncN _f = Functions.from(resumeFunction); return onErrorReturn(this, new Func1() { + @Override @SuppressWarnings("unchecked") public T call(Exception e) { return (T) _f.call(e); @@ -3330,7 +3363,7 @@ public Observable scan(final Object accumulator) { * the list of Observables. * @see MSDN: Observable.Scan */ - public Observable scan(T initialValue, Func2 accumulator) { + public Observable scan(R initialValue, Func2 accumulator) { return scan(this, initialValue, accumulator); } @@ -3353,7 +3386,7 @@ public Observable scan(T initialValue, Func2 accumulator) { * the list of Observables. * @see MSDN: Observable.Scan */ - public Observable scan(final T initialValue, final Object accumulator) { + public Observable scan(final R initialValue, final Object accumulator) { return scan(this, initialValue, accumulator); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index a1b17ed4c1f..6417abb06d5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -79,10 +79,9 @@ public Subscription call(final Observer observer) { @Override public void call(T value) { initialValue = value; - observer.onNext(value); } }); - Accumulator scan = new Accumulator(sequence /* FIXME .drop(1) */, initialValue, accumlatorFunction); + Accumulator scan = new Accumulator(sequence.skip(1), initialValue, accumlatorFunction); return scan.call(observer); } } @@ -101,6 +100,8 @@ private Accumulator(Observable sequence, R initialValue, Func2 accum @Override public Subscription call(final Observer observer) { + observer.onNext(initialValue); + return subscription.wrap(sequence.subscribe(new Observer() { private R acc = initialValue; @@ -114,7 +115,6 @@ public Subscription call(final Observer observer) { @Override public synchronized void onNext(T value) { try { - acc = accumlatorFunction.call(acc, value); observer.onNext(acc); } catch (Exception ex) { @@ -147,28 +147,28 @@ public void before() { @Test public void testScanIntegersWithInitialValue() { @SuppressWarnings("unchecked") - Observer Observer = mock(Observer.class); + Observer observer = mock(Observer.class); Observable observable = Observable.toObservable(1, 2, 3); - Observable m = Observable.create(scan(observable, 0, new Func2() { + Observable m = Observable.create(scan(observable, "", new Func2() { @Override - public Integer call(Integer t1, Integer t2) { - return t1 + t2; + public String call(String s, Integer n) { + return s + n.toString(); } })); - m.subscribe(Observer); - - verify(Observer, never()).onError(any(Exception.class)); - verify(Observer, times(1)).onNext(0); - verify(Observer, times(1)).onNext(1); - verify(Observer, times(1)).onNext(3); - verify(Observer, times(1)).onNext(6); - verify(Observer, times(4)).onNext(anyInt()); - verify(Observer, times(1)).onCompleted(); - verify(Observer, never()).onError(any(Exception.class)); + m.subscribe(observer); + + verify(observer, never()).onError(any(Exception.class)); + verify(observer, times(1)).onNext(""); + verify(observer, times(1)).onNext("1"); + verify(observer, times(1)).onNext("12"); + verify(observer, times(1)).onNext("123"); + verify(observer, times(4)).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); } @Test From 4c2b58ef50277471dcc285e75f4b2cb5accc32ee Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 4 May 2013 14:56:41 +0200 Subject: [PATCH 3/7] also adapted type signatures of the reduce methods --- rxjava-core/src/main/java/rx/Observable.java | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 55ab8c84fd7..52d4ea23317 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1746,6 +1746,8 @@ public T call(T t1, T t2) { * * @param * the type item emitted by the source Observable + * @param + * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue @@ -1760,7 +1762,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { + public static Observable reduce(Observable sequence, R initialValue, Func2 accumulator) { return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } @@ -1778,6 +1780,8 @@ public static Observable reduce(Observable sequence, T initialValue, F * * @param * the type item emitted by the source Observable + * @param + * the type returned for each item of the target observable * @param sequence * the source Observable * @param initialValue @@ -1791,17 +1795,15 @@ public static Observable reduce(Observable sequence, T initialValue, F * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public static Observable reduce(final Observable sequence, final T initialValue, final Object accumulator) { + public static Observable reduce(final Observable sequence, final R initialValue, final Object accumulator) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(accumulator); - return reduce(sequence, initialValue, new Func2() { - + return reduce(sequence, initialValue, new Func2() { @SuppressWarnings("unchecked") @Override - public T call(T t1, T t2) { - return (T) _f.call(t1, t2); + public R call(R r, T t) { + return (R) _f.call(r, t); } - }); } From 19d12d55f4b2ab479ed2ede79bb93a4af3e5ca30 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sat, 4 May 2013 15:09:40 +0200 Subject: [PATCH 4/7] Also added aggregate (alias for reduce, see issue #20). --- rxjava-core/src/main/java/rx/Observable.java | 32 ++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 52d4ea23317..7bd8be01a43 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -3220,6 +3220,13 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } + /** + * @see #reduce(Func2) + */ + public Observable aggregate(Func2 accumulator) { + return reduce(accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3246,6 +3253,13 @@ public Observable reduce(Object accumulator) { return reduce(this, accumulator); } + /** + * @see #reduce(Object) + */ + public Observable aggregate(Object accumulator) { + return reduce(accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3270,10 +3284,17 @@ public Observable reduce(Object accumulator) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public Observable reduce(T initialValue, Func2 accumulator) { + public Observable reduce(R initialValue, Func2 accumulator) { return reduce(this, initialValue, accumulator); } + /** + * @see #reduce(R, Func2) + */ + public Observable aggregate(R initialValue, Func2 accumulator) { + return reduce(initialValue, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3297,10 +3318,17 @@ public Observable reduce(T initialValue, Func2 accumulator) { * @see MSDN: Observable.Aggregate * @see Wikipedia: Fold (higher-order function) */ - public Observable reduce(T initialValue, Object accumulator) { + public Observable reduce(R initialValue, Object accumulator) { return reduce(this, initialValue, accumulator); } + /** + * @see #reduce(R, Object) + */ + public Observable aggregate(R initialValue, Object accumulator) { + return reduce(initialValue, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted From ca65bb0ba0ea33a5d641e4efe0f995661e19c06a Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 5 May 2013 10:05:21 +0200 Subject: [PATCH 5/7] Fixed a typo, added missing error and completion handling --- .../main/java/rx/operators/OperationScan.java | 23 ++++++++++++++----- 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index 6417abb06d5..f186191cc37 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -26,6 +26,7 @@ import rx.Observer; import rx.Subscription; import rx.util.AtomicObservableSubscription; +import rx.util.functions.Action0; import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -65,12 +66,12 @@ public static Func1, Subscription> scan(Observable sequence, private static class AccuWithoutInitialValue implements Func1, Subscription> { private final Observable sequence; - private final Func2 accumlatorFunction; + private final Func2 accumulatorFunction; private T initialValue; private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { this.sequence = sequence; - this.accumlatorFunction = accumulator; + this.accumulatorFunction = accumulator; } @Override @@ -80,8 +81,18 @@ public Subscription call(final Observer observer) { public void call(T value) { initialValue = value; } + }, new Action1() { + @Override + public void call(Exception e) { + observer.onError(e); + } + }, new Action0() { + @Override + public void call() { + observer.onCompleted(); + } }); - Accumulator scan = new Accumulator(sequence.skip(1), initialValue, accumlatorFunction); + Accumulator scan = new Accumulator(sequence.skip(1), initialValue, accumulatorFunction); return scan.call(observer); } } @@ -89,13 +100,13 @@ public void call(T value) { private static class Accumulator implements Func1, Subscription> { private final Observable sequence; private final R initialValue; - private final Func2 accumlatorFunction; + private final Func2 accumulatorFunction; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { this.sequence = sequence; this.initialValue = initialValue; - this.accumlatorFunction = accumulator; + this.accumulatorFunction = accumulator; } @Override @@ -115,7 +126,7 @@ public Subscription call(final Observer observer) { @Override public synchronized void onNext(T value) { try { - acc = accumlatorFunction.call(acc, value); + acc = accumulatorFunction.call(acc, value); observer.onNext(acc); } catch (Exception ex) { observer.onError(ex); From 458cd7c45206e88289160685b4401a48797ca3ba Mon Sep 17 00:00:00 2001 From: jmhofer Date: Sun, 5 May 2013 10:51:03 +0200 Subject: [PATCH 6/7] Switched away from using take and skip again due to concurrency issues. --- .../main/java/rx/operators/OperationScan.java | 105 ++++++++++-------- 1 file changed, 58 insertions(+), 47 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index f186191cc37..4eb87256f3a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -25,9 +25,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; -import rx.util.functions.Action0; -import rx.util.functions.Action1; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -67,8 +64,9 @@ public static Func1, Subscription> scan(Observable sequence, private static class AccuWithoutInitialValue implements Func1, Subscription> { private final Observable sequence; private final Func2 accumulatorFunction; - private T initialValue; - + + private AccumulatingObserver accumulatingObserver; + private AccuWithoutInitialValue(Observable sequence, Func2 accumulator) { this.sequence = sequence; this.accumulatorFunction = accumulator; @@ -76,24 +74,29 @@ private AccuWithoutInitialValue(Observable sequence, Func2 accumulat @Override public Subscription call(final Observer observer) { - sequence.take(1).subscribe(new Action1() { + return sequence.subscribe(new Observer() { + + // has to be synchronized so that the initial value is always sent only once. @Override - public void call(T value) { - initialValue = value; + public synchronized void onNext(T value) { + if (accumulatingObserver == null) { + observer.onNext(value); + accumulatingObserver = new AccumulatingObserver(observer, value, accumulatorFunction); + } else { + accumulatingObserver.onNext(value); + } } - }, new Action1() { + @Override - public void call(Exception e) { + public void onError(Exception e) { observer.onError(e); } - }, new Action0() { + @Override - public void call() { + public void onCompleted() { observer.onCompleted(); } }); - Accumulator scan = new Accumulator(sequence.skip(1), initialValue, accumulatorFunction); - return scan.call(observer); } } @@ -101,7 +104,6 @@ private static class Accumulator implements Func1, Subscriptio private final Observable sequence; private final R initialValue; private final Func2 accumulatorFunction; - private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); private Accumulator(Observable sequence, R initialValue, Func2 accumulator) { this.sequence = sequence; @@ -112,42 +114,51 @@ private Accumulator(Observable sequence, R initialValue, Func2 accum @Override public Subscription call(final Observer observer) { observer.onNext(initialValue); - - return subscription.wrap(sequence.subscribe(new Observer() { - private R acc = initialValue; - - /** - * We must synchronize this because we can't allow - * multiple threads to execute the 'accumulatorFunction' at the same time because - * the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap - * - * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded - */ - @Override - public synchronized void onNext(T value) { - try { - acc = accumulatorFunction.call(acc, value); - observer.onNext(acc); - } catch (Exception ex) { - observer.onError(ex); - // this will work if the sequence is asynchronous, it will have no effect on a synchronous observable - subscription.unsubscribe(); - } - } + return sequence.subscribe(new AccumulatingObserver(observer, initialValue, accumulatorFunction)); + } + } - @Override - public void onError(Exception ex) { - observer.onError(ex); - } + private static class AccumulatingObserver implements Observer { + private final Observer observer; + private final Func2 accumulatorFunction; - @Override - public void onCompleted() { - observer.onCompleted(); - } - })); + private R acc; + + private AccumulatingObserver(Observer observer, R initialValue, Func2 accumulator) { + this.observer = observer; + this.accumulatorFunction = accumulator; + + this.acc = initialValue; } - } + /** + * We must synchronize this because we can't allow + * multiple threads to execute the 'accumulatorFunction' at the same time because + * the accumulator code very often will be doing mutation of the 'acc' object such as a non-threadsafe HashMap + * + * Because it's synchronized it's using non-atomic variables since everything in this method is single-threaded + */ + @Override + public synchronized void onNext(T value) { + try { + acc = accumulatorFunction.call(acc, value); + observer.onNext(acc); + } catch (Exception ex) { + observer.onError(ex); + } + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + } + public static class UnitTest { @Before From c3ef9a134428cc40e34268be02414d9bb8b23355 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Tue, 7 May 2013 09:49:13 +0200 Subject: [PATCH 7/7] fixed method signatures and respective javadocs of various aggregate/reduce/scan overloads --- rxjava-core/src/main/java/rx/Observable.java | 246 ++++++------------- 1 file changed, 71 insertions(+), 175 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5057ff79424..9336237c7e9 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1696,30 +1696,9 @@ public static Observable reduce(Observable sequence, Func2 ac } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject - * method that does a similar operation on lists. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable) - * - * @return an Observable that emits a single element that is the result of accumulating the - * output from applying the accumulator to the sequence of items emitted by the source - * Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Observable, Func2) */ public static Observable reduce(final Observable sequence, final Object accumulator) { @SuppressWarnings("rawtypes") @@ -1735,6 +1714,22 @@ public T call(T t1, T t2) { }); } + /** + * @see #reduce(Observable, Func2) + */ + public static Observable aggregate(Observable sequence, Func2 accumulator) { + return reduce(sequence, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Observable, Func2) + */ + public static Observable aggregate(Observable sequence, Object accumulator) { + return reduce(sequence, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1770,33 +1765,9 @@ public static Observable reduce(Observable sequence, R initialValue } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," "compress," or "inject" in other programming contexts. Groovy, for instance, has an inject - * method that does a similar operation on lists. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param - * the type returned for each item of the target observable - * @param sequence - * the source Observable - * @param initialValue - * a seed passed into the first execution of the accumulator function - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable) - * @return an Observable that emits a single element that is the result of accumulating the - * output from applying the accumulator to the sequence of items emitted by the source - * Observable - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Observable, Object, Func2) */ public static Observable reduce(final Observable sequence, final R initialValue, final Object accumulator) { @SuppressWarnings("rawtypes") @@ -1810,6 +1781,22 @@ public R call(R r, T t) { }); } + /** + * @see #reduce(Observable, Object, Func2) + */ + public static Observable aggregate(Observable sequence, R initialValue, Func2 accumulator) { + return reduce(sequence, initialValue, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Observable, Object, Func2) + */ + public static Observable aggregate(Observable sequence, R initialValue, Object accumulator) { + return reduce(sequence, initialValue, accumulator); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1834,23 +1821,9 @@ public static Observable scan(Observable sequence, Func2 accu } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations as its own sequence. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param sequence - * the source Observable - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be emitted and used in the next accumulator call (if applicable) - * @return an Observable that emits a sequence of items that are the result of accumulating the - * output from the sequence emitted by the source Observable - * @see MSDN: Observable.Scan + * @see #scan(Observable, Func2) */ public static Observable scan(final Observable sequence, final Object accumulator) { @SuppressWarnings("rawtypes") @@ -1894,27 +1867,9 @@ public static Observable scan(Observable sequence, R initialValue, } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations as its own sequence. - *

- * + * Used by dynamic languages. * - * @param - * the type item emitted by the source Observable - * @param - * the type returned for each item of the target observable - * @param sequence - * the source Observable - * @param initialValue - * the initial (seed) accumulator value - * @param accumulator - * an accumulator function to be invoked on each element from the sequence, whose - * result will be emitted and used in the next accumulator call (if applicable) - * @return an Observable that emits a sequence of items that are the result of accumulating the - * output from the sequence emitted by the source Observable - * @see MSDN: Observable.Scan + * @see #scan(Observable, Object, Func2) */ public static Observable scan(final Observable sequence, final R initialValue, final Object accumulator) { @SuppressWarnings("rawtypes") @@ -3232,43 +3187,28 @@ public Observable reduce(Func2 accumulator) { } /** - * @see #reduce(Func2) - */ - public Observable aggregate(Func2 accumulator) { - return reduce(accumulator); - } - - /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an - * inject method that does a similar operation on lists. - *

- * + * Used by dynamic languages. * - * @param accumulator - * An accumulator function to be invoked on each element from the sequence, whose result - * will be used in the next accumulator call (if applicable). - * - * @return an Observable that emits a single element from the result of accumulating the output - * from the list of Observables. - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Func2) */ public Observable reduce(Object accumulator) { return reduce(this, accumulator); } /** - * @see #reduce(Object) + * @see #reduce(Func2) + */ + public Observable aggregate(Func2 accumulator) { + return aggregate(this, accumulator); + } + + /** + * Used by dynamic languages. + * + * @see #reduce(Func2) */ public Observable aggregate(Object accumulator) { - return reduce(accumulator); + return aggregate(this, accumulator); } /** @@ -3300,44 +3240,28 @@ public Observable reduce(R initialValue, Func2 accumulator) { } /** - * @see #reduce(R, Func2) + * Used by dynamic languages. + * + * @see #reduce(Object, Func2) */ - public Observable aggregate(R initialValue, Func2 accumulator) { - return reduce(initialValue, accumulator); + public Observable reduce(R initialValue, Object accumulator) { + return reduce(this, initialValue, accumulator); } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the final result from the final call to your function as its sole - * output. - *

- * This technique, which is called "reduce" here, is sometimes called "fold," "accumulate," - * "compress," or "inject" in other programming contexts. Groovy, for instance, has an - * inject method that does a similar operation on lists. - *

- * - * - * @param initialValue - * The initial (seed) accumulator value. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence, whose - * result will be used in the next accumulator call (if applicable). - * @return an Observable that emits a single element from the result of accumulating the output - * from the list of Observables. - * @see MSDN: Observable.Aggregate - * @see Wikipedia: Fold (higher-order function) + * @see #reduce(Object, Func2) */ - public Observable reduce(R initialValue, Object accumulator) { - return reduce(this, initialValue, accumulator); + public Observable aggregate(R initialValue, Func2 accumulator) { + return aggregate(this, initialValue, accumulator); } /** - * @see #reduce(R, Object) + * Used by dynamic languages. + * + * @see #reduce(Object, Func2) */ public Observable aggregate(R initialValue, Object accumulator) { - return reduce(initialValue, accumulator); + return aggregate(this, initialValue, accumulator); } /** @@ -3391,23 +3315,9 @@ public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, and so on until all items have been emitted by the - * source Observable, emitting the result of each of these iterations. It emits the result of - * each of these iterations as a sequence from the returned Observable. This sort of function is - * sometimes called an accumulator. - *

- * - * - * @param accumulator - * An accumulator function to be invoked on each element from the sequence whose - * result will be sent via onNext and used in the next accumulator call - * (if applicable). - * - * @return an Observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see MSDN: Observable.Scan + * Used by dynamic languages. + * + * @see #scan(Func2) */ public Observable scan(final Object accumulator) { return scan(this, accumulator); @@ -3437,23 +3347,9 @@ public Observable scan(R initialValue, Func2 accumulator) { } /** - * Returns an Observable that applies a function of your choosing to the first item emitted by a - * source Observable, then feeds the result of that function along with the second item emitted - * by an Observable into the same function, then feeds the result of that function along with the - * third item into the same function, and so on, emitting the result of each of these - * iterations. This sort of function is sometimes called an accumulator. - *

- * - * - * @param initialValue - * The initial (seed) accumulator value. - * @param accumulator - * An accumulator function to be invoked on each element from the sequence whose result - * will be sent via onNext and used in the next accumulator call (if - * applicable). - * @return an Observable sequence whose elements are the result of accumulating the output from - * the list of Observables. - * @see MSDN: Observable.Scan + * Used by dynamic languages. + * + * @see #scan(Object, Func2) */ public Observable scan(final R initialValue, final Object accumulator) { return scan(this, initialValue, accumulator);