Skip to content

Commit

Permalink
Merge pull request #252 from benjchristensen/toFuture
Browse files Browse the repository at this point in the history
Observable.toFuture
  • Loading branch information
benjchristensen committed May 1, 2013
2 parents cec81cb + 4b7772c commit 4c03715
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 0 deletions.
25 changes: 25 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import rx.operators.OperationTakeLast;
import rx.operators.OperationTakeUntil;
import rx.operators.OperationTakeWhile;
import rx.operators.OperationToFuture;
import rx.operators.OperationToIterator;
import rx.operators.OperationToObservableFuture;
import rx.operators.OperationToObservableIterable;
Expand Down Expand Up @@ -2066,6 +2067,19 @@ public Boolean call(T t, Integer integer)
}));
}

/**
* Return a Future representing a single value of the Observable.
* <p>
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
*
* @param that
* the source Observable
* @returna Future that expects a single item emitted by the source Observable
*/
public static <T> Future<T> toFuture(final Observable<T> that) {
return OperationToFuture.toFuture(that);
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down Expand Up @@ -3473,6 +3487,17 @@ public <E> Observable<T> takeUntil(Observable<E> other) {
return takeUntil(this, other);
}

/**
* Return a Future representing a single value of the Observable.
* <p>
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
*
* @returna Future that expects a single item emitted by the source Observable
*/
public Future<T> toFuture() {
return toFuture(this);
}

/**
* Returns an Observable that emits a single item, a list composed of all the items emitted by
* the source Observable.
Expand Down
167 changes: 167 additions & 0 deletions rxjava-core/src/main/java/rx/operators/OperationToFuture.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
package rx.operators;

import static org.junit.Assert.*;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

/**
* Convert an Observable into a Future.
*/
public class OperationToFuture {

/**
* Returns a Future that expects a single item from the observable.
*
* @param that
* an observable sequence to get a Future for.
* @param <T>
* the type of source.
* @return the Future to retrieve a single elements from an Observable
*/
public static <T> Future<T> toFuture(Observable<T> that) {

final CountDownLatch finished = new CountDownLatch(1);
final AtomicReference<T> value = new AtomicReference<T>();
final AtomicReference<Exception> error = new AtomicReference<Exception>();

final Subscription s = that.subscribe(new Observer<T>() {

@Override
public void onCompleted() {
finished.countDown();
}

@Override
public void onError(Exception e) {
error.compareAndSet(null, e);
finished.countDown();
}

@Override
public void onNext(T v) {
if (!value.compareAndSet(null, v)) {
// this means we received more than one value and must fail as a Future can handle only a single value
error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected."));
finished.countDown();
}
}
});

return new Future<T>() {

private volatile boolean cancelled = false;

@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if (finished.getCount() > 0) {
cancelled = true;
s.unsubscribe();
// release the latch (a race condition may have already released it by now)
finished.countDown();
return true;
} else {
// can't cancel
return false;
}
}

@Override
public boolean isCancelled() {
return cancelled;
}

@Override
public boolean isDone() {
return finished.getCount() == 0;
}

@Override
public T get() throws InterruptedException, ExecutionException {
finished.await();
return getValue();
}

@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
if (finished.await(timeout, unit)) {
return getValue();
} else {
throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
}
}

private T getValue() throws ExecutionException {
if (error.get() != null) {
throw new ExecutionException("Observable onError", error.get());
} else {
return value.get();
}
}

};

}

@Test
public void testToFuture() throws InterruptedException, ExecutionException {
Observable<String> obs = Observable.toObservable("one");
Future<String> f = toFuture(obs);
assertEquals("one", f.get());
}

@Test
public void testToFutureList() throws InterruptedException, ExecutionException {
Observable<String> obs = Observable.toObservable("one", "two", "three");
Future<List<String>> f = toFuture(obs.toList());
assertEquals("one", f.get().get(0));
assertEquals("two", f.get().get(1));
assertEquals("three", f.get().get(2));
}

@Test(expected = ExecutionException.class)
public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException {
Observable<String> obs = Observable.toObservable("one", "two");
Future<String> f = toFuture(obs);
assertEquals("one", f.get());
// we expect an exception since there are more than 1 element
}

@Test
public void testToFutureWithException() {
Observable<String> obs = Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(Observer<String> observer) {
observer.onNext("one");
observer.onError(new TestException());
return Subscriptions.empty();
}
});

Future<String> f = toFuture(obs);
try {
f.get();
fail("expected exception");
} catch (Exception e) {
assertEquals(TestException.class, e.getCause().getClass());
}
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
}

0 comments on commit 4c03715

Please sign in to comment.