diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9336237c7e..208e4c54d7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -39,6 +39,7 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.OperationAll; +import rx.operators.OperationCache; import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; @@ -77,6 +78,8 @@ import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; +import rx.subjects.PublishSubject; +import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -1665,6 +1668,44 @@ public static Observable onErrorReturn(final Observable that, Func1 ConnectableObservable replay(final Observable that) { + return OperationMulticast.multicast(that, ReplaySubject. create()); + } + + /** + * Similar to {@link #replay()} except that this auto-subscribes to the source sequence. + *

+ * This is useful when returning an Observable that you wish to cache responses but can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not + * use this on infinite or very large sequences that will use up memory. This is similar to + * the {@link Observable#toList()} operator in this caution. + * + * @return an observable sequence that upon first subscription caches all events for subsequent subscriptions. + */ + public static Observable cache(final Observable that) { + return create(OperationCache.cache(that)); + } + + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. + * + * @param that + * the source Observable + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public static ConnectableObservable publish(final Observable that) { + return OperationMulticast.multicast(that, PublishSubject. create()); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1720,7 +1761,7 @@ public T call(T t1, T t2) { public static Observable aggregate(Observable sequence, Func2 accumulator) { return reduce(sequence, accumulator); } - + /** * Used by dynamic languages. * @@ -1729,7 +1770,7 @@ public static Observable aggregate(Observable sequence, Func2 public static Observable aggregate(Observable sequence, Object accumulator) { return reduce(sequence, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1787,7 +1828,7 @@ public R call(R r, T t) { public static Observable aggregate(Observable sequence, R initialValue, Func2 accumulator) { return reduce(sequence, initialValue, accumulator); } - + /** * Used by dynamic languages. * @@ -1796,7 +1837,7 @@ public static Observable aggregate(Observable sequence, R initialVa public static Observable aggregate(Observable sequence, R initialValue, Object accumulator) { return reduce(sequence, initialValue, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -2003,7 +2044,7 @@ public static Observable takeLast(final Observable items, final int co * @param items * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public static Observable takeWhile(final Observable items, Func1 predicate) { return create(OperationTakeWhile.takeWhile(items, predicate)); @@ -2015,7 +2056,7 @@ public static Observable takeWhile(final Observable items, Func1 Observable takeWhile(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") @@ -2035,7 +2076,7 @@ public Boolean call(T t) { * @param items * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); @@ -2057,12 +2098,13 @@ public Boolean call(T t, Integer integer) /** * Adds a timestamp to each item emitted by this observable. + * * @return An observable sequence of timestamped items. */ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } - + /** * Return a Future representing a single value of the Observable. *

@@ -2384,7 +2426,7 @@ public static Observable toObservable(T... items) { * @param sequence * @throws ClassCastException * if T objects do not implement Comparable - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence) { return create(OperationToObservableSortedList.toSortedList(sequence)); @@ -2397,7 +2439,7 @@ public static Observable> toSortedList(Observable sequence) { * * @param sequence * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); @@ -2410,7 +2452,7 @@ public static Observable> toSortedList(Observable sequence, Func2 * * @param sequence * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { @SuppressWarnings("rawtypes") @@ -3186,6 +3228,40 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications. + * + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public ConnectableObservable replay() { + return replay(this); + } + + /** + * Similar to {@link #replay()} except that this auto-subscribes to the source sequence. + *

+ * This is useful when returning an Observable that you wish to cache responses but can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not + * use this on infinite or very large sequences that will use up memory. This is similar to + * the {@link Observable#toList()} operator in this caution. + * + * @return an observable sequence that upon first subscription caches all events for subsequent subscriptions. + */ + public Observable cache() { + return cache(this); + } + + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. + * + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public ConnectableObservable publish() { + return publish(this); + } + /** * Used by dynamic languages. * @@ -3201,7 +3277,7 @@ public Observable reduce(Object accumulator) { public Observable aggregate(Func2 accumulator) { return aggregate(this, accumulator); } - + /** * Used by dynamic languages. * @@ -3210,7 +3286,7 @@ public Observable aggregate(Func2 accumulator) { public Observable aggregate(Object accumulator) { return aggregate(this, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3263,7 +3339,7 @@ public Observable aggregate(R initialValue, Func2 accumulator) { public Observable aggregate(R initialValue, Object accumulator) { return aggregate(this, initialValue, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3298,7 +3374,7 @@ public Observable scan(Func2 accumulator) { public Observable sample(long period, TimeUnit unit) { return create(OperationSample.sample(this, period, unit)); } - + /** * Samples the observable sequence at each interval. * @@ -3313,10 +3389,10 @@ public Observable sample(long period, TimeUnit unit) { public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { return create(OperationSample.sample(this, period, unit, scheduler)); } - + /** * Used by dynamic languages. - * + * * @see #scan(Func2) */ public Observable scan(final Object accumulator) { @@ -3348,7 +3424,7 @@ public Observable scan(R initialValue, Func2 accumulator) { /** * Used by dynamic languages. - * + * * @see #scan(Object, Func2) */ public Observable scan(final R initialValue, final Object accumulator) { @@ -3419,7 +3495,7 @@ public Observable take(final int num) { * * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhile(final Func1 predicate) { return takeWhile(this, predicate); @@ -3430,7 +3506,7 @@ public Observable takeWhile(final Func1 predicate) { * * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhile(final Object predicate) { return takeWhile(this, predicate); @@ -3441,7 +3517,7 @@ public Observable takeWhile(final Object predicate) { * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhileWithIndex(final Func2 predicate) { return takeWhileWithIndex(this, predicate); @@ -3452,7 +3528,7 @@ public Observable takeWhileWithIndex(final Func2 predica * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhileWithIndex(final Object predicate) { return takeWhileWithIndex(this, predicate); @@ -3523,7 +3599,7 @@ public Observable> toList() { * * @throws ClassCastException * if T objects do not implement Comparable - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList() { return toSortedList(this); @@ -3535,7 +3611,7 @@ public Observable> toSortedList() { * * * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList(Func2 sortFunction) { return toSortedList(this, sortFunction); @@ -3547,7 +3623,7 @@ public Observable> toSortedList(Func2 sortFunction) { * * * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); @@ -4140,6 +4216,167 @@ public void call(String t1) { } } + @Test + public void testPublish() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).publish(); + + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + Subscription s = o.connect(); + try { + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + + @Test + public void testReplay() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).replay(); + + // we connect immediately and it will emit the value + Subscription s = o.connect(); + try { + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).cache(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationCache.java b/rxjava-core/src/main/java/rx/operators/OperationCache.java new file mode 100644 index 0000000000..5a5d932bad --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationCache.java @@ -0,0 +1,128 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subjects.ReplaySubject; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +/** + * Similar to {@link Observable#replay()} except that this auto-subscribes to the source sequence. + *

+ * This is useful when returning an Observable that you wish to cache responses but can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not + * use this on infinite or very large sequences that will use up memory. This is similar to + * the {@link Observable#toList()} operator in this caution. + * + */ +public class OperationCache { + + public static Func1, Subscription> cache(final Observable source) { + return new Func1, Subscription>() { + + final AtomicBoolean subscribed = new AtomicBoolean(false); + private final ReplaySubject cache = ReplaySubject.create(); + + @Override + public Subscription call(Observer observer) { + if (subscribed.compareAndSet(false, true)) { + // subscribe to the source once + source.subscribe(cache); + /* + * Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values. + * + * This means this should never be used on an infinite or very large sequence, similar to toList(). + */ + } + + return cache.subscribe(observer); + } + + }; + } + + public static class UnitTest { + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(cache(Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }))); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + } + +}