diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 2340da979c4..2617be87fb6 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -72,8 +72,8 @@ def class ObservableTests { @Test public void testMap1() { - new TestFactory().getObservable().map({v -> 'say' + v}).subscribe({ result -> a.received(result)}); - verify(a, times(1)).received("sayhello_1"); + Observable.from(1).map({v -> 'hello_' + v}).subscribe({ result -> a.received(result)}); + verify(a, times(1)).received("hello_1"); } @Test diff --git a/rxjava-core/build.gradle b/rxjava-core/build.gradle index 5931b9fc101..72a984c88d8 100644 --- a/rxjava-core/build.gradle +++ b/rxjava-core/build.gradle @@ -7,7 +7,6 @@ sourceCompatibility = JavaVersion.VERSION_1_6 targetCompatibility = JavaVersion.VERSION_1_6 dependencies { - compile 'org.slf4j:slf4j-api:1.7.0' provided 'junit:junit-dep:4.10' provided 'org.mockito:mockito-core:1.8.5' } @@ -49,5 +48,7 @@ jar { instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava' instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*' } + // commenting out for now as it's breaking the rxjava-scala build and I can't figure out why + // exclude('**/*$UnitTest*') } diff --git a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java index ddbfbaf276a..c3805febf6c 100644 --- a/rxjava-core/src/main/java/rx/observables/BlockingObservable.java +++ b/rxjava-core/src/main/java/rx/observables/BlockingObservable.java @@ -44,7 +44,7 @@ public Subscription call(Observer observer) { /** * Returns an iterator that iterates all values of the observable. * - * @param that + * @param source * an observable sequence to get an iterator for. * @param * the type of source. @@ -57,7 +57,7 @@ public static Iterator toIterator(Observable source) { /** * Returns the last element of an observable sequence with a specified source. * - * @param that + * @param source * the source Observable * @return the last element in the observable sequence. */ @@ -68,7 +68,7 @@ public static T last(final Observable source) { /** * Returns the last element of an observable sequence that matches the predicate. * - * @param that + * @param source * the source Observable * @param predicate * a predicate function to evaluate for elements in the sequence. @@ -81,7 +81,7 @@ public static T last(final Observable source, final Func1 pre /** * Returns the last element of an observable sequence that matches the predicate. * - * @param that + * @param source * the source Observable * @param predicate * a predicate function to evaluate for elements in the sequence. @@ -198,7 +198,7 @@ private static T _singleOrDefault(BlockingObservable source, boolean hasD /** * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. * - * @param that + * @param source * the source Observable * @return The single element in the observable sequence. * @throws IllegalStateException @@ -211,7 +211,7 @@ public static T single(Observable source) { /** * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. * - * @param that + * @param source * the source Observable * @param predicate * A predicate function to evaluate for elements in the sequence. @@ -226,7 +226,7 @@ public static T single(Observable source, Func1 predicate) { /** * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. * - * @param that + * @param source * the source Observable * @param predicate * A predicate function to evaluate for elements in the sequence. @@ -241,7 +241,7 @@ public static T single(Observable source, Object predicate) { /** * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. * - * @param that + * @param source * the source Observable * @param defaultValue * default value for a sequence. @@ -254,7 +254,7 @@ public static T singleOrDefault(Observable source, T defaultValue) { /** * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. * - * @param that + * @param source * the source Observable * @param defaultValue * default value for a sequence. @@ -269,7 +269,7 @@ public static T singleOrDefault(Observable source, T defaultValue, Func1< /** * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. * - * @param that + * @param source * the source Observable * @param defaultValue * default value for a sequence. @@ -286,7 +286,7 @@ public static T singleOrDefault(Observable source, T defaultValue, Object *

* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use toList().toFuture(). * - * @param that + * @param source * the source Observable * @return a Future that expects a single item emitted by the source Observable */ @@ -297,7 +297,7 @@ public static Future toFuture(final Observable source) { /** * Converts an observable sequence to an Iterable. * - * @param that + * @param source * the source Observable * @return Observable converted to Iterable. */ @@ -565,7 +565,7 @@ public void testLastEmptyObservable() { @Test public void testLastOrDefault() { - BlockingObservable observable = BlockingObservable.from(toObservable(1, 0, -1)); + BlockingObservable observable = BlockingObservable.from(from(1, 0, -1)); int last = observable.lastOrDefault(-100, new Func1() { @Override public Boolean call(Integer args) { @@ -577,19 +577,19 @@ public Boolean call(Integer args) { @Test public void testLastOrDefault1() { - BlockingObservable observable = BlockingObservable.from(toObservable("one", "two", "three")); + BlockingObservable observable = BlockingObservable.from(from("one", "two", "three")); assertEquals("three", observable.lastOrDefault("default")); } @Test public void testLastOrDefault2() { - BlockingObservable observable = BlockingObservable.from(toObservable()); + BlockingObservable observable = BlockingObservable.from(from()); assertEquals("default", observable.lastOrDefault("default")); } @Test public void testLastOrDefaultWithPredicate() { - BlockingObservable observable = BlockingObservable.from(toObservable(1, 0, -1)); + BlockingObservable observable = BlockingObservable.from(from(1, 0, -1)); int last = observable.lastOrDefault(0, new Func1() { @Override public Boolean call(Integer args) { @@ -602,7 +602,7 @@ public Boolean call(Integer args) { @Test public void testLastOrDefaultWrongPredicate() { - BlockingObservable observable = BlockingObservable.from(toObservable(-1, -2, -3)); + BlockingObservable observable = BlockingObservable.from(from(-1, -2, -3)); int last = observable.lastOrDefault(0, new Func1() { @Override public Boolean call(Integer args) { @@ -625,19 +625,19 @@ public Boolean call(String s) { } public void testSingle() { - BlockingObservable observable = BlockingObservable.from(toObservable("one")); + BlockingObservable observable = BlockingObservable.from(from("one")); assertEquals("one", observable.single()); } @Test public void testSingleDefault() { - BlockingObservable observable = BlockingObservable.from(toObservable()); + BlockingObservable observable = BlockingObservable.from(from()); assertEquals("default", observable.singleOrDefault("default")); } @Test(expected = IllegalStateException.class) public void testSingleDefaultPredicateMatchesMoreThanOne() { - BlockingObservable.from(toObservable("one", "two")).singleOrDefault("default", new Func1() { + BlockingObservable.from(from("one", "two")).singleOrDefault("default", new Func1() { @Override public Boolean call(String args) { return args.length() == 3; @@ -647,7 +647,7 @@ public Boolean call(String args) { @Test public void testSingleDefaultPredicateMatchesNothing() { - BlockingObservable observable = BlockingObservable.from(toObservable("one", "two")); + BlockingObservable observable = BlockingObservable.from(from("one", "two")); String result = observable.singleOrDefault("default", new Func1() { @Override public Boolean call(String args) { @@ -659,13 +659,13 @@ public Boolean call(String args) { @Test(expected = IllegalStateException.class) public void testSingleDefaultWithMoreThanOne() { - BlockingObservable observable = BlockingObservable.from(toObservable("one", "two", "three")); + BlockingObservable observable = BlockingObservable.from(from("one", "two", "three")); observable.singleOrDefault("default"); } @Test public void testSingleWithPredicateDefault() { - BlockingObservable observable = BlockingObservable.from(toObservable("one", "two", "four")); + BlockingObservable observable = BlockingObservable.from(from("one", "two", "four")); assertEquals("four", observable.single(new Func1() { @Override public Boolean call(String s) { @@ -676,13 +676,13 @@ public Boolean call(String s) { @Test(expected = IllegalStateException.class) public void testSingleWrong() { - BlockingObservable observable = BlockingObservable.from(toObservable(1, 2)); + BlockingObservable observable = BlockingObservable.from(from(1, 2)); observable.single(); } @Test(expected = IllegalStateException.class) public void testSingleWrongPredicate() { - BlockingObservable observable = BlockingObservable.from(toObservable(-1)); + BlockingObservable observable = BlockingObservable.from(from(-1)); observable.single(new Func1() { @Override public Boolean call(Integer args) { @@ -693,7 +693,7 @@ public Boolean call(Integer args) { @Test public void testToIterable() { - BlockingObservable obs = BlockingObservable.from(toObservable("one", "two", "three")); + BlockingObservable obs = BlockingObservable.from(from("one", "two", "three")); Iterator it = obs.toIterable().iterator(); diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index 6995e934d2f..a1c904fc27c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -15,7 +15,11 @@ */ package rx.operators; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; @@ -27,31 +31,21 @@ import rx.Subscription; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; -import rx.util.AtomicObservableSubscription; import rx.util.functions.Action0; import rx.util.functions.Func1; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; - /** - * This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single - * {@link Observable} sequence which only produces values from the most recently published {@link Observable} - * sequence in the sequence. + * This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence which only produces values from the most recently published + * {@link Observable} sequence in the sequence. */ public final class OperationSwitch { /** - * This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single - * {@link Observable} sequence which produces values from the most recently published {@link Observable}. + * This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single {@link Observable} sequence which produces values from the most recently published + * {@link Observable}. * - * @param sequences The {@link Observable} sequence consisting of {@link Observable} sequences. + * @param sequences + * The {@link Observable} sequence consisting of {@link Observable} sequences. * @return A {@link Func1} which does this transformation. */ public static Func1, Subscription> switchDo(final Observable> sequences) { @@ -105,7 +99,7 @@ public void onError(Exception e) { @Override public void onNext(Observable args) { unsubscribeFromSubSequence(); - + subsequence.set(args.subscribe(new Observer() { @Override public void onCompleted() { @@ -132,7 +126,7 @@ private void unsubscribeFromSubSequence() { } } } - + public static class UnitTest { private TestScheduler scheduler; @@ -158,7 +152,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishNext(observer, 200, Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -167,7 +161,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishCompleted(observer, 250); return Subscriptions.empty(); @@ -188,7 +182,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); - + scheduler.advanceTimeTo(175, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("two"); verify(observer, never()).onCompleted(); @@ -198,7 +192,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); - + scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, times(1)).onCompleted(); @@ -218,7 +212,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishNext(observer, 200, Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -227,7 +221,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishError(observer, 250, new TestException()); return Subscriptions.empty(); @@ -248,7 +242,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); - + scheduler.advanceTimeTo(175, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("two"); verify(observer, never()).onCompleted(); @@ -258,7 +252,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); - + scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext(anyString()); verify(observer, never()).onCompleted(); @@ -278,7 +272,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishNext(observer, 130, Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -286,7 +280,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishNext(observer, 150, Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -313,7 +307,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); - + scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); @@ -333,7 +327,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishNext(observer, 130, Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -341,7 +335,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } })); - + publishNext(observer, 150, Observable.create(new Func1, Subscription>() { @Override public Subscription call(Observer observer) { @@ -368,7 +362,7 @@ public Subscription call(Observer observer) { inOrder.verify(observer, times(1)).onNext("one"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); - + scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); inOrder.verify(observer, never()).onNext("three"); verify(observer, never()).onCompleted(); @@ -392,7 +386,7 @@ public void call() { } }, delay, TimeUnit.MILLISECONDS); } - + private void publishNext(final Observer observer, long delay, final T value) { scheduler.schedule(new Action0() { @Override @@ -403,6 +397,7 @@ public void call() { } @SuppressWarnings("serial") - private class TestException extends Exception { } + private class TestException extends Exception { + } } } \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 091bfa3e029..4753f23acf2 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -15,15 +15,19 @@ */ package rx.subscriptions; +import static org.junit.Assert.*; + +import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.junit.Test; import rx.Subscription; -import rx.util.functions.Functions; +import rx.util.CompositeException; /** * Subscription that represents a group of Subscriptions that are unsubscribed together. @@ -32,8 +36,6 @@ */ public class CompositeSubscription implements Subscription { - private static final Logger logger = LoggerFactory.getLogger(Functions.class); - /* * The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically. * @@ -67,13 +69,80 @@ public synchronized void add(Subscription s) { @Override public synchronized void unsubscribe() { if (unsubscribed.compareAndSet(false, true)) { + Collection es = null; for (Subscription s : subscriptions) { try { s.unsubscribe(); } catch (Exception e) { - logger.error("Failed to unsubscribe.", e); + if (es == null) { + es = new ArrayList(); + } + es.add(e); } } + if (es != null) { + throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es); + } + } + } + + public static class UnitTest { + + @Test + public void testSuccess() { + final AtomicInteger counter = new AtomicInteger(); + CompositeSubscription s = new CompositeSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + }); + + s.unsubscribe(); + + assertEquals(2, counter.get()); + } + + @Test + public void testException() { + final AtomicInteger counter = new AtomicInteger(); + CompositeSubscription s = new CompositeSubscription(); + s.add(new Subscription() { + + @Override + public void unsubscribe() { + throw new RuntimeException("failed on first one"); + } + }); + + s.add(new Subscription() { + + @Override + public void unsubscribe() { + counter.incrementAndGet(); + } + }); + + try { + s.unsubscribe(); + fail("Expecting an exception"); + } catch (CompositeException e) { + // we expect this + assertEquals(1, e.getExceptions().size()); + } + + // we should still have unsubscribed to the second one + assertEquals(1, counter.get()); } } diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 93a4049c08c..53671c210d6 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -18,9 +18,6 @@ import java.util.Collection; import java.util.concurrent.ConcurrentHashMap; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - /** * Allows execution of functions from multiple different languages. *

@@ -30,8 +27,6 @@ */ public class Functions { - private static final Logger logger = LoggerFactory.getLogger(Functions.class); - private final static ConcurrentHashMap, FunctionLanguageAdaptor> languageAdaptors = new ConcurrentHashMap, FunctionLanguageAdaptor>(); static { @@ -49,12 +44,17 @@ private static boolean loadLanguageAdaptor(String name) { Class c = Class.forName(className); FunctionLanguageAdaptor a = (FunctionLanguageAdaptor) c.newInstance(); registerLanguageAdaptor(a.getFunctionClass(), a); - logger.info("Successfully loaded function language adaptor: " + name + " with path: " + className); + /* + * Using System.err/System.out as this is the only place in the library where we do logging and it's only at startup. + * I don't want to include SL4J/Log4j just for this and no one uses Java Logging. + */ + System.out.println("RxJava => Successfully loaded function language adaptor: " + name + " with path: " + className); } catch (ClassNotFoundException e) { - logger.info("Could not find function language adaptor: " + name + " with path: " + className); + System.err.println("RxJava => Could not find function language adaptor: " + name + " with path: " + className); return false; } catch (Exception e) { - logger.error("Failed trying to initialize function language adaptor: " + className, e); + System.err.println("RxJava => Failed trying to initialize function language adaptor: " + className); + e.printStackTrace(); return false; } return true;