Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Creating toObservable for Future #109

Merged
merged 4 commits into from
Jan 31, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions rxjava-core/src/main/java/rx/observables/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -42,6 +45,7 @@
import rx.observables.operations.OperationSkip;
import rx.observables.operations.OperationSynchronize;
import rx.observables.operations.OperationTake;
import rx.observables.operations.OperationToObservableFuture;
import rx.observables.operations.OperationToObservableIterable;
import rx.observables.operations.OperationToObservableList;
import rx.observables.operations.OperationToObservableSortedList;
Expand Down Expand Up @@ -1398,6 +1402,45 @@ public static <T> Observable<T> toObservable(Iterable<T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
}

/**
* Converts an Future to a Observable sequence.
*
* Any object that supports the {@link Future} interface can be converted into a Observable that emits
* the return value of the get() method in the object, by passing the object into the <code>toObservable</code> method.
* The subscribe method on this synchronously so the Subscription returned doesn't nothing.
*
* @param future
* the source {@link Future}
* @param <T>
* the type of of object that the future's returns and the type emitted by the resulting
* Observable
* @return a Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overload with:

public static Observable toObservable(Future future, long timeout, TimeUnit unit)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

new method added

return create(OperationToObservableFuture.toObservableFuture(future));
}

/**
* Converts an Future to a Observable sequence.
*
* Any object that supports the {@link Future} interface can be converted into a Observable that emits
* the return value of the get() method in the object, by passing the object into the <code>toObservable</code> method.
* The subscribe method on this synchronously so the Subscription returned doesn't nothing.
* If the future timesout the {@link TimeoutException} exception is passed to the onError.
*
* @param future
* the source {@link Future}
* @param time the maximum time to wait
* @param unit the time unit of the time argument
* @param <T>
* the type of of object that the future's returns and the type emitted by the resulting
* Observable
* @return a Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future, long time, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, time, unit));
}

/**
* Converts an Array sequence to a Observable sequence.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package rx.observables.operations;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.observables.Observable;
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.functions.Func1;

public class OperationToObservableFuture {
private static class ToObservableFuture<T> implements OperatorSubscribeFunction<T> {
private final Future<T> that;
private final Long time;
private final TimeUnit unit;

public ToObservableFuture(Future<T> that) {
this.that = that;
this.time = null;
this.unit = null;
}

public ToObservableFuture(Future<T> that, long time, TimeUnit unit) {
this.that = that;
this.time = time;
this.unit = unit;
}

@Override
public Subscription call(Observer<T> observer) {
try {
T value = (time == null) ? that.get() : that.get(time, unit);

if (!that.isCancelled()) {
observer.onNext(value);
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}

// the get() has already completed so there is no point in
// giving the user a way to cancel.
return Observable.noOpSubscription();
}
}

public static <T> Func1<Observer<T>, Subscription> toObservableFuture(final Future<T> that) {
return new ToObservableFuture<T>(that);
}

public static <T> Func1<Observer<T>, Subscription> toObservableFuture(final Future<T> that, long time, TimeUnit unit) {
return new ToObservableFuture<T>(that, time, unit);
}

@SuppressWarnings("unchecked")
public static class UnitTest {
@Test
public void testSuccess() throws Exception {
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
ToObservableFuture ob = new ToObservableFuture(future);
Observer<Object> o = mock(Observer.class);

Subscription sub = ob.call(o);
sub.unsubscribe();

verify(o, times(1)).onNext(value);
verify(o, times(1)).onCompleted();
verify(o, never()).onError(null);
verify(future, never()).cancel(true);
}

@Test
public void testFailure() throws Exception {
Future<Object> future = mock(Future.class);
RuntimeException e = new RuntimeException();
when(future.get()).thenThrow(e);
ToObservableFuture ob = new ToObservableFuture(future);
Observer<Object> o = mock(Observer.class);

Subscription sub = ob.call(o);
sub.unsubscribe();

verify(o, never()).onNext(null);
verify(o, never()).onCompleted();
verify(o, times(1)).onError(e);
verify(future, never()).cancel(true);
}
}
}