diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java new file mode 100644 index 0000000000..eb7c655414 --- /dev/null +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -0,0 +1,264 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.subjects; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Test; +import org.mockito.Mockito; + +import rx.Observer; +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.SynchronizedObserver; +import rx.util.functions.Action1; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +/** + * Subject that publishes the last and all subsequent events to each {@link Observer} that subscribes. + *

+ * Example usage: + *

+ *

 {@code
+ 
+  // observer will receive all events.
+  BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+  subject.subscribe(observer);
+  subject.onNext("one");
+  subject.onNext("two");
+  subject.onNext("three");
+ 
+  // observer will receive the "one", "two" and "three" events.
+  BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+  subject.onNext("one");
+  subject.subscribe(observer);
+  subject.onNext("two");
+  subject.onNext("three");
+ 
+  } 
+ * 
+ * @param 
+ */
+public class BehaviorSubject extends Subject {
+
+    /**
+     * Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each 
+     * {@link Observer} that subscribes to it.
+     *  
+     * @param defaultValue
+     *            The value which will be published to any {@link Observer} as long as the 
+     *            {@link BehaviorSubject} has not yet received any events.
+     * @return the constructed {@link BehaviorSubject}.
+     */
+    public static  BehaviorSubject createWithDefaultValue(T defaultValue) {
+        final ConcurrentHashMap> observers = new ConcurrentHashMap>();
+
+        final AtomicReference currentValue = new AtomicReference(defaultValue);
+        
+        Func1, Subscription> onSubscribe = new Func1, Subscription>() {
+            @Override
+            public Subscription call(Observer observer) {
+                final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
+
+                subscription.wrap(new Subscription() {
+                    @Override
+                    public void unsubscribe() {
+                        // on unsubscribe remove it from the map of outbound observers to notify
+                        observers.remove(subscription);
+                    }
+                });
+
+                SynchronizedObserver synchronizedObserver = new SynchronizedObserver(observer, subscription);
+                synchronizedObserver.onNext(currentValue.get());
+
+                // on subscribe add it to the map of outbound observers to notify
+                observers.put(subscription, synchronizedObserver);
+                return subscription;
+            }
+        };
+
+        return new BehaviorSubject(currentValue, onSubscribe, observers);
+    }
+
+    private final ConcurrentHashMap> observers;
+    private final AtomicReference currentValue;
+
+    protected BehaviorSubject(AtomicReference currentValue, Func1, Subscription> onSubscribe, ConcurrentHashMap> observers) {
+        super(onSubscribe);
+        this.currentValue = currentValue;
+        this.observers = observers;
+    }
+    
+    @Override
+    public void onCompleted() {
+        for (Observer observer : observers.values()) {
+            observer.onCompleted();
+        }
+    }
+
+    @Override
+    public void onError(Exception e) {
+        for (Observer observer : observers.values()) {
+            observer.onError(e);
+        }
+    }
+
+    @Override
+    public void onNext(T args) {
+        currentValue.set(args);
+        for (Observer observer : observers.values()) {
+            observer.onNext(args);
+        }
+    }
+
+    public static class UnitTest {
+
+        private final Exception testException = new Exception();
+
+        @Test
+        public void testThatObserverReceivesDefaultValueIfNothingWasPublished() {
+            BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onNext("two");
+            subject.onNext("three");
+
+            assertReceivedAllEvents(aObserver);
+        }
+
+        private void assertReceivedAllEvents(Observer aObserver) {
+            verify(aObserver, times(1)).onNext("default");
+            verify(aObserver, times(1)).onNext("one");
+            verify(aObserver, times(1)).onNext("two");
+            verify(aObserver, times(1)).onNext("three");
+            verify(aObserver, Mockito.never()).onError(testException);
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+
+        @Test
+        public void testThatObserverDoesNotReceiveDefaultValueIfSomethingWasPublished() {
+            BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+
+            subject.onNext("one");
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("two");
+            subject.onNext("three");
+
+            assertDidNotReceiveTheDefaultValue(aObserver);
+        }
+
+        private void assertDidNotReceiveTheDefaultValue(Observer aObserver) {
+            verify(aObserver, Mockito.never()).onNext("default");
+            verify(aObserver, times(1)).onNext("one");
+            verify(aObserver, times(1)).onNext("two");
+            verify(aObserver, times(1)).onNext("three");
+            verify(aObserver, Mockito.never()).onError(testException);
+            verify(aObserver, Mockito.never()).onCompleted();
+        }
+
+        @Test
+        public void testCompleted() {
+            BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onCompleted();
+
+            assertCompletedObserver(aObserver);
+        }
+
+        private void assertCompletedObserver(Observer aObserver)
+        {
+            verify(aObserver, times(1)).onNext("default");
+            verify(aObserver, times(1)).onNext("one");
+            verify(aObserver, Mockito.never()).onError(any(Exception.class));
+            verify(aObserver, times(1)).onCompleted();
+        }
+
+        @Test
+        public void testCompletedAfterError() {
+            BehaviorSubject subject = BehaviorSubject.createWithDefaultValue("default");
+
+            @SuppressWarnings("unchecked")
+            Observer aObserver = mock(Observer.class);
+            subject.subscribe(aObserver);
+
+            subject.onNext("one");
+            subject.onError(testException);
+            subject.onNext("two");
+            subject.onCompleted();
+
+            assertErrorObserver(aObserver);
+        }
+
+        private void assertErrorObserver(Observer aObserver)
+        {
+            verify(aObserver, times(1)).onNext("default");
+            verify(aObserver, times(1)).onNext("one");
+            verify(aObserver, times(1)).onError(testException);
+        }
+        
+        @Test
+        public void testUnsubscribe()
+        {
+            UnsubscribeTester.test(new Func0>()
+            {
+                @Override
+                public BehaviorSubject call()
+                {
+                    return BehaviorSubject.createWithDefaultValue("default");
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(BehaviorSubject DefaultSubject)
+                {
+                    DefaultSubject.onCompleted();
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(BehaviorSubject DefaultSubject)
+                {
+                    DefaultSubject.onError(new Exception());
+                }
+            }, new Action1>()
+            {
+                @Override
+                public void call(BehaviorSubject DefaultSubject)
+                {
+                    DefaultSubject.onNext("one");
+                }
+            });
+        }
+    }
+}