Skip to content

Commit

Permalink
Merge pull request #238 from benjchristensen/pull-234-merge-ObserveOn
Browse files Browse the repository at this point in the history
ScheduledObserver/ObserveOn - Manual Merge of Pull 234
  • Loading branch information
benjchristensen committed Apr 18, 2013
2 parents 34e097b + ea3cbbf commit ee8f4a5
Show file tree
Hide file tree
Showing 3 changed files with 223 additions and 21 deletions.
46 changes: 41 additions & 5 deletions rxjava-core/src/main/java/rx/operators/OperationObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,22 @@
*/
package rx.operators;

import static org.mockito.Matchers.*;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.concurrency.Schedulers;
import rx.util.functions.Action0;
import rx.util.functions.Func1;

public class OperationObserveOn {
Expand Down Expand Up @@ -60,15 +65,46 @@ public void testObserveOn() {
Observer<Integer> observer = mock(Observer.class);
Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);

verify(scheduler, times(4)).schedule(any(Action0.class));
verifyNoMoreInteractions(scheduler);

verify(observer, times(1)).onNext(1);
verify(observer, times(1)).onNext(2);
verify(observer, times(1)).onNext(3);
verify(observer, times(1)).onCompleted();
}

@Test
@SuppressWarnings("unchecked")
public void testOrdering() throws InterruptedException {
Observable<String> obs = Observable.from("one", null, "two", "three", "four");

Observer<String> observer = mock(Observer.class);

InOrder inOrder = inOrder(observer);

final CountDownLatch completedLatch = new CountDownLatch(1);
doAnswer(new Answer<Void>() {

@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
completedLatch.countDown();
return null;
}
}).when(observer).onCompleted();

obs.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);

if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) {
fail("timed out waiting");
}

inOrder.verify(observer, times(1)).onNext("one");
inOrder.verify(observer, times(1)).onNext(null);
inOrder.verify(observer, times(1)).onNext("two");
inOrder.verify(observer, times(1)).onNext("three");
inOrder.verify(observer, times(1)).onNext("four");
inOrder.verify(observer, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

}

}
64 changes: 48 additions & 16 deletions rxjava-core/src/main/java/rx/operators/ScheduledObserver.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
/**
* Copyright 2013 Netflix, Inc.
*
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*
* http://www.apache.org/licenses/LICENSE-2.0
*
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
Expand All @@ -15,45 +15,77 @@
*/
package rx.operators;

import rx.Notification;
import rx.Observer;
import rx.Scheduler;
import rx.util.functions.Action0;

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

/* package */class ScheduledObserver<T> implements Observer<T> {
private final Observer<T> underlying;
private final Scheduler scheduler;

private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
private final AtomicInteger counter = new AtomicInteger(0);

public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
this.underlying = underlying;
this.scheduler = scheduler;
}

@Override
public void onCompleted() {
scheduler.schedule(new Action0() {
@Override
public void call() {
underlying.onCompleted();
}
});
enqueue(new Notification<T>());
}

@Override
public void onError(final Exception e) {
scheduler.schedule(new Action0() {
@Override
public void call() {
underlying.onError(e);
}
});
enqueue(new Notification<T>(e));
}

@Override
public void onNext(final T args) {
enqueue(new Notification<T>(args));
}

private void enqueue(Notification<T> notification) {
int count = counter.getAndIncrement();

queue.offer(notification);

if (count == 0) {
processQueue();
}
}

private void processQueue() {
scheduler.schedule(new Action0() {
@Override
public void call() {
underlying.onNext(args);
Notification<T> not = queue.poll();

switch (not.getKind()) {
case OnNext:
underlying.onNext(not.getValue());
break;
case OnError:
underlying.onError(not.getException());
break;
case OnCompleted:
underlying.onCompleted();
break;
default:
throw new IllegalStateException("Unknown kind of notification " + not);

}

int count = counter.decrementAndGet();
if (count > 0) {
scheduler.schedule(this);
}

}
});
}
Expand Down
134 changes: 134 additions & 0 deletions rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.junit.Test;

Expand Down Expand Up @@ -395,4 +396,137 @@ public Subscription call(Scheduler scheduler, String state) {
}
}

@Test
public void testConcurrentOnNextFailsValidation() throws InterruptedException {

final int count = 10;
final CountDownLatch latch = new CountDownLatch(count);
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
for (int i = 0; i < count; i++) {
final int v = i;
new Thread(new Runnable() {

@Override
public void run() {
observer.onNext("v: " + v);

latch.countDown();
}
}).start();
}
return Subscriptions.empty();
}
});

ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
// this should call onNext concurrently
o.subscribe(observer);

if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
fail("timed out");
}

if (observer.error.get() == null) {
fail("We expected error messages due to concurrency");
}
}

@Test
public void testObserveOn() throws InterruptedException {

Observable<String> o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten");

ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();

o.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);

if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
fail("timed out");
}

if (observer.error.get() != null) {
observer.error.get().printStackTrace();
fail("Error: " + observer.error.get().getMessage());
}
}

@Test
public void testSubscribeOnNestedConcurrency() throws InterruptedException {

Observable<String> o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten")
.mapMany(new Func1<String, Observable<String>>() {

@Override
public Observable<String> call(final String v) {
return Observable.create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(final Observer<String> observer) {
observer.onNext("value_after_map-" + v);
observer.onCompleted();
return Subscriptions.empty();
}
}).subscribeOn(Schedulers.newThread()); // subscribe on a new thread
}
});

ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();

o.subscribe(observer);

if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
fail("timed out");
}

if (observer.error.get() != null) {
observer.error.get().printStackTrace();
fail("Error: " + observer.error.get().getMessage());
}
}

/**
* Used to determine if onNext is being invoked concurrently.
*
* @param <T>
*/
private static class ConcurrentObserverValidator<T> implements Observer<T> {

final AtomicInteger concurrentCounter = new AtomicInteger();
final AtomicReference<Exception> error = new AtomicReference<Exception>();
final CountDownLatch completed = new CountDownLatch(1);

@Override
public void onCompleted() {
completed.countDown();
}

@Override
public void onError(Exception e) {
completed.countDown();
error.set(e);
}

@Override
public void onNext(T args) {
int count = concurrentCounter.incrementAndGet();
System.out.println("ConcurrentObserverValidator.onNext: " + args);
if (count > 1) {
onError(new RuntimeException("we should not have concurrent execution of onNext"));
}
try {
try {
// take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping)
Thread.sleep(50);
} catch (InterruptedException e) {
// ignore
}
} finally {
concurrentCounter.decrementAndGet();
}
}

}
}

0 comments on commit ee8f4a5

Please sign in to comment.