Skip to content

Commit

Permalink
Merge pull request ReactiveX#184 from benjchristensen/issue-57-last
Browse files Browse the repository at this point in the history
Convert 'last' from non-blocking to blocking to match Rx.Net
  • Loading branch information
benjchristensen committed Mar 12, 2013
2 parents 0e30134 + f7d5766 commit 3ef1fa0
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 116 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import rx.Notification;
import rx.Observable;
import rx.Observer;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Func1;

def class ObservableTests {
Expand Down Expand Up @@ -61,8 +62,12 @@ def class ObservableTests {

@Test
public void testLast() {
new TestFactory().getObservable().last().subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_1");
assertEquals("three", Observable.toObservable("one", "two", "three").last())
}

@Test
public void testLastWithPredicate() {
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
}

@Test
Expand Down Expand Up @@ -175,6 +180,12 @@ def class ObservableTests {
verify(a, times(0)).received(3);
}

@Test
public void testTakeLast() {
new TestFactory().getObservable().takeLast(1).subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_1");
}

@Test
public void testTakeWhileViaGroovy() {
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
Expand Down Expand Up @@ -280,7 +291,7 @@ def class ObservableTests {
observer.onCompleted();
}
}).start();
return Observable.noOpSubscription();
return Subscriptions.empty();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testFilterViaGroovy() {

@Test
public void testLast() {
String script = "mockApiCall.getObservable().last().subscribe(lambda{|result| a.received(result)})";
String script = "mockApiCall.getObservable().takeLast(1).subscribe(lambda{|result| a.received(result)})";
runGroovyScript(script);
verify(assertion, times(1)).received("hello_1");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ class UnitTestSuite extends JUnitSuite {
verify(assertion, times(1)).received(List(2,4,6,8))
}

@Test def testLast() {
@Test def testTakeLast() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.last().subscribe((callback: Int) => {
println("testLast: onNext -> got " + callback)
numbers.takeLast(1).subscribe((callback: Int) => {
println("testTakeLast: onNext -> got " + callback)
assertion.received(callback)
})
verify(assertion, times(1)).received(9)
Expand Down
113 changes: 95 additions & 18 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
Expand Down Expand Up @@ -537,7 +536,6 @@ public Subscription call(Observer<T> t1) {
}
}


/**
* an Observable that calls {@link Observer#onError(Exception)} when the Observer subscribes.
*
Expand Down Expand Up @@ -807,18 +805,44 @@ public static <T> Observable<T> just(T value) {
}

/**
* Takes the last item emitted by a source Observable and returns an Observable that emits only
* that item as its sole emission.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
* Returns the last element of an observable sequence with a specified source.
*
* @param that
* the source Observable
* @return the last element in the observable sequence.
*/
public static <T> T last(final Observable<T> that) {
T result = null;
for (T value : that.toIterable()) {
result = value;
}
return result;
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that
* the source Observable
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static <T> T last(final Observable<T> that, final Func1<T, Boolean> predicate) {
return last(that.filter(predicate));
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that
* the source Observable
* @return an Observable that emits a single item, which is identical to the last item emitted
* by the source Observable
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static <T> Observable<T> last(final Observable<T> that) {
return _create(OperationLast.last(that));
public static <T> T last(final Observable<T> that, final Object predicate) {
return last(that.filter(predicate));
}

/**
Expand Down Expand Up @@ -1363,7 +1387,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
return last(_create(OperationScan.scan(sequence, accumulator)));
return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1);
}

/**
Expand Down Expand Up @@ -1435,7 +1459,7 @@ public T call(T t1, T t2) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return last(_create(OperationScan.scan(sequence, initialValue, accumulator)));
return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1);
}

/**
Expand Down Expand Up @@ -2364,17 +2388,44 @@ public Boolean call(T t1) {
}

/**
* Converts an Observable that emits a sequence of objects into one that only emits the last
* object in this sequence before completing.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
* Returns the last element of an observable sequence with a specified source.
*
* @return an Observable that emits only the last item emitted by the original Observable
* @return the last element in the observable sequence.
*/
public Observable<T> last() {
public T last() {
return last(this);
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public T last(final Func1<T, Boolean> predicate) {
return last(this, predicate);
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param predicate
* a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public T last(final Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return last(this, new Func1<T, Boolean>() {
@Override
public Boolean call(T args) {
return (Boolean) _f.call(args);
}
});
}

/**
* Returns the last element, or a default value if no value is found.
*
Expand Down Expand Up @@ -3333,6 +3384,32 @@ public Boolean call(String args) {
});
}

@Test
public void testLast() {
Observable<String> obs = Observable.toObservable("one", "two", "three");

assertEquals("three", obs.last());
}

@Test
public void testLastWithPredicate() {
Observable<String> obs = Observable.toObservable("one", "two", "three");

assertEquals("two", obs.last(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
}));
}

@Test
public void testLastEmptyObservable() {
Observable<String> obs = Observable.toObservable();

assertNull(obs.last());
}

private static class TestException extends RuntimeException {
private static final long serialVersionUID = 1L;
}
Expand Down
91 changes: 0 additions & 91 deletions rxjava-core/src/main/java/rx/operators/OperationLast.java

This file was deleted.

0 comments on commit 3ef1fa0

Please sign in to comment.