Skip to content

Commit

Permalink
Merge pull request #109 from abersnaze/future
Browse files Browse the repository at this point in the history
Creating toObservable for Future
  • Loading branch information
benjchristensen committed Jan 31, 2013
2 parents b764ef1 + 38ad57c commit ca75023
Show file tree
Hide file tree
Showing 2 changed files with 143 additions and 0 deletions.
43 changes: 43 additions & 0 deletions rxjava-core/src/main/java/rx/observables/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.junit.Before;
import org.junit.Test;
Expand All @@ -42,6 +45,7 @@
import rx.observables.operations.OperationSkip;
import rx.observables.operations.OperationSynchronize;
import rx.observables.operations.OperationTake;
import rx.observables.operations.OperationToObservableFuture;
import rx.observables.operations.OperationToObservableIterable;
import rx.observables.operations.OperationToObservableList;
import rx.observables.operations.OperationToObservableSortedList;
Expand Down Expand Up @@ -1398,6 +1402,45 @@ public static <T> Observable<T> toObservable(Iterable<T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
}

/**
* Converts an Future to a Observable sequence.
*
* Any object that supports the {@link Future} interface can be converted into a Observable that emits
* the return value of the get() method in the object, by passing the object into the <code>toObservable</code> method.
* The subscribe method on this synchronously so the Subscription returned doesn't nothing.
*
* @param future
* the source {@link Future}
* @param <T>
* the type of of object that the future's returns and the type emitted by the resulting
* Observable
* @return a Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future) {
return create(OperationToObservableFuture.toObservableFuture(future));
}

/**
* Converts an Future to a Observable sequence.
*
* Any object that supports the {@link Future} interface can be converted into a Observable that emits
* the return value of the get() method in the object, by passing the object into the <code>toObservable</code> method.
* The subscribe method on this synchronously so the Subscription returned doesn't nothing.
* If the future timesout the {@link TimeoutException} exception is passed to the onError.
*
* @param future
* the source {@link Future}
* @param time the maximum time to wait
* @param unit the time unit of the time argument
* @param <T>
* the type of of object that the future's returns and the type emitted by the resulting
* Observable
* @return a Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future, long time, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, time, unit));
}

/**
* Converts an Array sequence to a Observable sequence.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package rx.observables.operations;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

import rx.observables.Observable;
import rx.observables.Observer;
import rx.observables.Subscription;
import rx.util.functions.Func1;

public class OperationToObservableFuture {
private static class ToObservableFuture<T> implements OperatorSubscribeFunction<T> {
private final Future<T> that;
private final Long time;
private final TimeUnit unit;

public ToObservableFuture(Future<T> that) {
this.that = that;
this.time = null;
this.unit = null;
}

public ToObservableFuture(Future<T> that, long time, TimeUnit unit) {
this.that = that;
this.time = time;
this.unit = unit;
}

@Override
public Subscription call(Observer<T> observer) {
try {
T value = (time == null) ? that.get() : that.get(time, unit);

if (!that.isCancelled()) {
observer.onNext(value);
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}

// the get() has already completed so there is no point in
// giving the user a way to cancel.
return Observable.noOpSubscription();
}
}

public static <T> Func1<Observer<T>, Subscription> toObservableFuture(final Future<T> that) {
return new ToObservableFuture<T>(that);
}

public static <T> Func1<Observer<T>, Subscription> toObservableFuture(final Future<T> that, long time, TimeUnit unit) {
return new ToObservableFuture<T>(that, time, unit);
}

@SuppressWarnings("unchecked")
public static class UnitTest {
@Test
public void testSuccess() throws Exception {
Future<Object> future = mock(Future.class);
Object value = new Object();
when(future.get()).thenReturn(value);
ToObservableFuture ob = new ToObservableFuture(future);
Observer<Object> o = mock(Observer.class);

Subscription sub = ob.call(o);
sub.unsubscribe();

verify(o, times(1)).onNext(value);
verify(o, times(1)).onCompleted();
verify(o, never()).onError(null);
verify(future, never()).cancel(true);
}

@Test
public void testFailure() throws Exception {
Future<Object> future = mock(Future.class);
RuntimeException e = new RuntimeException();
when(future.get()).thenThrow(e);
ToObservableFuture ob = new ToObservableFuture(future);
Observer<Object> o = mock(Observer.class);

Subscription sub = ob.call(o);
sub.unsubscribe();

verify(o, never()).onNext(null);
verify(o, never()).onCompleted();
verify(o, times(1)).onError(e);
verify(future, never()).cancel(true);
}
}
}

0 comments on commit ca75023

Please sign in to comment.