diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b861cce886..36215718ce 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -64,6 +64,7 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationToFuture; import rx.operators.OperationToIterator; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -2066,6 +2067,19 @@ public Boolean call(T t, Integer integer) })); } + /** + * Return a Future representing a single value of the Observable. + *

+ * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). + * + * @param that + * the source Observable + * @returna Future that expects a single item emitted by the source Observable + */ + public static Future toFuture(final Observable that) { + return OperationToFuture.toFuture(that); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -3473,6 +3487,17 @@ public Observable takeUntil(Observable other) { return takeUntil(this, other); } + /** + * Return a Future representing a single value of the Observable. + *

+ * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). + * + * @returna Future that expects a single item emitted by the source Observable + */ + public Future toFuture() { + return toFuture(this); + } + /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. diff --git a/rxjava-core/src/main/java/rx/operators/OperationToFuture.java b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java new file mode 100644 index 0000000000..89b2bc294e --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java @@ -0,0 +1,167 @@ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; + +/** + * Convert an Observable into a Future. + */ +public class OperationToFuture { + + /** + * Returns a Future that expects a single item from the observable. + * + * @param that + * an observable sequence to get a Future for. + * @param + * the type of source. + * @return the Future to retrieve a single elements from an Observable + */ + public static Future toFuture(Observable that) { + + final CountDownLatch finished = new CountDownLatch(1); + final AtomicReference value = new AtomicReference(); + final AtomicReference error = new AtomicReference(); + + final Subscription s = that.subscribe(new Observer() { + + @Override + public void onCompleted() { + finished.countDown(); + } + + @Override + public void onError(Exception e) { + error.compareAndSet(null, e); + finished.countDown(); + } + + @Override + public void onNext(T v) { + if (!value.compareAndSet(null, v)) { + // this means we received more than one value and must fail as a Future can handle only a single value + error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected.")); + finished.countDown(); + } + } + }); + + return new Future() { + + private volatile boolean cancelled = false; + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (finished.getCount() > 0) { + cancelled = true; + s.unsubscribe(); + // release the latch (a race condition may have already released it by now) + finished.countDown(); + return true; + } else { + // can't cancel + return false; + } + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return finished.getCount() == 0; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + finished.await(); + return getValue(); + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (finished.await(timeout, unit)) { + return getValue(); + } else { + throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable."); + } + } + + private T getValue() throws ExecutionException { + if (error.get() != null) { + throw new ExecutionException("Observable onError", error.get()); + } else { + return value.get(); + } + } + + }; + + } + + @Test + public void testToFuture() throws InterruptedException, ExecutionException { + Observable obs = Observable.toObservable("one"); + Future f = toFuture(obs); + assertEquals("one", f.get()); + } + + @Test + public void testToFutureList() throws InterruptedException, ExecutionException { + Observable obs = Observable.toObservable("one", "two", "three"); + Future> f = toFuture(obs.toList()); + assertEquals("one", f.get().get(0)); + assertEquals("two", f.get().get(1)); + assertEquals("three", f.get().get(2)); + } + + @Test(expected = ExecutionException.class) + public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException { + Observable obs = Observable.toObservable("one", "two"); + Future f = toFuture(obs); + assertEquals("one", f.get()); + // we expect an exception since there are more than 1 element + } + + @Test + public void testToFutureWithException() { + Observable obs = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onError(new TestException()); + return Subscriptions.empty(); + } + }); + + Future f = toFuture(obs); + try { + f.get(); + fail("expected exception"); + } catch (Exception e) { + assertEquals(TestException.class, e.getCause().getClass()); + } + } + + private static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } +}