Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observable.toFuture #252

Merged
merged 1 commit into from
May 1, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 41 additions & 10 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@
import rx.operators.OperationConcat;
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationGroupBy;
import rx.operators.OperationFilter;
import rx.operators.OperationFinally;
import rx.operators.OperationGroupBy;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
Expand All @@ -64,6 +64,7 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -590,9 +591,11 @@ public void call(Object args) {

/**
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*
* @param subject the subject to push source elements into.
* @param <R> result type
*
* @param subject
* the subject to push source elements into.
* @param <R>
* result type
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public <R> ConnectableObservable<R> multicast(Subject<T, R> subject) {
Expand Down Expand Up @@ -2005,6 +2008,19 @@ public Boolean call(T t, Integer integer)
}));
}

/**
* Return a Future representing a single value of the Observable.
* <p>
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
*
* @param that
* the source Observable
* @returna Future that expects a single item emitted by the source Observable
*/
public static <T> Future<T> toFuture(final Observable<T> that) {
return OperationToFuture.toFuture(that);
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down Expand Up @@ -2088,11 +2104,15 @@ public static <T> Iterable<T> mostRecent(Observable<T> source, T initialValue) {

/**
* Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*
* @param source the source sequence whose elements will be pushed into the specified subject.
* @param subject the subject to push source elements into.
* @param <T> source type
* @param <R> result type
*
* @param source
* the source sequence whose elements will be pushed into the specified subject.
* @param subject
* the subject to push source elements into.
* @param <T>
* source type
* @param <R>
* result type
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
*/
public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, final Subject<T, R> subject) {
Expand All @@ -2101,7 +2121,7 @@ public static <T, R> ConnectableObservable<R> multicast(Observable<T> source, fi

/**
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
*
*
* @param that
* the source Observable
* @return The single element in the observable sequence.
Expand Down Expand Up @@ -3360,6 +3380,17 @@ public <E> Observable<T> takeUntil(Observable<E> other) {
return takeUntil(this, other);
}

/**
* Return a Future representing a single value of the Observable.
* <p>
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
*
* @returna Future that expects a single item emitted by the source Observable
*/
public Future<T> toFuture() {
return toFuture(this);
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down
167 changes: 167 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationToFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package rx.operators;

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
* Convert an Observable into a Future.
*/
public class OperationToFuture {

/**
* Returns a Future that expects a single item from the observable.
*
* @param that
* an observable sequence to get a Future for.
* @param <T>
* the type of source.
* @return the Future to retrieve a single elements from an Observable
*/
public static <T> Future<T> toFuture(Observable<T> that) {

final CountDownLatch finished = new CountDownLatch(1);
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Exception> error = new AtomicReference<Exception>();

final Subscription s = that.subscribe(new Observer<T>() {

@Override
public void onCompleted() {
finished.countDown();
}

@Override
public void onError(Exception e) {
error.compareAndSet(null, e);
finished.countDown();
}

@Override
public void onNext(T v) {
if (!value.compareAndSet(null, v)) {
// this means we received more than one value and must fail as a Future can handle only a single value
error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected."));
finished.countDown();
}
}
});

return new Future<T>() {

private volatile boolean cancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (finished.getCount() > 0) {
cancelled = true;
s.unsubscribe();
// release the latch (a race condition may have already released it by now)
finished.countDown();
return true;
} else {
// can't cancel
return false;
}
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public boolean isDone() {
return finished.getCount() == 0;
}

@Override
public T get() throws InterruptedException, ExecutionException {
finished.await();
return getValue();
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (finished.await(timeout, unit)) {
return getValue();
} else {
throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
}
}

private T getValue() throws ExecutionException {
if (error.get() != null) {
throw new ExecutionException("Observable onError", error.get());
} else {
return value.get();
}
}

};

}

@Test
public void testToFuture() throws InterruptedException, ExecutionException {
Observable<String> obs = Observable.toObservable("one");
Future<String> f = toFuture(obs);
assertEquals("one", f.get());
}

@Test
public void testToFutureList() throws InterruptedException, ExecutionException {
Observable<String> obs = Observable.toObservable("one", "two", "three");
Future<List<String>> f = toFuture(obs.toList());
assertEquals("one", f.get().get(0));
assertEquals("two", f.get().get(1));
assertEquals("three", f.get().get(2));
}

@Test(expected = ExecutionException.class)
public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException {
Observable<String> obs = Observable.toObservable("one", "two");
Future<String> f = toFuture(obs);
assertEquals("one", f.get());
// we expect an exception since there are more than 1 element
}

@Test
public void testToFutureWithException() {
Observable<String> obs = Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(Observer<String> observer) {
observer.onNext("one");
observer.onError(new TestException());
return Subscriptions.empty();
}
});

Future<String> f = toFuture(obs);
try {
f.get();
fail("expected exception");
} catch (Exception e) {
assertEquals(TestException.class, e.getCause().getClass());
}
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
}