Skip to content

Commit

Permalink
Publish operator on Observable
Browse files Browse the repository at this point in the history
  • Loading branch information
benjchristensen committed May 7, 2013
1 parent 8e054fd commit 16879eb
Showing 1 changed file with 104 additions and 25 deletions.
129 changes: 104 additions & 25 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import rx.plugins.RxJavaErrorHandler;
import rx.plugins.RxJavaObservableExecutionHook;
import rx.plugins.RxJavaPlugins;
import rx.subjects.PublishSubject;
import rx.subjects.Subject;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -1665,6 +1666,17 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
}

/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
* @param that
* the source Observable
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static <T> ConnectableObservable<T> publish(final Observable<T> that) {
return OperationMulticast.multicast(that, PublishSubject.<T> create());
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -1720,7 +1732,7 @@ public T call(T t1, T t2) {
public static <T> Observable<T> aggregate(Observable<T> sequence, Func2<T, T, T> accumulator) {
return reduce(sequence, accumulator);
}

/**
* Used by dynamic languages.
*
Expand All @@ -1729,7 +1741,7 @@ public static <T> Observable<T> aggregate(Observable<T> sequence, Func2<T, T, T>
public static <T> Observable<T> aggregate(Observable<T> sequence, Object accumulator) {
return reduce(sequence, accumulator);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -1787,7 +1799,7 @@ public R call(R r, T t) {
public static <T, R> Observable<R> aggregate(Observable<T> sequence, R initialValue, Func2<R, T, R> accumulator) {
return reduce(sequence, initialValue, accumulator);
}

/**
* Used by dynamic languages.
*
Expand All @@ -1796,7 +1808,7 @@ public static <T, R> Observable<R> aggregate(Observable<T> sequence, R initialVa
public static <T, R> Observable<R> aggregate(Observable<T> sequence, R initialValue, Object accumulator) {
return reduce(sequence, initialValue, accumulator);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -2003,7 +2015,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
* @param items
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
return create(OperationTakeWhile.takeWhile(items, predicate));
Expand All @@ -2015,7 +2027,7 @@ public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Bo
* @param items
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
Expand All @@ -2035,7 +2047,7 @@ public Boolean call(T t) {
* @param items
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
Expand All @@ -2057,12 +2069,13 @@ public Boolean call(T t, Integer integer)

/**
* Adds a timestamp to each item emitted by this observable.
*
* @return An observable sequence of timestamped items.
*/
public Observable<Timestamped<T>> timestamp() {
return create(OperationTimestamp.timestamp(this));
}

/**
* Return a Future representing a single value of the Observable.
* <p>
Expand Down Expand Up @@ -2384,7 +2397,7 @@ public static <T> Observable<T> toObservable(T... items) {
* @param sequence
* @throws ClassCastException
* if T objects do not implement Comparable
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
return create(OperationToObservableSortedList.toSortedList(sequence));
Expand All @@ -2397,7 +2410,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
*
* @param sequence
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2<T, T, Integer> sortFunction) {
return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction));
Expand All @@ -2410,7 +2423,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
*
* @param sequence
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -3186,6 +3199,15 @@ public Observable<T> reduce(Func2<T, T, T> accumulator) {
return reduce(this, accumulator);
}

/**
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
*
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public ConnectableObservable<T> publish() {
return OperationMulticast.multicast(this, PublishSubject.<T> create());
}

/**
* Used by dynamic languages.
*
Expand All @@ -3201,7 +3223,7 @@ public Observable<T> reduce(Object accumulator) {
public Observable<T> aggregate(Func2<T, T, T> accumulator) {
return aggregate(this, accumulator);
}

/**
* Used by dynamic languages.
*
Expand All @@ -3210,7 +3232,7 @@ public Observable<T> aggregate(Func2<T, T, T> accumulator) {
public Observable<T> aggregate(Object accumulator) {
return aggregate(this, accumulator);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -3263,7 +3285,7 @@ public <R> Observable<R> aggregate(R initialValue, Func2<R, T, R> accumulator) {
public <R> Observable<R> aggregate(R initialValue, Object accumulator) {
return aggregate(this, initialValue, accumulator);
}

/**
* Returns an Observable that applies a function of your choosing to the first item emitted by a
* source Observable, then feeds the result of that function along with the second item emitted
Expand Down Expand Up @@ -3298,7 +3320,7 @@ public Observable<T> scan(Func2<T, T, T> accumulator) {
public Observable<T> sample(long period, TimeUnit unit) {
return create(OperationSample.sample(this, period, unit));
}

/**
* Samples the observable sequence at each interval.
*
Expand All @@ -3313,10 +3335,10 @@ public Observable<T> sample(long period, TimeUnit unit) {
public Observable<T> sample(long period, TimeUnit unit, Scheduler scheduler) {
return create(OperationSample.sample(this, period, unit, scheduler));
}

/**
* Used by dynamic languages.
*
*
* @see #scan(Func2)
*/
public Observable<T> scan(final Object accumulator) {
Expand Down Expand Up @@ -3348,7 +3370,7 @@ public <R> Observable<R> scan(R initialValue, Func2<R, T, R> accumulator) {

/**
* Used by dynamic languages.
*
*
* @see #scan(Object, Func2)
*/
public <R> Observable<R> scan(final R initialValue, final Object accumulator) {
Expand Down Expand Up @@ -3419,7 +3441,7 @@ public Observable<T> take(final int num) {
*
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
return takeWhile(this, predicate);
Expand All @@ -3430,7 +3452,7 @@ public Observable<T> takeWhile(final Func1<T, Boolean> predicate) {
*
* @param predicate
* a function to test each source element for a condition
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhile(final Object predicate) {
return takeWhile(this, predicate);
Expand All @@ -3441,7 +3463,7 @@ public Observable<T> takeWhile(final Object predicate) {
*
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predicate) {
return takeWhileWithIndex(this, predicate);
Expand All @@ -3452,7 +3474,7 @@ public Observable<T> takeWhileWithIndex(final Func2<T, Integer, Boolean> predica
*
* @param predicate
* a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false.
* @return the values from the start of the given sequence
* @return the values from the start of the given sequence
*/
public Observable<T> takeWhileWithIndex(final Object predicate) {
return takeWhileWithIndex(this, predicate);
Expand Down Expand Up @@ -3523,7 +3545,7 @@ public Observable<List<T>> toList() {
*
* @throws ClassCastException
* if T objects do not implement Comparable
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public Observable<List<T>> toSortedList() {
return toSortedList(this);
Expand All @@ -3535,7 +3557,7 @@ public Observable<List<T>> toSortedList() {
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toSortedList.png">
*
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public Observable<List<T>> toSortedList(Func2<T, T, Integer> sortFunction) {
return toSortedList(this, sortFunction);
Expand All @@ -3547,7 +3569,7 @@ public Observable<List<T>> toSortedList(Func2<T, T, Integer> sortFunction) {
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/toSortedList.png">
*
* @param sortFunction
* @return an observable containing the sorted list
* @return an observable containing the sorted list
*/
public Observable<List<T>> toSortedList(final Object sortFunction) {
return toSortedList(this, sortFunction);
Expand Down Expand Up @@ -4140,6 +4162,63 @@ public void call(String t1) {
}
}

@Test
public void testPublish() throws InterruptedException {
final AtomicInteger counter = new AtomicInteger();
ConnectableObservable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
final BooleanSubscription subscription = new BooleanSubscription();
new Thread(new Runnable() {

@Override
public void run() {
System.out.println("published observable being executed");
observer.onNext("one");
observer.onCompleted();
counter.incrementAndGet();
}
}).start();
return subscription;
}
}).publish();

final CountDownLatch latch = new CountDownLatch(2);

// subscribe once
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

// subscribe again
o.subscribe(new Action1<String>() {

@Override
public void call(String v) {
assertEquals("one", v);
System.out.println("v: " + v);
latch.countDown();
}
});

Subscription s = o.connect();
try {
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
fail("subscriptions did not receive values");
}
assertEquals(1, counter.get());
} finally {
s.unsubscribe();
}
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down

0 comments on commit 16879eb

Please sign in to comment.