Skip to content

Commit

Permalink
Reimplemented Last operation
Browse files Browse the repository at this point in the history
  • Loading branch information
mairbek committed Mar 4, 2013
1 parent 8b39896 commit 3fab886
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,12 @@ def class ObservableTests {

@Test
public void testLast() {
new TestFactory().getObservable().last().subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_1");
assertEquals("three", Observable.toObservable("one", "two", "three").last())
}

@Test
public void testLastWithPredicate() {
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public void testFilterViaGroovy() {

@Test
public void testLast() {
String script = "mockApiCall.getObservable().last().subscribe(lambda{|result| a.received(result)})";
String script = "mockApiCall.getObservable().takeLast(1).subscribe(lambda{|result| a.received(result)})";
runGroovyScript(script);
verify(assertion, times(1)).received("hello_1");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ class UnitTestSuite extends JUnitSuite {
verify(assertion, times(1)).received(List(2,4,6,8))
}

@Test def testLast() {
@Test def testTakeLast() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.last().subscribe((callback: Int) => {
println("testLast: onNext -> got " + callback)
numbers.takeLast(1).subscribe((callback: Int) => {
println("testTakeLast: onNext -> got " + callback)
assertion.received(callback)
})
verify(assertion, times(1)).received(9)
Expand Down
104 changes: 83 additions & 21 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.*;
import static org.mockito.Mockito.*;

Expand All @@ -31,7 +32,6 @@

import rx.operators.OperationConcat;
import rx.operators.OperationFilter;
import rx.operators.OperationLast;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
Expand Down Expand Up @@ -732,18 +732,28 @@ public static <T> Observable<T> just(T value) {
}

/**
* Takes the last item emitted by a source Observable and returns an Observable that emits only
* that item as its sole emission.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
*
* @param that
* the source Observable
* @return an Observable that emits a single item, which is identical to the last item emitted
* by the source Observable
* Returns the last element of an observable sequence with a specified source.
*
* @param that the source Observable
* @return the last element in the observable sequence.
*/
public static <T> Observable<T> last(final Observable<T> that) {
return _create(OperationLast.last(that));
public static <T> T last(final Observable<T> that) {
T result = null;
for (T value : that.toIterable()) {
result = value;
}
return result;
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param that the source Observable
* @param predicate a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public static <T> T last(final Observable<T> that, final Func1<T, Boolean> predicate) {
return last(that.filter(predicate));
}

/**
Expand Down Expand Up @@ -1255,7 +1265,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
return last(_create(OperationScan.scan(sequence, accumulator)));
return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1);
}

/**
Expand Down Expand Up @@ -1327,7 +1337,7 @@ public T call(T t1, T t2) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return last(_create(OperationScan.scan(sequence, initialValue, accumulator)));
return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1);
}

/**
Expand Down Expand Up @@ -2269,17 +2279,42 @@ public Boolean call(T t1) {
}

/**
* Converts an Observable that emits a sequence of objects into one that only emits the last
* object in this sequence before completing.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/last.png">
*
* @return an Observable that emits only the last item emitted by the original Observable
* Returns the last element of an observable sequence with a specified source.
*
* @return the last element in the observable sequence.
*/
public Observable<T> last() {
public T last() {
return last(this);
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param predicate a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public T last(final Func1<T, Boolean> predicate) {
return last(this, predicate);
}

/**
* Returns the last element of an observable sequence that matches the predicate.
*
* @param predicate a predicate function to evaluate for elements in the sequence.
* @return the last element in the observable sequence.
*/
public T last(final Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return last(this, new Func1<T, Boolean>() {
@Override
public Boolean call(T args) {
return (Boolean) _f.call(args);
}
});
}

/**
* Returns the last element, or a default value if no value is found.
*
Expand Down Expand Up @@ -3151,6 +3186,33 @@ public Boolean call(String args) {
});
}

@Test
public void testLast() {
Observable<String> obs = Observable.toObservable("one", "two", "three");

assertEquals("three", obs.last());
}

@Test
public void testLastWithPredicate() {
Observable<String> obs = Observable.toObservable("one", "two", "three");

assertEquals("two", obs.last(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 3;
}
}));
}

@Test
public void testLastEmptyObservable() {
Observable<String> obs = Observable.toObservable();

assertNull(obs.last());
}


private static class TestException extends RuntimeException {

}
Expand Down
91 changes: 0 additions & 91 deletions rxjava-core/src/main/java/rx/operators/OperationLast.java

This file was deleted.

0 comments on commit 3fab886

Please sign in to comment.