diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index eeb1e96407..d5aebe0ad5 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -441,6 +441,15 @@ public void onNext(String v) { o1.onNextBeingSent.await(); o2.onNextBeingSent.await(); + // I can't think of a way to know for sure that both threads have or are trying to send onNext + // since I can't use a CountDownLatch for "after" onNext since I want to catch during it + // but I can't know for sure onNext is invoked + // so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time + // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following + // onNext is invoked. + + Thread.sleep(300); + try { // in try/finally so threads are released via latch countDown even if assertion fails assertEquals(1, concurrentCounter.get()); } finally { @@ -541,6 +550,8 @@ public Subscription subscribe(final Observer observer) { public void run() { onNextBeingSent.countDown(); observer.onNext("hello"); + // I can't use a countDownLatch to prove we are actually sending 'onNext' + // since it will block if synchronized and I'll deadlock observer.onCompleted(); }