diff --git a/language-adaptors/rxjava-groovy/README.md b/language-adaptors/rxjava-groovy/README.md index d1d9698699..68768e8d3e 100644 --- a/language-adaptors/rxjava-groovy/README.md +++ b/language-adaptors/rxjava-groovy/README.md @@ -6,7 +6,7 @@ This adaptor allows 'groovy.lang.Closure' functions to be used and RxJava will k This enables code such as: ```groovy - Observable.toObservable("one", "two", "three") + Observable.from("one", "two", "three") .take(2) .subscribe({arg -> println(arg)}) ``` diff --git a/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy b/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy index ae020035d8..971c65f330 100644 --- a/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy +++ b/language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy @@ -26,7 +26,7 @@ import rx.util.functions.Func1; // -------------------------------------------------- def hello(String[] names) { - Observable.toObservable(names) + Observable.from(names) .subscribe({ println "Hello " + it + "!"}) } @@ -38,7 +38,7 @@ hello("Ben", "George") // -------------------------------------------------- def existingDataFromNumbers() { - Observable o = Observable.toObservable(1, 2, 3, 4, 5, 6); + Observable o = Observable.from(1, 2, 3, 4, 5, 6); } def existingDataFromNumbersUsingFrom() { @@ -46,7 +46,7 @@ def existingDataFromNumbersUsingFrom() { } def existingDataFromObjects() { - Observable o = Observable.toObservable("a", "b", "c"); + Observable o = Observable.from("a", "b", "c"); } def existingDataFromObjectsUsingFrom() { @@ -55,7 +55,7 @@ def existingDataFromObjectsUsingFrom() { def existingDataFromList() { def list = [5, 6, 7, 8] - Observable o = Observable.toObservable(list); + Observable o = Observable.from(list); } def existingDataFromListUsingFrom() { diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index d0ee285ae4..2340da979c 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -54,7 +54,7 @@ def class ObservableTests { @Test public void testFilter() { - Observable.filter(Observable.toObservable(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)}); + Observable.filter(Observable.from(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)}); verify(a, times(0)).received(1); verify(a, times(1)).received(2); verify(a, times(1)).received(3); @@ -62,12 +62,12 @@ def class ObservableTests { @Test public void testLast() { - assertEquals("three", Observable.toObservable("one", "two", "three").last()) + assertEquals("three", Observable.from("one", "two", "three").toBlockingObservable().last()) } @Test public void testLastWithPredicate() { - assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3})) + assertEquals("two", Observable.from("one", "two", "three").toBlockingObservable().last({ x -> x.length() == 3})) } @Test @@ -78,7 +78,7 @@ def class ObservableTests { @Test public void testMap2() { - Observable.map(Observable.toObservable(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)}); + Observable.map(Observable.from(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received("hello_" + 1); verify(a, times(1)).received("hello_" + 2); verify(a, times(1)).received("hello_" + 3); @@ -86,7 +86,7 @@ def class ObservableTests { @Test public void testMaterialize() { - Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe({ result -> a.received(result)}); + Observable.materialize(Observable.from(1, 2, 3)).subscribe({ result -> a.received(result)}); // we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted verify(a, times(4)).received(any(Notification.class)); verify(a, times(0)).error(any(Exception.class)); @@ -95,12 +95,12 @@ def class ObservableTests { @Test public void testMergeDelayError() { Observable.mergeDelayError( - Observable.toObservable(1, 2, 3), + Observable.from(1, 2, 3), Observable.merge( - Observable.toObservable(6), + Observable.from(6), Observable.error(new NullPointerException()), - Observable.toObservable(7)), - Observable.toObservable(4, 5)) + Observable.from(7)), + Observable.from(4, 5)) .subscribe( { result -> a.received(result)}, { exception -> a.error(exception)}); verify(a, times(1)).received(1); @@ -116,12 +116,12 @@ def class ObservableTests { @Test public void testMerge() { Observable.merge( - Observable.toObservable(1, 2, 3), + Observable.from(1, 2, 3), Observable.merge( - Observable.toObservable(6), + Observable.from(6), Observable.error(new NullPointerException()), - Observable.toObservable(7)), - Observable.toObservable(4, 5)) + Observable.from(7)), + Observable.from(4, 5)) .subscribe({ result -> a.received(result)}, { exception -> a.error(exception)}); // executing synchronously so we can deterministically know what order things will come @@ -158,7 +158,7 @@ def class ObservableTests { @Test public void testSkipTake() { - Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)}); + Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)}); verify(a, times(0)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -166,7 +166,7 @@ def class ObservableTests { @Test public void testSkip() { - Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)}); + Observable.skip(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)}); verify(a, times(0)).received(1); verify(a, times(0)).received(2); verify(a, times(1)).received(3); @@ -174,7 +174,7 @@ def class ObservableTests { @Test public void testTake() { - Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)}); + Observable.take(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -188,7 +188,7 @@ def class ObservableTests { @Test public void testTakeWhileViaGroovy() { - Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)}); + Observable.takeWhile(Observable.from(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -196,7 +196,7 @@ def class ObservableTests { @Test public void testTakeWhileWithIndexViaGroovy() { - Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)}); + Observable.takeWhileWithIndex(Observable.from(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(1); verify(a, times(1)).received(2); verify(a, times(0)).received(3); @@ -210,7 +210,7 @@ def class ObservableTests { @Test public void testToSortedListStatic() { - Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)}); + Observable.toSortedList(Observable.from(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } @@ -222,7 +222,7 @@ def class ObservableTests { @Test public void testToSortedListWithFunctionStatic() { - Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); + Observable.toSortedList(Observable.from(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)}); verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5)); } @@ -246,29 +246,29 @@ def class ObservableTests { @Test public void testLastOrDefault() { - def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() == 3}) + def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() == 3}) assertEquals("two", val) } @Test public void testLastOrDefault2() { - def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3}) + def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() > 3}) assertEquals("default", val) } public void testSingle1() { - def s = Observable.toObservable("one").single({ x -> x.length() == 3}) + def s = Observable.from("one").toBlockingObservable().single({ x -> x.length() == 3}) assertEquals("one", s) } @Test(expected = IllegalStateException.class) public void testSingle2() { - Observable.toObservable("one", "two").single({ x -> x.length() == 3}) + Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3}) } @Test public void testDefer() { - def obs = Observable.toObservable(1, 2) + def obs = Observable.from(1, 2) Observable.defer({-> obs }).subscribe({ result -> a.received(result)}) verify(a, times(1)).received(1); verify(a, times(1)).received(2); @@ -277,7 +277,7 @@ def class ObservableTests { @Test public void testAll() { - Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) }); + Observable.from(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) }); verify(a, times(1)).received(true); } @@ -305,7 +305,7 @@ def class ObservableTests { int counter = 1; public Observable getNumbers() { - return Observable.toObservable(1, 3, 2, 5, 4); + return Observable.from(1, 3, 2, 5, 4); } public TestObservable getObservable() { diff --git a/language-adaptors/rxjava-jruby/README.md b/language-adaptors/rxjava-jruby/README.md index 1a77557a06..9aec635da3 100644 --- a/language-adaptors/rxjava-jruby/README.md +++ b/language-adaptors/rxjava-jruby/README.md @@ -6,7 +6,7 @@ This adaptor allows `org.jruby.RubyProc` lambda functions to be used and RxJava This enables code such as: ```ruby - Observable.toObservable("one", "two", "three") + Observable.from("one", "two", "three") .take(2) .subscribe(lambda { |arg| puts arg }) ``` diff --git a/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java b/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java index c5614b12b9..d66dc16acf 100644 --- a/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java +++ b/language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java @@ -76,7 +76,7 @@ public void testCreateViaGroovy() { @Test public void testFilterViaGroovy() { - runGroovyScript("Observable.filter(Observable.toObservable(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});"); + runGroovyScript("Observable.filter(Observable.from(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});"); verify(assertion, times(0)).received(1L); verify(assertion, times(1)).received(2L); verify(assertion, times(1)).received(3L); @@ -98,7 +98,7 @@ public void testMap() { @Test public void testMaterializeViaGroovy() { - runGroovyScript("Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe(lambda{|result| a.received(result)});"); + runGroovyScript("Observable.materialize(Observable.from(1, 2, 3)).subscribe(lambda{|result| a.received(result)});"); // we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted verify(assertion, times(4)).received(any(Notification.class)); verify(assertion, times(0)).error(any(Exception.class)); @@ -129,7 +129,7 @@ public void testScriptWithOnNext() { @Test public void testSkipTakeViaGroovy() { - runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});"); + runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});"); verify(assertion, times(0)).received(1); verify(assertion, times(1)).received(2L); verify(assertion, times(0)).received(3); @@ -137,7 +137,7 @@ public void testSkipTakeViaGroovy() { @Test public void testSkipViaGroovy() { - runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});"); + runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});"); verify(assertion, times(0)).received(1); verify(assertion, times(0)).received(2); verify(assertion, times(1)).received(3L); @@ -145,7 +145,7 @@ public void testSkipViaGroovy() { @Test public void testTakeViaGroovy() { - runGroovyScript("Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});"); + runGroovyScript("Observable.take(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});"); verify(assertion, times(1)).received(1L); verify(assertion, times(1)).received(2L); verify(assertion, times(0)).received(3); @@ -183,7 +183,7 @@ public static class TestFactory { int counter = 1; public Observable getNumbers() { - return Observable.toObservable(1, 3, 2, 5, 4); + return Observable.from(1, 3, 2, 5, 4); } public TestObservable getObservable() { diff --git a/language-adaptors/rxjava-scala/README.md b/language-adaptors/rxjava-scala/README.md index ff3443f2d1..3d11b042a3 100644 --- a/language-adaptors/rxjava-scala/README.md +++ b/language-adaptors/rxjava-scala/README.md @@ -6,7 +6,7 @@ This adaptor allows 'fn' functions to be used and RxJava will know how to invoke This enables code such as: ```scala -Observable.toObservable("1", "2", "3") +Observable.from("1", "2", "3") .take(2) .subscribe((callback: String) => { println(callback) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala index 584c7c587f..12418d516d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala @@ -122,7 +122,7 @@ class UnitTestSuite extends JUnitSuite { } @Test def testTake() { - Observable.toObservable("1", "2", "3").take(1).subscribe(Map( + Observable.from("1", "2", "3").take(1).subscribe(Map( "onNext" -> ((callback: String) => { print("testTake: callback = " + callback) assertion.received(callback) @@ -133,14 +133,14 @@ class UnitTestSuite extends JUnitSuite { @Test def testClosureVersusMap() { // using closure - Observable.toObservable("1", "2", "3") + Observable.from("1", "2", "3") .take(2) .subscribe((callback: String) => { println(callback) }) // using Map of closures - Observable.toObservable("1", "2", "3") + Observable.from("1", "2", "3") .take(2) .subscribe(Map( "onNext" -> ((callback: String) => { @@ -149,7 +149,7 @@ class UnitTestSuite extends JUnitSuite { } @Test def testFilterWithToList() { - val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) + val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) numbers.filter((x: Int) => 0 == (x % 2)).toList().subscribe( (callback: java.util.List[Int]) => { val lst = callback.asScala.toList @@ -161,7 +161,7 @@ class UnitTestSuite extends JUnitSuite { } @Test def testTakeLast() { - val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) + val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9) numbers.takeLast(1).subscribe((callback: Int) => { println("testTakeLast: onNext -> got " + callback) assertion.received(callback) @@ -170,7 +170,7 @@ class UnitTestSuite extends JUnitSuite { } @Test def testMap() { - val numbers = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9) + val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9) val mappedNumbers = new ArrayBuffer[Int]() numbers.map(((x: Int)=> { x * x })).subscribe(((squareVal: Int) => { println("square is " + squareVal ) @@ -181,9 +181,9 @@ class UnitTestSuite extends JUnitSuite { } @Test def testZip() { - val numbers = Observable.toObservable(1, 2, 3) - val colors = Observable.toObservable("red", "green", "blue") - val characters = Observable.toObservable("lion-o", "cheetara", "panthro") + val numbers = Observable.from(1, 2, 3) + val colors = Observable.from("red", "green", "blue") + val characters = Observable.from("lion-o", "cheetara", "panthro") Observable.zip(numbers.toList, colors.toList, characters.toList, ((n: java.util.List[Int], c: java.util.List[String], t: java.util.List[String]) => { Map( "numbers" -> n, diff --git a/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java b/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java index 3ab99f35df..ced759f6f5 100644 --- a/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java +++ b/rxjava-contrib/rxjava-swing/src/main/java/rx/concurrency/SwingScheduler.java @@ -15,12 +15,13 @@ */ package rx.concurrency; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import static org.mockito.Mockito.*; import java.awt.EventQueue; import java.awt.event.ActionEvent; import java.awt.event.ActionListener; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -41,7 +42,7 @@ import rx.util.functions.Func2; /** - * Executes work on the Swing UI thread. + * Executes work on the Swing UI thread. * This scheduler should only be used with actions that execute quickly. */ public final class SwingScheduler extends Scheduler { @@ -77,33 +78,33 @@ public void call() { @Override public Subscription schedule(final T state, final Func2 action, long dueTime, TimeUnit unit) { final AtomicReference sub = new AtomicReference(); - long delay = unit.toMillis(dueTime); + long delay = unit.toMillis(dueTime); assertThatTheDelayIsValidForTheSwingTimer(delay); - + class ExecuteOnceAction implements ActionListener { private Timer timer; - + private void setTimer(Timer timer) { this.timer = timer; } - + @Override public void actionPerformed(ActionEvent e) { timer.stop(); sub.set(action.call(SwingScheduler.this, state)); } } - + ExecuteOnceAction executeOnce = new ExecuteOnceAction(); final Timer timer = new Timer((int) delay, executeOnce); executeOnce.setTimer(timer); timer.start(); - + return Subscriptions.create(new Action0() { @Override public void call() { timer.stop(); - + Subscription subscription = sub.get(); if (subscription != null) { subscription.unsubscribe(); @@ -115,28 +116,28 @@ public void call() { @Override public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { final AtomicReference timer = new AtomicReference(); - - final long delay = unit.toMillis(period); + + final long delay = unit.toMillis(period); assertThatTheDelayIsValidForTheSwingTimer(delay); - + final CompositeSubscription subscriptions = new CompositeSubscription(); final Func2 initialAction = new Func2() { - @Override - public Subscription call(final Scheduler scheduler, final T state0) { - // start timer for periodic execution, collect subscriptions - timer.set(new Timer((int) delay, new ActionListener() { - @Override - public void actionPerformed(ActionEvent e) { - subscriptions.add(action.call(scheduler, state0)); - } - })); - timer.get().start(); - - return action.call(scheduler, state0); - } + @Override + public Subscription call(final Scheduler scheduler, final T state0) { + // start timer for periodic execution, collect subscriptions + timer.set(new Timer((int) delay, new ActionListener() { + @Override + public void actionPerformed(ActionEvent e) { + subscriptions.add(action.call(scheduler, state0)); + } + })); + timer.get().start(); + + return action.call(scheduler, state0); + } }; subscriptions.add(schedule(state, initialAction, initialDelay, unit)); - + subscriptions.add(Subscriptions.create(new Action0() { @Override public void call() { @@ -147,7 +148,7 @@ public void call() { } } })); - + return subscriptions; } @@ -156,11 +157,11 @@ private static void assertThatTheDelayIsValidForTheSwingTimer(long delay) { throw new IllegalArgumentException(String.format("The swing timer only accepts non-negative delays up to %d milliseconds.", Integer.MAX_VALUE)); } } - + public static class UnitTest { @Rule public ExpectedException exception = ExpectedException.none(); - + @Test public void testInvalidDelayValues() { final SwingScheduler scheduler = new SwingScheduler(); @@ -174,34 +175,44 @@ public void testInvalidDelayValues() { exception.expect(IllegalArgumentException.class); scheduler.schedulePeriodically(action, 1L + Integer.MAX_VALUE, 100L, TimeUnit.MILLISECONDS); - + exception.expect(IllegalArgumentException.class); scheduler.schedulePeriodically(action, 100L, 1L + Integer.MAX_VALUE / 1000, TimeUnit.SECONDS); } - + @Test public void testPeriodicScheduling() throws Exception { final SwingScheduler scheduler = new SwingScheduler(); + final CountDownLatch latch = new CountDownLatch(4); + final Action0 innerAction = mock(Action0.class); final Action0 unsubscribe = mock(Action0.class); final Func0 action = new Func0() { @Override public Subscription call() { - innerAction.call(); - assertTrue(SwingUtilities.isEventDispatchThread()); - return Subscriptions.create(unsubscribe); + try { + innerAction.call(); + assertTrue(SwingUtilities.isEventDispatchThread()); + return Subscriptions.create(unsubscribe); + } finally { + latch.countDown(); + } } }; - + Subscription sub = scheduler.schedulePeriodically(action, 50, 200, TimeUnit.MILLISECONDS); - Thread.sleep(840); + + if (!latch.await(5000, TimeUnit.MILLISECONDS)) { + fail("timed out waiting for tasks to execute"); + } + sub.unsubscribe(); waitForEmptyEventQueue(); verify(innerAction, times(4)).call(); verify(unsubscribe, times(4)).call(); } - + @Test public void testNestedActions() throws Exception { final SwingScheduler scheduler = new SwingScheduler(); diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 208e4c54d7..a7169c20a4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -36,8 +35,11 @@ import org.mockito.Mockito; import org.mockito.MockitoAnnotations; +import rx.observables.BlockingObservable; import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; +import rx.operators.AtomicObservableSubscription; +import rx.operators.AtomicObserver; import rx.operators.OperationAll; import rx.operators.OperationCache; import rx.operators.OperationConcat; @@ -50,9 +52,7 @@ import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMostRecent; import rx.operators.OperationMulticast; -import rx.operators.OperationNext; import rx.operators.OperationObserveOn; import rx.operators.OperationOnErrorResumeNextViaFunction; import rx.operators.OperationOnErrorResumeNextViaObservable; @@ -67,8 +67,6 @@ import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationTimestamp; -import rx.operators.OperationToFuture; -import rx.operators.OperationToIterator; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; @@ -83,8 +81,6 @@ import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; -import rx.util.AtomicObserver; import rx.util.Range; import rx.util.Timestamped; import rx.util.functions.Action0; @@ -633,74 +629,6 @@ public ConnectableObservable multicast(Subject subject) { return multicast(this, subject); } - /** - * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. - * - * @return The single element in the observable sequence. - */ - public T single() { - return single(this); - } - - /** - * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. - * - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence. - */ - public T single(Func1 predicate) { - return single(this, predicate); - } - - /** - * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. - * - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence. - */ - public T single(Object predicate) { - return single(this, predicate); - } - - /** - * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. - * - * @param defaultValue - * default value for a sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public T singleOrDefault(T defaultValue) { - return singleOrDefault(this, defaultValue); - } - - /** - * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param defaultValue - * default value for a sequence. - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public T singleOrDefault(T defaultValue, Func1 predicate) { - return singleOrDefault(this, defaultValue, predicate); - } - - /** - * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param defaultValue - * default value for a sequence. - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public T singleOrDefault(T defaultValue, Object predicate) { - return singleOrDefault(this, defaultValue, predicate); - } - /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * @@ -1038,116 +966,6 @@ public static Observable just(T value) { return toObservable(list); } - /** - * Returns the last element of an observable sequence with a specified source. - * - * @param that - * the source Observable - * @return the last element in the observable sequence. - */ - public static T last(final Observable that) { - T result = null; - for (T value : that.toIterable()) { - result = value; - } - return result; - } - - /** - * Returns the last element of an observable sequence that matches the predicate. - * - * @param that - * the source Observable - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element in the observable sequence. - */ - public static T last(final Observable that, final Func1 predicate) { - return last(that.filter(predicate)); - } - - /** - * Returns the last element of an observable sequence that matches the predicate. - * - * @param that - * the source Observable - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element in the observable sequence. - */ - public static T last(final Observable that, final Object predicate) { - return last(that.filter(predicate)); - } - - /** - * Returns the last element of an observable sequence, or a default value if no value is found. - * - * @param source - * the source observable. - * @param defaultValue - * a default value that would be returned if observable is empty. - * @param - * the type of source. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public static T lastOrDefault(Observable source, T defaultValue) { - boolean found = false; - T result = null; - - for (T value : source.toIterable()) { - found = true; - result = value; - } - - if (!found) { - return defaultValue; - } - - return result; - } - - /** - * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param source - * the source observable. - * @param defaultValue - * a default value that would be returned if observable is empty. - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @param - * the type of source. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public static T lastOrDefault(Observable source, T defaultValue, Func1 predicate) { - return lastOrDefault(source.filter(predicate), defaultValue); - } - - /** - * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param source - * the source observable. - * @param defaultValue - * a default value that would be returned if observable is empty. - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @param - * the type of source. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public static T lastOrDefault(Observable source, T defaultValue, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return lastOrDefault(source, defaultValue, new Func1() { - @Override - public Boolean call(T args) { - return (Boolean) _f.call(args); - } - }); - } - /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. @@ -2105,19 +1923,6 @@ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } - /** - * Return a Future representing a single value of the Observable. - *

- * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). - * - * @param that - * the source Observable - * @return a Future that expects a single item emitted by the source Observable - */ - public static Future toFuture(final Observable that) { - return OperationToFuture.toFuture(that); - } - /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -2141,64 +1946,6 @@ public static Observable> toList(final Observable that) { return create(OperationToObservableList.toObservableList(that)); } - /** - * Converts an observable sequence to an Iterable. - * - * @param that - * the source Observable - * @return Observable converted to Iterable. - */ - public static Iterable toIterable(final Observable that) { - - return new Iterable() { - @Override - public Iterator iterator() { - return getIterator(that); - } - }; - } - - /** - * Returns an iterator that iterates all values of the observable. - * - * @param that - * an observable sequence to get an iterator for. - * @param - * the type of source. - * @return the iterator that could be used to iterate over the elements of the observable. - */ - public static Iterator getIterator(Observable that) { - return OperationToIterator.toIterator(that); - } - - /** - * Samples the next value (blocking without buffering) from in an observable sequence. - * - * @param items - * the source observable sequence. - * @param - * the type of observable. - * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. - */ - public static Iterable next(Observable items) { - return OperationNext.next(items); - } - - /** - * Samples the most recent value in an observable sequence. - * - * @param source - * the source observable sequence. - * @param - * the type of observable. - * @param initialValue - * the initial value that will be yielded by the enumerable sequence if no element has been sampled yet. - * @return the iterable that returns the last sampled element upon each iteration. - */ - public static Iterable mostRecent(Observable source, T initialValue) { - return OperationMostRecent.mostRecent(source, initialValue); - } - /** * Returns a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. * @@ -2216,127 +1963,6 @@ public static ConnectableObservable multicast(Observable source, fi return OperationMulticast.multicast(source, subject); } - /** - * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. - * - * @param that - * the source Observable - * @return The single element in the observable sequence. - * @throws IllegalStateException - * if there is not exactly one element in the observable sequence - */ - public static T single(Observable that) { - return singleOrDefault(that, false, null); - } - - /** - * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. - * - * @param that - * the source Observable - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence. - * @throws IllegalStateException - * if there is not exactly one element in the observable sequence that matches the predicate - */ - public static T single(Observable that, Func1 predicate) { - return single(that.filter(predicate)); - } - - /** - * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. - * - * @param that - * the source Observable - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence. - * @throws IllegalStateException - * if there is not exactly one element in the observable sequence that matches the predicate - */ - public static T single(Observable that, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return single(that, new Func1() { - @Override - public Boolean call(T t) { - return (Boolean) _f.call(t); - } - }); - } - - /** - * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. - * - * @param that - * the source Observable - * @param defaultValue - * default value for a sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public static T singleOrDefault(Observable that, T defaultValue) { - return singleOrDefault(that, true, defaultValue); - } - - /** - * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param that - * the source Observable - * @param defaultValue - * default value for a sequence. - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public static T singleOrDefault(Observable that, T defaultValue, Func1 predicate) { - return singleOrDefault(that.filter(predicate), defaultValue); - } - - /** - * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. - * - * @param that - * the source Observable - * @param defaultValue - * default value for a sequence. - * @param predicate - * A predicate function to evaluate for elements in the sequence. - * @return The single element in the observable sequence, or a default value if no value is found. - */ - public static T singleOrDefault(Observable that, T defaultValue, Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return singleOrDefault(that, defaultValue, new Func1() { - @Override - public Boolean call(T t) { - return (Boolean) _f.call(t); - } - }); - } - - private static T singleOrDefault(Observable that, boolean hasDefault, T defaultVal) { - Iterator it = that.toIterable().iterator(); - - if (!it.hasNext()) { - if (hasDefault) { - return defaultVal; - } - throw new IllegalStateException("Expected single entry. Actually empty stream."); - } - - T result = it.next(); - - if (it.hasNext()) { - throw new IllegalStateException("Expected single entry. Actually more than one entry."); - } - - return result; - } - /** * Converts an Iterable sequence to an Observable sequence. * @@ -2370,11 +1996,31 @@ public static Observable toObservable(Iterable iterable) { * the type of of object that the future's returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future + * @deprecated Replaced by {@link #from(Future)} */ public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } + /** + * Converts an Future to an Observable sequence. + * + * Any object that supports the {@link Future} interface can be converted into an Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + *

+ * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. + * + * @param future + * the source {@link Future} + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return an Observable that emits the item from the source Future + */ + public static Observable from(Future future) { + return create(OperationToObservableFuture.toObservableFuture(future)); + } + /** * Converts an Future to an Observable sequence. * @@ -2394,11 +2040,36 @@ public static Observable toObservable(Future future) { * the type of of object that the future's returns and the type emitted by the resulting * Observable * @return an Observable that emits the item from the source Future + * @deprecated Replaced by {@link #from(Future, long, TimeUnit)} */ public static Observable toObservable(Future future, long timeout, TimeUnit unit) { return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } + /** + * Converts an Future to an Observable sequence. + * + * Any object that supports the {@link Future} interface can be converted into an Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. + *

+ * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. + * + * @param future + * the source {@link Future} + * @param timeout + * the maximum time to wait + * @param unit + * the time unit of the time argument + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return an Observable that emits the item from the source Future + */ + public static Observable from(Future future, long timeout, TimeUnit unit) { + return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); + } + /** * Converts an Array sequence to an Observable sequence. * @@ -2413,6 +2084,7 @@ public static Observable toObservable(Future future, long timeout, Tim * the type of items in the Array, and the type of items emitted by the resulting * Observable * @return an Observable that emits each item in the source Array + * @deprecated Use {@link #from(Object...)} */ public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); @@ -2845,82 +2517,6 @@ public Observable where(Func1 predicate) { return where(this, predicate); } - /** - * Returns the last element of an observable sequence with a specified source. - * - * @return the last element in the observable sequence. - */ - public T last() { - return last(this); - } - - /** - * Returns the last element of an observable sequence that matches the predicate. - * - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element in the observable sequence. - */ - public T last(final Func1 predicate) { - return last(this, predicate); - } - - /** - * Returns the last element of an observable sequence that matches the predicate. - * - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element in the observable sequence. - */ - public T last(final Object predicate) { - @SuppressWarnings("rawtypes") - final FuncN _f = Functions.from(predicate); - - return last(this, new Func1() { - @Override - public Boolean call(T args) { - return (Boolean) _f.call(args); - } - }); - } - - /** - * Returns the last element, or a default value if no value is found. - * - * @param defaultValue - * a default value that would be returned if observable is empty. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public T lastOrDefault(T defaultValue) { - return lastOrDefault(this, defaultValue); - } - - /** - * Returns the last element that matches the predicate, or a default value if no value is found. - * - * @param defaultValue - * a default value that would be returned if observable is empty. - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public T lastOrDefault(T defaultValue, Func1 predicate) { - return lastOrDefault(this, defaultValue, predicate); - } - - /** - * Returns the last element that matches the predicate, or a default value if no value is found. - * - * @param defaultValue - * a default value that would be returned if observable is empty. - * @param predicate - * a predicate function to evaluate for elements in the sequence. - * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. - */ - public T lastOrDefault(T defaultValue, Object predicate) { - return lastOrDefault(this, defaultValue, predicate); - } - /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. @@ -3561,17 +3157,6 @@ public Observable takeUntil(Observable other) { return takeUntil(this, other); } - /** - * Return a Future representing a single value of the Observable. - *

- * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). - * - * @return a Future that expects a single item emitted by the source Observable - */ - public Future toFuture() { - return toFuture(this); - } - /** * Returns an Observable that emits a single item, a list composed of all the items emitted by * the source Observable. @@ -3629,15 +3214,6 @@ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); } - /** - * Converts an observable sequence to an Iterable. - * - * @return Observable converted to Iterable. - */ - public Iterable toIterable() { - return toIterable(this); - } - @SuppressWarnings("unchecked") public Observable startWith(T... values) { return concat(Observable. from(values), this); @@ -3673,33 +3249,8 @@ public Observable> groupBy(final Func1 keySele return groupBy(this, keySelector); } - /** - * Returns an iterator that iterates all values of the observable. - * - * @return the iterator that could be used to iterate over the elements of the observable. - */ - public Iterator getIterator() { - return getIterator(this); - } - - /** - * Samples the next value (blocking without buffering) from in an observable sequence. - * - * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. - */ - public Iterable next() { - return next(this); - } - - /** - * Samples the most recent value in an observable sequence. - * - * @param initialValue - * the initial value that will be yielded by the enumerable sequence if no element has been sampled yet. - * @return the iterable that returns the last sampled element upon each iteration. - */ - public Iterable mostRecent(T initialValue) { - return mostRecent(this, initialValue); + public BlockingObservable toBlockingObservable() { + return BlockingObservable.from(this); } /** @@ -3802,182 +3353,6 @@ public void testSequenceEqual() { verify(result, times(1)).onNext(false); } - @Test - public void testToIterable() { - Observable obs = toObservable("one", "two", "three"); - - Iterator it = obs.toIterable().iterator(); - - assertEquals(true, it.hasNext()); - assertEquals("one", it.next()); - - assertEquals(true, it.hasNext()); - assertEquals("two", it.next()); - - assertEquals(true, it.hasNext()); - assertEquals("three", it.next()); - - assertEquals(false, it.hasNext()); - - } - - @Test(expected = TestException.class) - public void testToIterableWithException() { - Observable obs = create(new Func1, Subscription>() { - - @Override - public Subscription call(Observer observer) { - observer.onNext("one"); - observer.onError(new TestException()); - return Subscriptions.empty(); - } - }); - - Iterator it = obs.toIterable().iterator(); - - assertEquals(true, it.hasNext()); - assertEquals("one", it.next()); - - assertEquals(true, it.hasNext()); - it.next(); - - } - - @Test - public void testLastOrDefault1() { - Observable observable = toObservable("one", "two", "three"); - assertEquals("three", observable.lastOrDefault("default")); - } - - @Test - public void testLastOrDefault2() { - Observable observable = toObservable(); - assertEquals("default", observable.lastOrDefault("default")); - } - - @Test - public void testLastOrDefault() { - Observable observable = toObservable(1, 0, -1); - int last = observable.lastOrDefault(-100, new Func1() { - @Override - public Boolean call(Integer args) { - return args >= 0; - } - }); - assertEquals(0, last); - } - - @Test - public void testLastOrDefaultWrongPredicate() { - Observable observable = toObservable(-1, -2, -3); - int last = observable.lastOrDefault(0, new Func1() { - @Override - public Boolean call(Integer args) { - return args >= 0; - } - }); - assertEquals(0, last); - } - - @Test - public void testLastOrDefaultWithPredicate() { - Observable observable = toObservable(1, 0, -1); - int last = observable.lastOrDefault(0, new Func1() { - @Override - public Boolean call(Integer args) { - return args < 0; - } - }); - - assertEquals(-1, last); - } - - public void testSingle() { - Observable observable = toObservable("one"); - assertEquals("one", observable.single()); - } - - @Test - public void testSingleDefault() { - Observable observable = toObservable(); - assertEquals("default", observable.singleOrDefault("default")); - } - - @Test(expected = IllegalStateException.class) - public void testSingleDefaultWithMoreThanOne() { - Observable observable = toObservable("one", "two", "three"); - observable.singleOrDefault("default"); - } - - @Test - public void testSingleWithPredicateDefault() { - Observable observable = toObservable("one", "two", "four"); - assertEquals("four", observable.single(new Func1() { - @Override - public Boolean call(String s) { - return s.length() == 4; - } - })); - } - - @Test(expected = IllegalStateException.class) - public void testSingleWrong() { - Observable observable = toObservable(1, 2); - observable.single(); - } - - @Test(expected = IllegalStateException.class) - public void testSingleWrongPredicate() { - Observable observable = toObservable(-1); - observable.single(new Func1() { - @Override - public Boolean call(Integer args) { - return args > 0; - } - }); - } - - @Test - public void testSingleDefaultPredicateMatchesNothing() { - Observable observable = toObservable("one", "two"); - String result = observable.singleOrDefault("default", new Func1() { - @Override - public Boolean call(String args) { - return args.length() == 4; - } - }); - assertEquals("default", result); - } - - @Test(expected = IllegalStateException.class) - public void testSingleDefaultPredicateMatchesMoreThanOne() { - toObservable("one", "two").singleOrDefault("default", new Func1() { - @Override - public Boolean call(String args) { - return args.length() == 3; - } - }); - } - - @Test - public void testLast() { - Observable obs = Observable.toObservable("one", "two", "three"); - - assertEquals("three", obs.last()); - } - - @Test - public void testLastWithPredicate() { - Observable obs = Observable.toObservable("one", "two", "three"); - - assertEquals("two", obs.last(new Func1() { - @Override - public Boolean call(String s) { - return s.length() == 3; - } - })); - } - @Test public void testOnSubscribeFails() { @SuppressWarnings("unchecked") @@ -3997,13 +3372,6 @@ public Subscription call(Observer t1) { verify(observer, times(1)).onError(re); } - @Test - public void testLastEmptyObservable() { - Observable obs = Observable.toObservable(); - - assertNull(obs.last()); - } - @Test public void testMaterializeDematerializeChaining() { Observable obs = Observable.just(1); @@ -4377,10 +3745,6 @@ public void call(String v) { assertEquals(1, counter.get()); } - private static class TestException extends RuntimeException { - private static final long serialVersionUID = 1L; - } - } } diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java index 36b304f495..ba3dd0473d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -19,7 +19,7 @@ import rx.Scheduler; import rx.Subscription; -import rx.util.AtomicObservableSubscription; +import rx.operators.AtomicObservableSubscription; import rx.util.functions.Func1; import rx.util.functions.Func2; diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index b45bdd080d..5f0880e855 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -20,9 +20,9 @@ import rx.Scheduler; import rx.Subscription; +import rx.operators.AtomicObservableSubscription; import rx.subscriptions.CompositeSubscription; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func2; /** diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java new file mode 100644 index 0000000000..ddbfbaf276 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -0,0 +1,739 @@ +package rx.observables; + +import static org.junit.Assert.*; + +import java.util.Iterator; +import java.util.concurrent.Future; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.operators.OperationMostRecent; +import rx.operators.OperationNext; +import rx.operators.OperationToFuture; +import rx.operators.OperationToIterator; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Func1; +import rx.util.functions.FuncN; +import rx.util.functions.Functions; + +/** + * Extension of {@link Observable} that provides blocking operators. + *

+ * Constructud via {@link #from(Observable)} or {@link Observable#toBlockingObservable()} + * + * @param + */ +public class BlockingObservable extends Observable { + + public static BlockingObservable from(final Observable o) { + return new BlockingObservable(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + return o.subscribe(observer); + } + }); + } + + /** + * Returns an iterator that iterates all values of the observable. + * + * @param that + * an observable sequence to get an iterator for. + * @param + * the type of source. + * @return the iterator that could be used to iterate over the elements of the observable. + */ + public static Iterator toIterator(Observable source) { + return OperationToIterator.toIterator(source); + } + + /** + * Returns the last element of an observable sequence with a specified source. + * + * @param that + * the source Observable + * @return the last element in the observable sequence. + */ + public static T last(final Observable source) { + return from(source).last(); + } + + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param that + * the source Observable + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public static T last(final Observable source, final Func1 predicate) { + return last(source.filter(predicate)); + } + + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param that + * the source Observable + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public static T last(final Observable source, final Object predicate) { + return last(source.filter(predicate)); + } + + /** + * Returns the last element of an observable sequence, or a default value if no value is found. + * + * @param source + * the source observable. + * @param defaultValue + * a default value that would be returned if observable is empty. + * @param + * the type of source. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public static T lastOrDefault(Observable source, T defaultValue) { + return from(source).lastOrDefault(defaultValue); + } + + /** + * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param source + * the source observable. + * @param defaultValue + * a default value that would be returned if observable is empty. + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @param + * the type of source. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public static T lastOrDefault(Observable source, T defaultValue, Func1 predicate) { + return lastOrDefault(source.filter(predicate), defaultValue); + } + + /** + * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param source + * the source observable. + * @param defaultValue + * a default value that would be returned if observable is empty. + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @param + * the type of source. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public static T lastOrDefault(Observable source, T defaultValue, Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return lastOrDefault(source, defaultValue, new Func1() { + @Override + public Boolean call(T args) { + return (Boolean) _f.call(args); + } + }); + } + + /** + * Samples the most recent value in an observable sequence. + * + * @param source + * the source observable sequence. + * @param + * the type of observable. + * @param initialValue + * the initial value that will be yielded by the enumerable sequence if no element has been sampled yet. + * @return the iterable that returns the last sampled element upon each iteration. + */ + public static Iterable mostRecent(Observable source, T initialValue) { + return OperationMostRecent.mostRecent(source, initialValue); + } + + /** + * Samples the next value (blocking without buffering) from in an observable sequence. + * + * @param items + * the source observable sequence. + * @param + * the type of observable. + * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. + */ + public static Iterable next(Observable items) { + return OperationNext.next(items); + } + + private static T _singleOrDefault(BlockingObservable source, boolean hasDefault, T defaultValue) { + Iterator it = source.toIterable().iterator(); + + if (!it.hasNext()) { + if (hasDefault) { + return defaultValue; + } + throw new IllegalStateException("Expected single entry. Actually empty stream."); + } + + T result = it.next(); + + if (it.hasNext()) { + throw new IllegalStateException("Expected single entry. Actually more than one entry."); + } + + return result; + } + + /** + * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. + * + * @param that + * the source Observable + * @return The single element in the observable sequence. + * @throws IllegalStateException + * if there is not exactly one element in the observable sequence + */ + public static T single(Observable source) { + return from(source).single(); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param that + * the source Observable + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + * @throws IllegalStateException + * if there is not exactly one element in the observable sequence that matches the predicate + */ + public static T single(Observable source, Func1 predicate) { + return from(source).single(predicate); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param that + * the source Observable + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + * @throws IllegalStateException + * if there is not exactly one element in the observable sequence that matches the predicate + */ + public static T single(Observable source, Object predicate) { + return from(source).single(predicate); + } + + /** + * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. + * + * @param that + * the source Observable + * @param defaultValue + * default value for a sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public static T singleOrDefault(Observable source, T defaultValue) { + return from(source).singleOrDefault(defaultValue); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param that + * the source Observable + * @param defaultValue + * default value for a sequence. + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public static T singleOrDefault(Observable source, T defaultValue, Func1 predicate) { + return from(source).singleOrDefault(defaultValue, predicate); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param that + * the source Observable + * @param defaultValue + * default value for a sequence. + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public static T singleOrDefault(Observable source, T defaultValue, Object predicate) { + return from(source).singleOrDefault(defaultValue, predicate); + } + + /** + * Return a Future representing a single value of the Observable. + *

+ * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). + * + * @param that + * the source Observable + * @return a Future that expects a single item emitted by the source Observable + */ + public static Future toFuture(final Observable source) { + return OperationToFuture.toFuture(source); + } + + /** + * Converts an observable sequence to an Iterable. + * + * @param that + * the source Observable + * @return Observable converted to Iterable. + */ + public static Iterable toIterable(final Observable source) { + return from(source).toIterable(); + } + + protected BlockingObservable(Func1, Subscription> onSubscribe) { + super(onSubscribe); + } + + /** + * Returns an iterator that iterates all values of the observable. + * + * @return the iterator that could be used to iterate over the elements of the observable. + */ + public Iterator getIterator() { + return OperationToIterator.toIterator(this); + } + + /** + * Returns the last element of an observable sequence with a specified source. + * + * @return the last element in the observable sequence. + */ + public T last() { + T result = null; + for (T value : toIterable()) { + result = value; + } + return result; + } + + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public T last(final Func1 predicate) { + return last(this, predicate); + } + + /** + * Returns the last element of an observable sequence that matches the predicate. + * + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element in the observable sequence. + */ + public T last(final Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return last(this, new Func1() { + @Override + public Boolean call(T args) { + return (Boolean) _f.call(args); + } + }); + } + + /** + * Returns the last element, or a default value if no value is found. + * + * @param defaultValue + * a default value that would be returned if observable is empty. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public T lastOrDefault(T defaultValue) { + boolean found = false; + T result = null; + + for (T value : toIterable()) { + found = true; + result = value; + } + + if (!found) { + return defaultValue; + } + + return result; + } + + /** + * Returns the last element that matches the predicate, or a default value if no value is found. + * + * @param defaultValue + * a default value that would be returned if observable is empty. + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public T lastOrDefault(T defaultValue, Func1 predicate) { + return lastOrDefault(this, defaultValue, predicate); + } + + /** + * Returns the last element that matches the predicate, or a default value if no value is found. + * + * @param defaultValue + * a default value that would be returned if observable is empty. + * @param predicate + * a predicate function to evaluate for elements in the sequence. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public T lastOrDefault(T defaultValue, Object predicate) { + return lastOrDefault(this, defaultValue, predicate); + } + + /** + * Samples the most recent value in an observable sequence. + * + * @param initialValue + * the initial value that will be yielded by the enumerable sequence if no element has been sampled yet. + * @return the iterable that returns the last sampled element upon each iteration. + */ + public Iterable mostRecent(T initialValue) { + return mostRecent(this, initialValue); + } + + /** + * Samples the next value (blocking without buffering) from in an observable sequence. + * + * @return iterable that blocks upon each iteration until the next element in the observable source sequence becomes available. + */ + public Iterable next() { + return next(this); + } + + /** + * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. + * + * @return The single element in the observable sequence. + */ + public T single() { + return _singleOrDefault(this, false, null); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + */ + public T single(Func1 predicate) { + return _singleOrDefault(from(this.filter(predicate)), false, null); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + */ + public T single(Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return single(new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + + /** + * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. + * + * @param defaultValue + * default value for a sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue) { + return _singleOrDefault(this, true, defaultValue); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param defaultValue + * default value for a sequence. + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue, Func1 predicate) { + return _singleOrDefault(from(this.filter(predicate)), true, defaultValue); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param defaultValue + * default value for a sequence. + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue, final Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return singleOrDefault(defaultValue, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + + /** + * Return a Future representing a single value of the Observable. + *

+ * This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). + * + * @return a Future that expects a single item emitted by the source Observable + */ + public Future toFuture() { + return toFuture(this); + } + + /** + * Converts an observable sequence to an Iterable. + * + * @return Observable converted to Iterable. + */ + public Iterable toIterable() { + return new Iterable() { + @Override + public Iterator iterator() { + return getIterator(); + } + }; + } + + public static class UnitTest { + + @Mock + Observer w; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void testLast() { + BlockingObservable obs = BlockingObservable.from(Observable.from("one", "two", "three")); + + assertEquals("three", obs.last()); + } + + @Test + public void testLastEmptyObservable() { + BlockingObservable obs = BlockingObservable.from(Observable.from()); + + assertNull(obs.last()); + } + + @Test + public void testLastOrDefault() { + BlockingObservable observable = BlockingObservable.from(toObservable(1, 0, -1)); + int last = observable.lastOrDefault(-100, new Func1() { + @Override + public Boolean call(Integer args) { + return args >= 0; + } + }); + assertEquals(0, last); + } + + @Test + public void testLastOrDefault1() { + BlockingObservable observable = BlockingObservable.from(toObservable("one", "two", "three")); + assertEquals("three", observable.lastOrDefault("default")); + } + + @Test + public void testLastOrDefault2() { + BlockingObservable observable = BlockingObservable.from(toObservable()); + assertEquals("default", observable.lastOrDefault("default")); + } + + @Test + public void testLastOrDefaultWithPredicate() { + BlockingObservable observable = BlockingObservable.from(toObservable(1, 0, -1)); + int last = observable.lastOrDefault(0, new Func1() { + @Override + public Boolean call(Integer args) { + return args < 0; + } + }); + + assertEquals(-1, last); + } + + @Test + public void testLastOrDefaultWrongPredicate() { + BlockingObservable observable = BlockingObservable.from(toObservable(-1, -2, -3)); + int last = observable.lastOrDefault(0, new Func1() { + @Override + public Boolean call(Integer args) { + return args >= 0; + } + }); + assertEquals(0, last); + } + + @Test + public void testLastWithPredicate() { + BlockingObservable obs = BlockingObservable.from(Observable.from("one", "two", "three")); + + assertEquals("two", obs.last(new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 3; + } + })); + } + + public void testSingle() { + BlockingObservable observable = BlockingObservable.from(toObservable("one")); + assertEquals("one", observable.single()); + } + + @Test + public void testSingleDefault() { + BlockingObservable observable = BlockingObservable.from(toObservable()); + assertEquals("default", observable.singleOrDefault("default")); + } + + @Test(expected = IllegalStateException.class) + public void testSingleDefaultPredicateMatchesMoreThanOne() { + BlockingObservable.from(toObservable("one", "two")).singleOrDefault("default", new Func1() { + @Override + public Boolean call(String args) { + return args.length() == 3; + } + }); + } + + @Test + public void testSingleDefaultPredicateMatchesNothing() { + BlockingObservable observable = BlockingObservable.from(toObservable("one", "two")); + String result = observable.singleOrDefault("default", new Func1() { + @Override + public Boolean call(String args) { + return args.length() == 4; + } + }); + assertEquals("default", result); + } + + @Test(expected = IllegalStateException.class) + public void testSingleDefaultWithMoreThanOne() { + BlockingObservable observable = BlockingObservable.from(toObservable("one", "two", "three")); + observable.singleOrDefault("default"); + } + + @Test + public void testSingleWithPredicateDefault() { + BlockingObservable observable = BlockingObservable.from(toObservable("one", "two", "four")); + assertEquals("four", observable.single(new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 4; + } + })); + } + + @Test(expected = IllegalStateException.class) + public void testSingleWrong() { + BlockingObservable observable = BlockingObservable.from(toObservable(1, 2)); + observable.single(); + } + + @Test(expected = IllegalStateException.class) + public void testSingleWrongPredicate() { + BlockingObservable observable = BlockingObservable.from(toObservable(-1)); + observable.single(new Func1() { + @Override + public Boolean call(Integer args) { + return args > 0; + } + }); + } + + @Test + public void testToIterable() { + BlockingObservable obs = BlockingObservable.from(toObservable("one", "two", "three")); + + Iterator it = obs.toIterable().iterator(); + + assertEquals(true, it.hasNext()); + assertEquals("one", it.next()); + + assertEquals(true, it.hasNext()); + assertEquals("two", it.next()); + + assertEquals(true, it.hasNext()); + assertEquals("three", it.next()); + + assertEquals(false, it.hasNext()); + + } + + @Test(expected = TestException.class) + public void testToIterableWithException() { + BlockingObservable obs = BlockingObservable.from(create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onError(new TestException()); + return Subscriptions.empty(); + } + })); + + Iterator it = obs.toIterable().iterator(); + + assertEquals(true, it.hasNext()); + assertEquals("one", it.next()); + + assertEquals(true, it.hasNext()); + it.next(); + + } + + private static class TestException extends RuntimeException { + private static final long serialVersionUID = 1L; + } + } +} diff --git a/rxjava-core/src/main/java/rx/util/AtomicObservableSubscription.java b/rxjava-core/src/main/java/rx/operators/AtomicObservableSubscription.java similarity index 99% rename from rxjava-core/src/main/java/rx/util/AtomicObservableSubscription.java rename to rxjava-core/src/main/java/rx/operators/AtomicObservableSubscription.java index 73e2fc5004..11bcc5fabc 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObservableSubscription.java +++ b/rxjava-core/src/main/java/rx/operators/AtomicObservableSubscription.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.util; +package rx.operators; import org.junit.Test; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserver.java b/rxjava-core/src/main/java/rx/operators/AtomicObserver.java similarity index 98% rename from rxjava-core/src/main/java/rx/util/AtomicObserver.java rename to rxjava-core/src/main/java/rx/operators/AtomicObserver.java index 24519dc276..4fac51872a 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/operators/AtomicObserver.java @@ -1,10 +1,11 @@ -package rx.util; +package rx.operators; import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import rx.Observer; import rx.plugins.RxJavaPlugins; +import rx.util.CompositeException; /** * Wrapper around Observer to ensure compliance with Rx contract. diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java index db603dc853..d0ccfa2409 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ b/rxjava-core/src/main/java/rx/operators/OperationAll.java @@ -4,7 +4,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; import java.util.concurrent.atomic.AtomicBoolean; diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 382f8ba8aa..bda1b48968 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -719,7 +719,7 @@ public void testCombineLatest2Types() { /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - Observable w = Observable.create(combineLatest(Observable.toObservable("one", "two"), Observable.toObservable(2, 3, 4), combineLatestFunction)); + Observable w = Observable.create(combineLatest(Observable.from("one", "two"), Observable.from(2, 3, 4), combineLatestFunction)); w.subscribe(aObserver); verify(aObserver, never()).onError(any(Exception.class)); @@ -738,7 +738,7 @@ public void testCombineLatest3TypesA() { /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - Observable w = Observable.create(combineLatest(Observable.toObservable("one", "two"), Observable.toObservable(2), Observable.toObservable(new int[] { 4, 5, 6 }), combineLatestFunction)); + Observable w = Observable.create(combineLatest(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), combineLatestFunction)); w.subscribe(aObserver); verify(aObserver, never()).onError(any(Exception.class)); @@ -755,7 +755,7 @@ public void testCombineLatest3TypesB() { /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - Observable w = Observable.create(combineLatest(Observable.toObservable("one"), Observable.toObservable(2), Observable.toObservable(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction)); + Observable w = Observable.create(combineLatest(Observable.from("one"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction)); w.subscribe(aObserver); verify(aObserver, never()).onError(any(Exception.class)); diff --git a/rxjava-core/src/main/java/rx/operators/OperationConcat.java b/rxjava-core/src/main/java/rx/operators/OperationConcat.java index 93c14b2acf..bd43f275c1 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationConcat.java +++ b/rxjava-core/src/main/java/rx/operators/OperationConcat.java @@ -33,7 +33,6 @@ import rx.Observer; import rx.Subscription; import rx.subscriptions.BooleanSubscription; -import rx.util.AtomicObservableSubscription; import rx.util.Exceptions; import rx.util.functions.Func1; @@ -62,11 +61,11 @@ public final class OperationConcat { * @return An observable sequence whose elements are the result of combining the output from the list of Observables. */ public static Func1, Subscription> concat(final Observable... sequences) { - return concat(Observable.toObservable(sequences)); + return concat(Observable.from(sequences)); } public static Func1, Subscription> concat(final List> sequences) { - return concat(Observable.toObservable(sequences)); + return concat(Observable.from(sequences)); } public static Func1, Subscription> concat(final Observable> sequences) { @@ -150,8 +149,8 @@ public void testConcat() { final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; - final Observable odds = Observable.toObservable(o); - final Observable even = Observable.toObservable(e); + final Observable odds = Observable.from(o); + final Observable even = Observable.from(e); @SuppressWarnings("unchecked") Observable concat = Observable.create(concat(odds, even)); @@ -168,8 +167,8 @@ public void testConcatWithList() { final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; - final Observable odds = Observable.toObservable(o); - final Observable even = Observable.toObservable(e); + final Observable odds = Observable.from(o); + final Observable even = Observable.from(e); final List> list = new ArrayList>(); list.add(odds); list.add(even); @@ -187,8 +186,8 @@ public void testConcatObservableOfObservables() { final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; - final Observable odds = Observable.toObservable(o); - final Observable even = Observable.toObservable(e); + final Observable odds = Observable.from(o); + final Observable even = Observable.from(e); Observable> observableOfObservables = Observable.create(new Func1>, Subscription>() { @@ -372,8 +371,8 @@ public void testBlockedObservableOfObservables() { final String[] o = { "1", "3", "5", "7" }; final String[] e = { "2", "4", "6" }; - final Observable odds = Observable.toObservable(o); - final Observable even = Observable.toObservable(e); + final Observable odds = Observable.from(o); + final Observable even = Observable.from(e); final CountDownLatch callOnce = new CountDownLatch(1); final CountDownLatch okToContinue = new CountDownLatch(1); TestObservable> observableOfObservables = new TestObservable>(callOnce, okToContinue, odds, even); diff --git a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java index e53f0fdc4b..9fb0ec2f9e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDematerialize.java @@ -86,7 +86,7 @@ public static class UnitTest { @Test @SuppressWarnings("unchecked") public void testDematerialize1() { - Observable> notifications = Observable.toObservable(1, 2).materialize(); + Observable> notifications = Observable.from(1, 2).materialize(); Observable dematerialize = Observable.dematerialize(notifications); Observer aObserver = mock(Observer.class); diff --git a/rxjava-core/src/main/java/rx/operators/OperationFilter.java b/rxjava-core/src/main/java/rx/operators/OperationFilter.java index 64cd70d1ff..efcfe10fc8 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFilter.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFilter.java @@ -24,7 +24,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; public final class OperationFilter { @@ -74,7 +73,7 @@ public static class UnitTest { @Test public void testFilter() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable observable = Observable.create(filter(w, new Func1() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationFinally.java b/rxjava-core/src/main/java/rx/operators/OperationFinally.java index 844e5f1b40..f0638ea7eb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationFinally.java +++ b/rxjava-core/src/main/java/rx/operators/OperationFinally.java @@ -116,7 +116,7 @@ private void checkActionCalled(Observable input) { @Test public void testFinallyCalledOnComplete() { - checkActionCalled(Observable.toObservable(new String[] {"1", "2", "3"})); + checkActionCalled(Observable.from(new String[] {"1", "2", "3"})); } @Test diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index 35123d704e..1c2e6e969c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -162,10 +162,10 @@ public void testEmpty() { private static Map> toMap(Observable> observable) { Map> result = new HashMap>(); - for (GroupedObservable g : observable.toIterable()) { + for (GroupedObservable g : observable.toBlockingObservable().toIterable()) { K key = g.getKey(); - for (V value : g.toIterable()) { + for (V value : g.toBlockingObservable().toIterable()) { List values = result.get(key); if (values == null) { values = new ArrayList(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationMap.java b/rxjava-core/src/main/java/rx/operators/OperationMap.java index 11aabc8280..4dc7878658 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMap.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMap.java @@ -143,7 +143,7 @@ public void testMap() { Map m1 = getMap("One"); Map m2 = getMap("Two"); @SuppressWarnings("unchecked") - Observable> observable = Observable.toObservable(m1, m2); + Observable> observable = Observable.from(m1, m2); Observable m = Observable.create(map(observable, new Func1, String>() { @@ -165,7 +165,7 @@ public String call(Map map) { @Test public void testMapMany() { /* simulate a top-level async call which returns IDs */ - Observable ids = Observable.toObservable(1, 2); + Observable ids = Observable.from(1, 2); /* now simulate the behavior to take those IDs and perform nested async calls based on them */ Observable m = Observable.create(mapMany(ids, new Func1>() { @@ -178,11 +178,11 @@ public Observable call(Integer id) { if (id == 1) { Map m1 = getMap("One"); Map m2 = getMap("Two"); - subObservable = Observable.toObservable(m1, m2); + subObservable = Observable.from(m1, m2); } else { Map m3 = getMap("Three"); Map m4 = getMap("Four"); - subObservable = Observable.toObservable(m3, m4); + subObservable = Observable.from(m3, m4); } /* simulate kicking off the async call and performing a select on it to transform the data */ @@ -210,15 +210,15 @@ public void testMapMany2() { Map m1 = getMap("One"); Map m2 = getMap("Two"); @SuppressWarnings("unchecked") - Observable> observable1 = Observable.toObservable(m1, m2); + Observable> observable1 = Observable.from(m1, m2); Map m3 = getMap("Three"); Map m4 = getMap("Four"); @SuppressWarnings("unchecked") - Observable> observable2 = Observable.toObservable(m3, m4); + Observable> observable2 = Observable.from(m3, m4); @SuppressWarnings("unchecked") - Observable>> observable = Observable.toObservable(observable1, observable2); + Observable>> observable = Observable.from(observable1, observable2); Observable m = Observable.create(mapMany(observable, new Func1>, Observable>() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index e750cddd20..e6cdd7328f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -35,8 +35,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; import rx.util.functions.Func1; public final class OperationMerge { diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 201b3c53f0..05c71139a6 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -69,7 +69,7 @@ public void testObserveOn() { Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate())); Observer observer = mock(Observer.class); - Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); + Observable.create(observeOn(Observable.from(1, 2, 3), scheduler)).subscribe(observer); verify(observer, times(1)).onNext(1); verify(observer, times(1)).onNext(2); diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java index 8dc80d0ab1..9e5ab61fbb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaFunction.java @@ -29,7 +29,6 @@ import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; import rx.util.CompositeException; import rx.util.functions.Func1; @@ -124,7 +123,7 @@ public Subscription call(Observer observer) { @Override public Observable call(Exception t1) { receivedException.set(t1); - return Observable.toObservable("twoResume", "threeResume"); + return Observable.from("twoResume", "threeResume"); } }; @@ -154,7 +153,7 @@ public void testResumeNextWithAsyncExecution() { @Override public Observable call(Exception t1) { receivedException.set(t1); - return Observable.toObservable("twoResume", "threeResume"); + return Observable.from("twoResume", "threeResume"); } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java index 84409e2b25..285b442374 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java @@ -27,7 +27,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; public final class OperationOnErrorResumeNextViaObservable { @@ -102,7 +101,7 @@ public static class UnitTest { public void testResumeNext() { Subscription s = mock(Subscription.class); TestObservable w = new TestObservable(s, "one"); - Observable resume = Observable.toObservable("twoResume", "threeResume"); + Observable resume = Observable.from("twoResume", "threeResume"); Observable observable = Observable.create(onErrorResumeNextViaObservable(w, resume)); @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java index fef25bb8f3..d0ebbed1c3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationOnErrorReturn.java @@ -28,7 +28,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; import rx.util.CompositeException; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/operators/OperationScan.java b/rxjava-core/src/main/java/rx/operators/OperationScan.java index 94ecaaf6d0..2a8bd083b7 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationScan.java +++ b/rxjava-core/src/main/java/rx/operators/OperationScan.java @@ -171,7 +171,7 @@ public void testScanIntegersWithInitialValue() { @SuppressWarnings("unchecked") Observer observer = mock(Observer.class); - Observable observable = Observable.toObservable(1, 2, 3); + Observable observable = Observable.from(1, 2, 3); Observable m = Observable.create(scan(observable, "", new Func2() { @@ -198,7 +198,7 @@ public void testScanIntegersWithoutInitialValue() { @SuppressWarnings("unchecked") Observer Observer = mock(Observer.class); - Observable observable = Observable.toObservable(1, 2, 3); + Observable observable = Observable.from(1, 2, 3); Observable m = Observable.create(scan(observable, new Func2() { @@ -225,7 +225,7 @@ public void testScanIntegersWithoutInitialValueAndOnlyOneValue() { @SuppressWarnings("unchecked") Observer Observer = mock(Observer.class); - Observable observable = Observable.toObservable(1); + Observable observable = Observable.from(1); Observable m = Observable.create(scan(observable, new Func2() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkip.java b/rxjava-core/src/main/java/rx/operators/OperationSkip.java index 54cbbfd653..a12ef1fef4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkip.java @@ -111,7 +111,7 @@ public static class UnitTest { @Test public void testSkip1() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable skip = Observable.create(skip(w, 2)); @SuppressWarnings("unchecked") @@ -126,7 +126,7 @@ public void testSkip1() { @Test public void testSkip2() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable skip = Observable.create(skip(w, 1)); @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index 7e3756057d..238ea1eae4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -79,7 +79,7 @@ public static class UnitTest { @Test @SuppressWarnings("unchecked") public void testSubscribeOn() { - Observable w = Observable.toObservable(1, 2, 3); + Observable w = Observable.from(1, 2, 3); Scheduler scheduler = spy(OperatorTester.UnitTest.forwardingScheduler(Schedulers.immediate())); diff --git a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java index acdeee4c50..4f19fce09c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSynchronize.java @@ -24,8 +24,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; import rx.util.functions.Func1; /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 5a36105ac3..68a8eec40e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -29,7 +29,6 @@ import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; /** @@ -150,7 +149,7 @@ public static class UnitTest { @Test public void testTake1() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable take = Observable.create(assertTrustedObservable(take(w, 2))); @SuppressWarnings("unchecked") @@ -165,7 +164,7 @@ public void testTake1() { @Test public void testTake2() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable take = Observable.create(assertTrustedObservable(take(w, 1))); @SuppressWarnings("unchecked") @@ -190,7 +189,7 @@ public Subscription call(Observer observer) return Subscriptions.empty(); } }); - Observable.create(assertTrustedObservable(take(source, 1))).last(); + Observable.create(assertTrustedObservable(take(source, 1))).toBlockingObservable().last(); } @Test @@ -214,7 +213,7 @@ public void unsubscribe() }; } }); - Observable.create(assertTrustedObservable(take(source, 0))).lastOrDefault("ok"); + Observable.create(assertTrustedObservable(take(source, 0))).toBlockingObservable().lastOrDefault("ok"); assertTrue("source subscribed", subscribed.get()); assertTrue("source unsubscribed", unSubscribed.get()); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java index c80f9f59d9..b05888901e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeLast.java @@ -27,7 +27,6 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Func1; /** @@ -98,7 +97,7 @@ public static class UnitTest { @Test public void testTakeLastEmpty() { - Observable w = Observable.toObservable(); + Observable w = Observable.from(); Observable take = Observable.create(takeLast(w, 2)); @SuppressWarnings("unchecked") @@ -111,7 +110,7 @@ public void testTakeLastEmpty() { @Test public void testTakeLast1() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable take = Observable.create(takeLast(w, 2)); @SuppressWarnings("unchecked") @@ -127,7 +126,7 @@ public void testTakeLast1() { @Test public void testTakeLast2() { - Observable w = Observable.toObservable("one"); + Observable w = Observable.from("one"); Observable take = Observable.create(takeLast(w, 10)); @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index cb196b5d38..67fbb5d3a3 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -29,8 +29,6 @@ import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; -import rx.util.AtomicObserver; import rx.util.functions.Func1; import rx.util.functions.Func2; @@ -154,7 +152,7 @@ public static class UnitTest { @Test public void testTakeWhile1() { - Observable w = Observable.toObservable(1, 2, 3); + Observable w = Observable.from(1, 2, 3); Observable take = Observable.create(takeWhile(w, new Func1() { @Override @@ -208,7 +206,7 @@ public Boolean call(Integer input) @Test public void testTakeWhile2() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable take = Observable.create(takeWhileWithIndex(w, new Func2() { @Override @@ -248,7 +246,7 @@ public Boolean call(String s) { return false; } - })).last(); + })).toBlockingObservable().last(); } @Test diff --git a/rxjava-core/src/main/java/rx/operators/OperationToFuture.java b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java index 89b2bc294e..493cb19763 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToFuture.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToFuture.java @@ -118,14 +118,14 @@ private T getValue() throws ExecutionException { @Test public void testToFuture() throws InterruptedException, ExecutionException { - Observable obs = Observable.toObservable("one"); + Observable obs = Observable.from("one"); Future f = toFuture(obs); assertEquals("one", f.get()); } @Test public void testToFutureList() throws InterruptedException, ExecutionException { - Observable obs = Observable.toObservable("one", "two", "three"); + Observable obs = Observable.from("one", "two", "three"); Future> f = toFuture(obs.toList()); assertEquals("one", f.get().get(0)); assertEquals("two", f.get().get(1)); @@ -134,7 +134,7 @@ public void testToFutureList() throws InterruptedException, ExecutionException { @Test(expected = ExecutionException.class) public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException { - Observable obs = Observable.toObservable("one", "two"); + Observable obs = Observable.from("one", "two"); Future f = toFuture(obs); assertEquals("one", f.get()); // we expect an exception since there are more than 1 element diff --git a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java index 3eb56572f1..34cb3ce44a 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToIterator.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToIterator.java @@ -107,7 +107,7 @@ public void remove() { @Test public void testToIterator() { - Observable obs = Observable.toObservable("one", "two", "three"); + Observable obs = Observable.from("one", "two", "three"); Iterator it = toIterator(obs); diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java index 855927da31..6ee906dc99 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableList.java @@ -84,7 +84,7 @@ public static class UnitTest { @Test public void testList() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable> observable = Observable.create(toObservableList(w)); @SuppressWarnings("unchecked") @@ -97,7 +97,7 @@ public void testList() { @Test public void testListMultipleObservers() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable> observable = Observable.create(toObservableList(w)); @SuppressWarnings("unchecked") diff --git a/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java b/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java index 018b2a9da0..5d3aa5b978 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java +++ b/rxjava-core/src/main/java/rx/operators/OperationToObservableSortedList.java @@ -143,7 +143,7 @@ public static class UnitTest { @Test public void testSortedList() { - Observable w = Observable.toObservable(1, 3, 2, 5, 4); + Observable w = Observable.from(1, 3, 2, 5, 4); Observable> observable = Observable.create(toSortedList(w)); @SuppressWarnings("unchecked") @@ -156,7 +156,7 @@ public void testSortedList() { @Test public void testSortedListWithCustomFunction() { - Observable w = Observable.toObservable(1, 3, 2, 5, 4); + Observable w = Observable.from(1, 3, 2, 5, 4); Observable> observable = Observable.create(toSortedList(w, new Func2() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationWhere.java b/rxjava-core/src/main/java/rx/operators/OperationWhere.java index 8fd7d2b36b..da488bec92 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationWhere.java +++ b/rxjava-core/src/main/java/rx/operators/OperationWhere.java @@ -38,7 +38,7 @@ public static class UnitTest { @Test public void testWhere() { - Observable w = Observable.toObservable("one", "two", "three"); + Observable w = Observable.from("one", "two", "three"); Observable observable = Observable.create(where(w, new Func1() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/OperationZip.java b/rxjava-core/src/main/java/rx/operators/OperationZip.java index 73091e32b5..a960e62968 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationZip.java +++ b/rxjava-core/src/main/java/rx/operators/OperationZip.java @@ -30,8 +30,6 @@ import rx.Observer; import rx.Subscription; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; -import rx.util.SynchronizedObserver; import rx.util.functions.Func1; import rx.util.functions.Func2; import rx.util.functions.Func3; @@ -670,7 +668,7 @@ public void testZip2Types() { /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - Observable w = Observable.create(zip(Observable.toObservable("one", "two"), Observable.toObservable(2, 3, 4), zipr)); + Observable w = Observable.create(zip(Observable.from("one", "two"), Observable.from(2, 3, 4), zipr)); w.subscribe(aObserver); verify(aObserver, never()).onError(any(Exception.class)); @@ -689,7 +687,7 @@ public void testZip3Types() { /* define a Observer to receive aggregated events */ Observer aObserver = mock(Observer.class); - Observable w = Observable.create(zip(Observable.toObservable("one", "two"), Observable.toObservable(2), Observable.toObservable(new int[] { 4, 5, 6 }), zipr)); + Observable w = Observable.create(zip(Observable.from("one", "two"), Observable.from(2), Observable.from(new int[] { 4, 5, 6 }), zipr)); w.subscribe(aObserver); verify(aObserver, never()).onError(any(Exception.class)); @@ -705,7 +703,7 @@ public void testOnNextExceptionInvokesOnError() { @SuppressWarnings("unchecked") Observer aObserver = mock(Observer.class); - Observable w = Observable.create(zip(Observable.toObservable(10, 20, 30), Observable.toObservable(0, 1, 2), zipr)); + Observable w = Observable.create(zip(Observable.from(10, 20, 30), Observable.from(0, 1, 2), zipr)); w.subscribe(aObserver); verify(aObserver, times(1)).onError(any(Exception.class)); diff --git a/rxjava-core/src/main/java/rx/operators/OperatorTester.java b/rxjava-core/src/main/java/rx/operators/OperatorTester.java index 6fa93c1a1d..db102cc482 100644 --- a/rxjava-core/src/main/java/rx/operators/OperatorTester.java +++ b/rxjava-core/src/main/java/rx/operators/OperatorTester.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -124,7 +124,7 @@ public Subscription call(Observer observer) observer.onCompleted(); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @@ -139,7 +139,7 @@ public Subscription call(Observer observer) observer.onError(new Exception()); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @Test(expected = AssertionError.class) @@ -153,7 +153,7 @@ public Subscription call(Observer observer) observer.onNext("one"); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @Test(expected = AssertionError.class) @@ -167,7 +167,7 @@ public Subscription call(Observer observer) observer.onCompleted(); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @Test(expected = AssertionError.class) @@ -181,7 +181,7 @@ public Subscription call(Observer observer) observer.onError(new Exception()); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @Test(expected = AssertionError.class) @@ -195,7 +195,7 @@ public Subscription call(Observer observer) observer.onNext("one"); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @Test @@ -209,7 +209,7 @@ public Subscription call(Observer observer) observer.onCompleted(); return Subscriptions.empty(); } - })).lastOrDefault("end"); + })).toBlockingObservable().lastOrDefault("end"); } @Test @@ -333,7 +333,7 @@ public Subscription schedule(Func0 action, long dueTime, TimeUnit public Subscription schedule(Func1 action, long dueTime, TimeUnit unit) { return underlying.schedule(action, dueTime, unit); } - + @Override public Subscription schedule(T state, Func2 action, long dueTime, TimeUnit unit) { return underlying.schedule(state, action, dueTime, unit); diff --git a/rxjava-core/src/main/java/rx/operators/README.txt b/rxjava-core/src/main/java/rx/operators/README.txt new file mode 100644 index 0000000000..c2d441a10c --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/README.txt @@ -0,0 +1,5 @@ +This package "rx.operators" is for internal implementation details and can change at any time. + +It is excluded from the public Javadocs (http://netflix.github.io/RxJava/javadoc/) and should not be relied upon by any code. + +In short, changes to public signatures of these classes will not be accounted for in the versioning of RxJava. \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/util/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java similarity index 99% rename from rxjava-core/src/main/java/rx/util/SynchronizedObserver.java rename to rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java index 11f7138303..cd8ad2187d 100644 --- a/rxjava-core/src/main/java/rx/util/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package rx.util; +package rx.operators; import static org.junit.Assert.*; import static org.mockito.Matchers.*; diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 5bc15623b6..5261e4dbdf 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -26,7 +26,7 @@ import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; +import rx.operators.AtomicObservableSubscription; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 5f3b16dc90..9ef56807c1 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -26,7 +26,7 @@ import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; +import rx.operators.AtomicObservableSubscription; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index 030bde520c..6c375017c0 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -33,7 +33,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.util.AtomicObservableSubscription; +import rx.operators.AtomicObservableSubscription; import rx.util.functions.Action1; import rx.util.functions.Func0; import rx.util.functions.Func1; @@ -151,7 +151,7 @@ public void unsubscribe() { }).subscribe(subject); // the subject has received an onComplete from the first subscribe because // it is synchronous and the next subscribe won't do anything. - Observable.toObservable(-1, -2, -3).subscribe(subject); + Observable.from(-1, -2, -3).subscribe(subject); List> expected = new ArrayList>(); expected.add(new Notification(-1));