Skip to content

Commit

Permalink
Merge pull request #272 from benjchristensen/BlockingObservable
Browse files Browse the repository at this point in the history
BlockingObservable
  • Loading branch information
benjchristensen committed May 16, 2013
2 parents 62ec36e + 678f65d commit ce3ee1b
Show file tree
Hide file tree
Showing 46 changed files with 970 additions and 866 deletions.
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-groovy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This adaptor allows 'groovy.lang.Closure' functions to be used and RxJava will k
This enables code such as:

```groovy
Observable.toObservable("one", "two", "three")
Observable.from("one", "two", "three")
.take(2)
.subscribe({arg -> println(arg)})
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import rx.util.functions.Func1;
// --------------------------------------------------

def hello(String[] names) {
Observable.toObservable(names)
Observable.from(names)
.subscribe({ println "Hello " + it + "!"})
}

Expand All @@ -38,15 +38,15 @@ hello("Ben", "George")
// --------------------------------------------------

def existingDataFromNumbers() {
Observable<Integer> o = Observable.toObservable(1, 2, 3, 4, 5, 6);
Observable<Integer> o = Observable.from(1, 2, 3, 4, 5, 6);
}

def existingDataFromNumbersUsingFrom() {
Observable<Integer> o2 = Observable.from(1, 2, 3, 4, 5, 6);
}

def existingDataFromObjects() {
Observable<String> o = Observable.toObservable("a", "b", "c");
Observable<String> o = Observable.from("a", "b", "c");
}

def existingDataFromObjectsUsingFrom() {
Expand All @@ -55,7 +55,7 @@ def existingDataFromObjectsUsingFrom() {

def existingDataFromList() {
def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.toObservable(list);
Observable<Integer> o = Observable.from(list);
}

def existingDataFromListUsingFrom() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ def class ObservableTests {

@Test
public void testFilter() {
Observable.filter(Observable.toObservable(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)});
Observable.filter(Observable.from(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)});
verify(a, times(0)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testLast() {
assertEquals("three", Observable.toObservable("one", "two", "three").last())
assertEquals("three", Observable.from("one", "two", "three").toBlockingObservable().last())
}

@Test
public void testLastWithPredicate() {
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
assertEquals("two", Observable.from("one", "two", "three").toBlockingObservable().last({ x -> x.length() == 3}))
}

@Test
Expand All @@ -78,15 +78,15 @@ def class ObservableTests {

@Test
public void testMap2() {
Observable.map(Observable.toObservable(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)});
Observable.map(Observable.from(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_" + 1);
verify(a, times(1)).received("hello_" + 2);
verify(a, times(1)).received("hello_" + 3);
}

@Test
public void testMaterialize() {
Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe({ result -> a.received(result)});
Observable.materialize(Observable.from(1, 2, 3)).subscribe({ result -> a.received(result)});
// we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted
verify(a, times(4)).received(any(Notification.class));
verify(a, times(0)).error(any(Exception.class));
Expand All @@ -95,12 +95,12 @@ def class ObservableTests {
@Test
public void testMergeDelayError() {
Observable.mergeDelayError(
Observable.toObservable(1, 2, 3),
Observable.from(1, 2, 3),
Observable.merge(
Observable.toObservable(6),
Observable.from(6),
Observable.error(new NullPointerException()),
Observable.toObservable(7)),
Observable.toObservable(4, 5))
Observable.from(7)),
Observable.from(4, 5))
.subscribe( { result -> a.received(result)}, { exception -> a.error(exception)});

verify(a, times(1)).received(1);
Expand All @@ -116,12 +116,12 @@ def class ObservableTests {
@Test
public void testMerge() {
Observable.merge(
Observable.toObservable(1, 2, 3),
Observable.from(1, 2, 3),
Observable.merge(
Observable.toObservable(6),
Observable.from(6),
Observable.error(new NullPointerException()),
Observable.toObservable(7)),
Observable.toObservable(4, 5))
Observable.from(7)),
Observable.from(4, 5))
.subscribe({ result -> a.received(result)}, { exception -> a.error(exception)});

// executing synchronously so we can deterministically know what order things will come
Expand Down Expand Up @@ -158,23 +158,23 @@ def class ObservableTests {

@Test
public void testSkipTake() {
Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)});
Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)});
verify(a, times(0)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
}

@Test
public void testSkip() {
Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)});
Observable.skip(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)});
verify(a, times(0)).received(1);
verify(a, times(0)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testTake() {
Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)});
Observable.take(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
Expand All @@ -188,15 +188,15 @@ def class ObservableTests {

@Test
public void testTakeWhileViaGroovy() {
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
Observable.takeWhile(Observable.from(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
}

@Test
public void testTakeWhileWithIndexViaGroovy() {
Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
Observable.takeWhileWithIndex(Observable.from(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
Expand All @@ -210,7 +210,7 @@ def class ObservableTests {

@Test
public void testToSortedListStatic() {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)});
Observable.toSortedList(Observable.from(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

Expand All @@ -222,7 +222,7 @@ def class ObservableTests {

@Test
public void testToSortedListWithFunctionStatic() {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
Observable.toSortedList(Observable.from(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

Expand All @@ -246,29 +246,29 @@ def class ObservableTests {

@Test
public void testLastOrDefault() {
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() == 3})
def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() == 3})
assertEquals("two", val)
}

@Test
public void testLastOrDefault2() {
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3})
def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() > 3})
assertEquals("default", val)
}

public void testSingle1() {
def s = Observable.toObservable("one").single({ x -> x.length() == 3})
def s = Observable.from("one").toBlockingObservable().single({ x -> x.length() == 3})
assertEquals("one", s)
}

@Test(expected = IllegalStateException.class)
public void testSingle2() {
Observable.toObservable("one", "two").single({ x -> x.length() == 3})
Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3})
}

@Test
public void testDefer() {
def obs = Observable.toObservable(1, 2)
def obs = Observable.from(1, 2)
Observable.defer({-> obs }).subscribe({ result -> a.received(result)})
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
Expand All @@ -277,7 +277,7 @@ def class ObservableTests {

@Test
public void testAll() {
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
Observable.from(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
verify(a, times(1)).received(true);
}

Expand Down Expand Up @@ -305,7 +305,7 @@ def class ObservableTests {
int counter = 1;

public Observable<Integer> getNumbers() {
return Observable.toObservable(1, 3, 2, 5, 4);
return Observable.from(1, 3, 2, 5, 4);
}

public TestObservable getObservable() {
Expand Down
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-jruby/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This adaptor allows `org.jruby.RubyProc` lambda functions to be used and RxJava
This enables code such as:

```ruby
Observable.toObservable("one", "two", "three")
Observable.from("one", "two", "three")
.take(2)
.subscribe(lambda { |arg| puts arg })
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testCreateViaGroovy() {

@Test
public void testFilterViaGroovy() {
runGroovyScript("Observable.filter(Observable.toObservable(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.filter(Observable.from(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(0)).received(1L);
verify(assertion, times(1)).received(2L);
verify(assertion, times(1)).received(3L);
Expand All @@ -98,7 +98,7 @@ public void testMap() {

@Test
public void testMaterializeViaGroovy() {
runGroovyScript("Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.materialize(Observable.from(1, 2, 3)).subscribe(lambda{|result| a.received(result)});");
// we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted
verify(assertion, times(4)).received(any(Notification.class));
verify(assertion, times(0)).error(any(Exception.class));
Expand Down Expand Up @@ -129,23 +129,23 @@ public void testScriptWithOnNext() {

@Test
public void testSkipTakeViaGroovy() {
runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(0)).received(1);
verify(assertion, times(1)).received(2L);
verify(assertion, times(0)).received(3);
}

@Test
public void testSkipViaGroovy() {
runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(0)).received(1);
verify(assertion, times(0)).received(2);
verify(assertion, times(1)).received(3L);
}

@Test
public void testTakeViaGroovy() {
runGroovyScript("Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.take(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(1)).received(1L);
verify(assertion, times(1)).received(2L);
verify(assertion, times(0)).received(3);
Expand Down Expand Up @@ -183,7 +183,7 @@ public static class TestFactory {
int counter = 1;

public Observable<Integer> getNumbers() {
return Observable.toObservable(1, 3, 2, 5, 4);
return Observable.from(1, 3, 2, 5, 4);
}

public TestObservable getObservable() {
Expand Down
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This adaptor allows 'fn' functions to be used and RxJava will know how to invoke
This enables code such as:

```scala
Observable.toObservable("1", "2", "3")
Observable.from("1", "2", "3")
.take(2)
.subscribe((callback: String) => {
println(callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testTake() {
Observable.toObservable("1", "2", "3").take(1).subscribe(Map(
Observable.from("1", "2", "3").take(1).subscribe(Map(
"onNext" -> ((callback: String) => {
print("testTake: callback = " + callback)
assertion.received(callback)
Expand All @@ -133,14 +133,14 @@ class UnitTestSuite extends JUnitSuite {

@Test def testClosureVersusMap() {
// using closure
Observable.toObservable("1", "2", "3")
Observable.from("1", "2", "3")
.take(2)
.subscribe((callback: String) => {
println(callback)
})

// using Map of closures
Observable.toObservable("1", "2", "3")
Observable.from("1", "2", "3")
.take(2)
.subscribe(Map(
"onNext" -> ((callback: String) => {
Expand All @@ -149,7 +149,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testFilterWithToList() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.filter((x: Int) => 0 == (x % 2)).toList().subscribe(
(callback: java.util.List[Int]) => {
val lst = callback.asScala.toList
Expand All @@ -161,7 +161,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testTakeLast() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.takeLast(1).subscribe((callback: Int) => {
println("testTakeLast: onNext -> got " + callback)
assertion.received(callback)
Expand All @@ -170,7 +170,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testMap() {
val numbers = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9)
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
val mappedNumbers = new ArrayBuffer[Int]()
numbers.map(((x: Int)=> { x * x })).subscribe(((squareVal: Int) => {
println("square is " + squareVal )
Expand All @@ -181,9 +181,9 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testZip() {
val numbers = Observable.toObservable(1, 2, 3)
val colors = Observable.toObservable("red", "green", "blue")
val characters = Observable.toObservable("lion-o", "cheetara", "panthro")
val numbers = Observable.from(1, 2, 3)
val colors = Observable.from("red", "green", "blue")
val characters = Observable.from("lion-o", "cheetara", "panthro")

Observable.zip(numbers.toList, colors.toList, characters.toList, ((n: java.util.List[Int], c: java.util.List[String], t: java.util.List[String]) => { Map(
"numbers" -> n,
Expand Down
Loading

0 comments on commit ce3ee1b

Please sign in to comment.