diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 190876b0a18..14ace0d3579 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -22,6 +22,9 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Before; import org.junit.Test; @@ -42,6 +45,7 @@ import rx.observables.operations.OperationSkip; import rx.observables.operations.OperationSynchronize; import rx.observables.operations.OperationTake; +import rx.observables.operations.OperationToObservableFuture; import rx.observables.operations.OperationToObservableIterable; import rx.observables.operations.OperationToObservableList; import rx.observables.operations.OperationToObservableSortedList; @@ -1398,6 +1402,45 @@ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } + /** + * Converts an Future to a Observable sequence. + * + * Any object that supports the {@link Future} interface can be converted into a Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. + * + * @param future + * the source {@link Future} + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return a Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future) { + return create(OperationToObservableFuture.toObservableFuture(future)); + } + + /** + * Converts an Future to a Observable sequence. + * + * Any object that supports the {@link Future} interface can be converted into a Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. + * If the future timesout the {@link TimeoutException} exception is passed to the onError. + * + * @param future + * the source {@link Future} + * @param time the maximum time to wait + * @param unit the time unit of the time argument + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return a Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future, long time, TimeUnit unit) { + return create(OperationToObservableFuture.toObservableFuture(future, time, unit)); + } + /** * Converts an Array sequence to a Observable sequence. * diff --git a/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java b/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java new file mode 100644 index 00000000000..9b8c8d14e96 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java @@ -0,0 +1,100 @@ +package rx.observables.operations; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import rx.observables.Observable; +import rx.observables.Observer; +import rx.observables.Subscription; +import rx.util.functions.Func1; + +public class OperationToObservableFuture { + private static class ToObservableFuture implements OperatorSubscribeFunction { + private final Future that; + private final Long time; + private final TimeUnit unit; + + public ToObservableFuture(Future that) { + this.that = that; + this.time = null; + this.unit = null; + } + + public ToObservableFuture(Future that, long time, TimeUnit unit) { + this.that = that; + this.time = time; + this.unit = unit; + } + + @Override + public Subscription call(Observer observer) { + try { + T value = (time == null) ? that.get() : that.get(time, unit); + + if (!that.isCancelled()) { + observer.onNext(value); + } + observer.onCompleted(); + } catch (Exception e) { + observer.onError(e); + } + + // the get() has already completed so there is no point in + // giving the user a way to cancel. + return Observable.noOpSubscription(); + } + } + + public static Func1, Subscription> toObservableFuture(final Future that) { + return new ToObservableFuture(that); + } + + public static Func1, Subscription> toObservableFuture(final Future that, long time, TimeUnit unit) { + return new ToObservableFuture(that, time, unit); + } + + @SuppressWarnings("unchecked") + public static class UnitTest { + @Test + public void testSuccess() throws Exception { + Future future = mock(Future.class); + Object value = new Object(); + when(future.get()).thenReturn(value); + ToObservableFuture ob = new ToObservableFuture(future); + Observer o = mock(Observer.class); + + Subscription sub = ob.call(o); + sub.unsubscribe(); + + verify(o, times(1)).onNext(value); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(null); + verify(future, never()).cancel(true); + } + + @Test + public void testFailure() throws Exception { + Future future = mock(Future.class); + RuntimeException e = new RuntimeException(); + when(future.get()).thenThrow(e); + ToObservableFuture ob = new ToObservableFuture(future); + Observer o = mock(Observer.class); + + Subscription sub = ob.call(o); + sub.unsubscribe(); + + verify(o, never()).onNext(null); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(e); + verify(future, never()).cancel(true); + } + } +}