Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockingObservable #272

Merged
merged 6 commits into from
May 16, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-groovy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This adaptor allows 'groovy.lang.Closure' functions to be used and RxJava will k
This enables code such as:

```groovy
Observable.toObservable("one", "two", "three")
Observable.from("one", "two", "three")
.take(2)
.subscribe({arg -> println(arg)})
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import rx.util.functions.Func1;
// --------------------------------------------------

def hello(String[] names) {
Observable.toObservable(names)
Observable.from(names)
.subscribe({ println "Hello " + it + "!"})
}

Expand All @@ -38,15 +38,15 @@ hello("Ben", "George")
// --------------------------------------------------

def existingDataFromNumbers() {
Observable<Integer> o = Observable.toObservable(1, 2, 3, 4, 5, 6);
Observable<Integer> o = Observable.from(1, 2, 3, 4, 5, 6);
}

def existingDataFromNumbersUsingFrom() {
Observable<Integer> o2 = Observable.from(1, 2, 3, 4, 5, 6);
}

def existingDataFromObjects() {
Observable<String> o = Observable.toObservable("a", "b", "c");
Observable<String> o = Observable.from("a", "b", "c");
}

def existingDataFromObjectsUsingFrom() {
Expand All @@ -55,7 +55,7 @@ def existingDataFromObjectsUsingFrom() {

def existingDataFromList() {
def list = [5, 6, 7, 8]
Observable<Integer> o = Observable.toObservable(list);
Observable<Integer> o = Observable.from(list);
}

def existingDataFromListUsingFrom() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,20 +54,20 @@ def class ObservableTests {

@Test
public void testFilter() {
Observable.filter(Observable.toObservable(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)});
Observable.filter(Observable.from(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)});
verify(a, times(0)).received(1);
verify(a, times(1)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testLast() {
assertEquals("three", Observable.toObservable("one", "two", "three").last())
assertEquals("three", Observable.from("one", "two", "three").toBlockingObservable().last())
}

@Test
public void testLastWithPredicate() {
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
assertEquals("two", Observable.from("one", "two", "three").toBlockingObservable().last({ x -> x.length() == 3}))
}

@Test
Expand All @@ -78,15 +78,15 @@ def class ObservableTests {

@Test
public void testMap2() {
Observable.map(Observable.toObservable(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)});
Observable.map(Observable.from(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received("hello_" + 1);
verify(a, times(1)).received("hello_" + 2);
verify(a, times(1)).received("hello_" + 3);
}

@Test
public void testMaterialize() {
Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe({ result -> a.received(result)});
Observable.materialize(Observable.from(1, 2, 3)).subscribe({ result -> a.received(result)});
// we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted
verify(a, times(4)).received(any(Notification.class));
verify(a, times(0)).error(any(Exception.class));
Expand All @@ -95,12 +95,12 @@ def class ObservableTests {
@Test
public void testMergeDelayError() {
Observable.mergeDelayError(
Observable.toObservable(1, 2, 3),
Observable.from(1, 2, 3),
Observable.merge(
Observable.toObservable(6),
Observable.from(6),
Observable.error(new NullPointerException()),
Observable.toObservable(7)),
Observable.toObservable(4, 5))
Observable.from(7)),
Observable.from(4, 5))
.subscribe( { result -> a.received(result)}, { exception -> a.error(exception)});

verify(a, times(1)).received(1);
Expand All @@ -116,12 +116,12 @@ def class ObservableTests {
@Test
public void testMerge() {
Observable.merge(
Observable.toObservable(1, 2, 3),
Observable.from(1, 2, 3),
Observable.merge(
Observable.toObservable(6),
Observable.from(6),
Observable.error(new NullPointerException()),
Observable.toObservable(7)),
Observable.toObservable(4, 5))
Observable.from(7)),
Observable.from(4, 5))
.subscribe({ result -> a.received(result)}, { exception -> a.error(exception)});

// executing synchronously so we can deterministically know what order things will come
Expand Down Expand Up @@ -158,23 +158,23 @@ def class ObservableTests {

@Test
public void testSkipTake() {
Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)});
Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)});
verify(a, times(0)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
}

@Test
public void testSkip() {
Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)});
Observable.skip(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)});
verify(a, times(0)).received(1);
verify(a, times(0)).received(2);
verify(a, times(1)).received(3);
}

@Test
public void testTake() {
Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)});
Observable.take(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
Expand All @@ -188,15 +188,15 @@ def class ObservableTests {

@Test
public void testTakeWhileViaGroovy() {
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
Observable.takeWhile(Observable.from(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
}

@Test
public void testTakeWhileWithIndexViaGroovy() {
Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
Observable.takeWhileWithIndex(Observable.from(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
verify(a, times(0)).received(3);
Expand All @@ -210,7 +210,7 @@ def class ObservableTests {

@Test
public void testToSortedListStatic() {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)});
Observable.toSortedList(Observable.from(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

Expand All @@ -222,7 +222,7 @@ def class ObservableTests {

@Test
public void testToSortedListWithFunctionStatic() {
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
Observable.toSortedList(Observable.from(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
}

Expand All @@ -246,29 +246,29 @@ def class ObservableTests {

@Test
public void testLastOrDefault() {
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() == 3})
def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() == 3})
assertEquals("two", val)
}

@Test
public void testLastOrDefault2() {
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3})
def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() > 3})
assertEquals("default", val)
}

public void testSingle1() {
def s = Observable.toObservable("one").single({ x -> x.length() == 3})
def s = Observable.from("one").toBlockingObservable().single({ x -> x.length() == 3})
assertEquals("one", s)
}

@Test(expected = IllegalStateException.class)
public void testSingle2() {
Observable.toObservable("one", "two").single({ x -> x.length() == 3})
Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3})
}

@Test
public void testDefer() {
def obs = Observable.toObservable(1, 2)
def obs = Observable.from(1, 2)
Observable.defer({-> obs }).subscribe({ result -> a.received(result)})
verify(a, times(1)).received(1);
verify(a, times(1)).received(2);
Expand All @@ -277,7 +277,7 @@ def class ObservableTests {

@Test
public void testAll() {
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
Observable.from(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
verify(a, times(1)).received(true);
}

Expand Down Expand Up @@ -305,7 +305,7 @@ def class ObservableTests {
int counter = 1;

public Observable<Integer> getNumbers() {
return Observable.toObservable(1, 3, 2, 5, 4);
return Observable.from(1, 3, 2, 5, 4);
}

public TestObservable getObservable() {
Expand Down
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-jruby/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This adaptor allows `org.jruby.RubyProc` lambda functions to be used and RxJava
This enables code such as:

```ruby
Observable.toObservable("one", "two", "three")
Observable.from("one", "two", "three")
.take(2)
.subscribe(lambda { |arg| puts arg })
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ public void testCreateViaGroovy() {

@Test
public void testFilterViaGroovy() {
runGroovyScript("Observable.filter(Observable.toObservable(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.filter(Observable.from(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(0)).received(1L);
verify(assertion, times(1)).received(2L);
verify(assertion, times(1)).received(3L);
Expand All @@ -98,7 +98,7 @@ public void testMap() {

@Test
public void testMaterializeViaGroovy() {
runGroovyScript("Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.materialize(Observable.from(1, 2, 3)).subscribe(lambda{|result| a.received(result)});");
// we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted
verify(assertion, times(4)).received(any(Notification.class));
verify(assertion, times(0)).error(any(Exception.class));
Expand Down Expand Up @@ -129,23 +129,23 @@ public void testScriptWithOnNext() {

@Test
public void testSkipTakeViaGroovy() {
runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(0)).received(1);
verify(assertion, times(1)).received(2L);
verify(assertion, times(0)).received(3);
}

@Test
public void testSkipViaGroovy() {
runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(0)).received(1);
verify(assertion, times(0)).received(2);
verify(assertion, times(1)).received(3L);
}

@Test
public void testTakeViaGroovy() {
runGroovyScript("Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
runGroovyScript("Observable.take(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
verify(assertion, times(1)).received(1L);
verify(assertion, times(1)).received(2L);
verify(assertion, times(0)).received(3);
Expand Down Expand Up @@ -183,7 +183,7 @@ public static class TestFactory {
int counter = 1;

public Observable<Integer> getNumbers() {
return Observable.toObservable(1, 3, 2, 5, 4);
return Observable.from(1, 3, 2, 5, 4);
}

public TestObservable getObservable() {
Expand Down
2 changes: 1 addition & 1 deletion language-adaptors/rxjava-scala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ This adaptor allows 'fn' functions to be used and RxJava will know how to invoke
This enables code such as:

```scala
Observable.toObservable("1", "2", "3")
Observable.from("1", "2", "3")
.take(2)
.subscribe((callback: String) => {
println(callback)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testTake() {
Observable.toObservable("1", "2", "3").take(1).subscribe(Map(
Observable.from("1", "2", "3").take(1).subscribe(Map(
"onNext" -> ((callback: String) => {
print("testTake: callback = " + callback)
assertion.received(callback)
Expand All @@ -133,14 +133,14 @@ class UnitTestSuite extends JUnitSuite {

@Test def testClosureVersusMap() {
// using closure
Observable.toObservable("1", "2", "3")
Observable.from("1", "2", "3")
.take(2)
.subscribe((callback: String) => {
println(callback)
})

// using Map of closures
Observable.toObservable("1", "2", "3")
Observable.from("1", "2", "3")
.take(2)
.subscribe(Map(
"onNext" -> ((callback: String) => {
Expand All @@ -149,7 +149,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testFilterWithToList() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.filter((x: Int) => 0 == (x % 2)).toList().subscribe(
(callback: java.util.List[Int]) => {
val lst = callback.asScala.toList
Expand All @@ -161,7 +161,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testTakeLast() {
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
numbers.takeLast(1).subscribe((callback: Int) => {
println("testTakeLast: onNext -> got " + callback)
assertion.received(callback)
Expand All @@ -170,7 +170,7 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testMap() {
val numbers = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9)
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
val mappedNumbers = new ArrayBuffer[Int]()
numbers.map(((x: Int)=> { x * x })).subscribe(((squareVal: Int) => {
println("square is " + squareVal )
Expand All @@ -181,9 +181,9 @@ class UnitTestSuite extends JUnitSuite {
}

@Test def testZip() {
val numbers = Observable.toObservable(1, 2, 3)
val colors = Observable.toObservable("red", "green", "blue")
val characters = Observable.toObservable("lion-o", "cheetara", "panthro")
val numbers = Observable.from(1, 2, 3)
val colors = Observable.from("red", "green", "blue")
val characters = Observable.from("lion-o", "cheetara", "panthro")

Observable.zip(numbers.toList, colors.toList, characters.toList, ((n: java.util.List[Int], c: java.util.List[String], t: java.util.List[String]) => { Map(
"numbers" -> n,
Expand Down
Loading