From d58d3b3ca405a5bd151f3958893715971d28a89e Mon Sep 17 00:00:00 2001 From: George Campbell Date: Tue, 22 Jan 2013 22:22:39 -0800 Subject: [PATCH 1/3] Creating toObservable for Future --- .../main/java/rx/observables/Observable.java | 19 +++++ .../OperationToObservableFuture.java | 84 +++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 79ad9fa84d..3afa07b781 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.concurrent.Future; import org.junit.Before; import org.junit.Test; @@ -42,6 +43,7 @@ import rx.observables.operations.OperationSkip; import rx.observables.operations.OperationSynchronize; import rx.observables.operations.OperationTake; +import rx.observables.operations.OperationToObservableFuture; import rx.observables.operations.OperationToObservableIterable; import rx.observables.operations.OperationToObservableList; import rx.observables.operations.OperationToObservableSortedList; @@ -1359,6 +1361,23 @@ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } + /** + * Converts an Future to a Observable sequence. + * + * Any object that supports the {@link Future} interface can be converted into a Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * + * @param future + * the source {@link Future} + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return a Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future) { + return create(OperationToObservableFuture.toObservableFuture(future)); + } + /** * Converts an Array sequence to a Observable sequence. * diff --git a/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java b/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java new file mode 100644 index 0000000000..0bfd751f35 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java @@ -0,0 +1,84 @@ +package rx.observables.operations; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.concurrent.Future; + +import org.junit.Test; + +import rx.observables.Observable; +import rx.observables.Observer; +import rx.observables.Subscription; +import rx.util.functions.Func1; + +public class OperationToObservableFuture { + private static class ToObservableFuture implements OperatorSubscribeFunction { + private final Future that; + + public ToObservableFuture(Future that) { + this.that = that; + } + + @Override + public Subscription call(Observer observer) { + try { + T value = that.get(); + if (!that.isCancelled()) { + observer.onNext(value); + } + observer.onCompleted(); + } catch (Exception e) { + observer.onError(e); + } + + // the get() has already completed so there is no point in + // giving the user a way to cancel. + return Observable.noOpSubscription(); + } + } + + public static Func1, Subscription> toObservableFuture(final Future that) { + return new ToObservableFuture(that); + } + + @SuppressWarnings("unchecked") + public static class UnitTest { + @Test + public void testSuccess() throws Exception { + Future future = mock(Future.class); + Object value = new Object(); + when(future.get()).thenReturn(value); + ToObservableFuture ob = new ToObservableFuture(future); + Observer o = mock(Observer.class); + + Subscription sub = ob.call(o); + sub.unsubscribe(); + + verify(o, times(1)).onNext(value); + verify(o, times(1)).onCompleted(); + verify(o, never()).onError(null); + verify(future, never()).cancel(true); + } + + @Test + public void testFailure() throws Exception { + Future future = mock(Future.class); + RuntimeException e = new RuntimeException(); + when(future.get()).thenThrow(e); + ToObservableFuture ob = new ToObservableFuture(future); + Observer o = mock(Observer.class); + + Subscription sub = ob.call(o); + sub.unsubscribe(); + + verify(o, never()).onNext(null); + verify(o, never()).onCompleted(); + verify(o, times(1)).onError(e); + verify(future, never()).cancel(true); + } + } +} From 24c067384ef3ab583e86c26dbae472601633fbc3 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Mon, 28 Jan 2013 13:57:53 -0800 Subject: [PATCH 2/3] adding support for also get time outs. --- .../main/java/rx/observables/Observable.java | 20 +++++++++++++++++++ .../OperationToObservableFuture.java | 18 ++++++++++++++++- 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index 3afa07b781..f3bdb52209 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.Before; import org.junit.Test; @@ -1377,6 +1378,25 @@ public static Observable toObservable(Iterable iterable) { public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } + + /** + * Converts an Future to a Observable sequence. + * + * Any object that supports the {@link Future} interface can be converted into a Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * + * @param future + * the source {@link Future} + * @param time the maximum time to wait + * @param unit the time unit of the time argument + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return a Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future, long time, TimeUnit unit) { + return create(OperationToObservableFuture.toObservableFuture(future, time, unit)); + } /** * Converts an Array sequence to a Observable sequence. diff --git a/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java b/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java index 0bfd751f35..9b8c8d14e9 100644 --- a/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java +++ b/rxjava-core/src/main/java/rx/observables/operations/OperationToObservableFuture.java @@ -7,6 +7,7 @@ import static org.mockito.Mockito.when; import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import org.junit.Test; @@ -18,15 +19,26 @@ public class OperationToObservableFuture { private static class ToObservableFuture implements OperatorSubscribeFunction { private final Future that; + private final Long time; + private final TimeUnit unit; public ToObservableFuture(Future that) { this.that = that; + this.time = null; + this.unit = null; + } + + public ToObservableFuture(Future that, long time, TimeUnit unit) { + this.that = that; + this.time = time; + this.unit = unit; } @Override public Subscription call(Observer observer) { try { - T value = that.get(); + T value = (time == null) ? that.get() : that.get(time, unit); + if (!that.isCancelled()) { observer.onNext(value); } @@ -45,6 +57,10 @@ public static Func1, Subscription> toObservableFuture(final Futu return new ToObservableFuture(that); } + public static Func1, Subscription> toObservableFuture(final Future that, long time, TimeUnit unit) { + return new ToObservableFuture(that, time, unit); + } + @SuppressWarnings("unchecked") public static class UnitTest { @Test From 38ad57c1e90f982a2715c350ce3994c4f4d5c0e7 Mon Sep 17 00:00:00 2001 From: George Campbell Date: Tue, 29 Jan 2013 17:13:21 -0800 Subject: [PATCH 3/3] adding more details about how the toObservable behaves. --- rxjava-core/src/main/java/rx/observables/Observable.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/rxjava-core/src/main/java/rx/observables/Observable.java b/rxjava-core/src/main/java/rx/observables/Observable.java index d6e51784ce..14ace0d357 100644 --- a/rxjava-core/src/main/java/rx/observables/Observable.java +++ b/rxjava-core/src/main/java/rx/observables/Observable.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import org.junit.Before; import org.junit.Test; @@ -1406,6 +1407,7 @@ public static Observable toObservable(Iterable iterable) { * * Any object that supports the {@link Future} interface can be converted into a Observable that emits * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. * * @param future * the source {@link Future} @@ -1423,6 +1425,8 @@ public static Observable toObservable(Future future) { * * Any object that supports the {@link Future} interface can be converted into a Observable that emits * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. + * If the future timesout the {@link TimeoutException} exception is passed to the onError. * * @param future * the source {@link Future}