-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Synchronize Observer on OperationMerge #201
Synchronize Observer on OperationMerge #201
Conversation
fixes ReactiveX#200 This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable.
RxJava-pull-requests #41 SUCCESS |
The fix looks good. However, the test case does not catch the original problem. First, the test case tests an "Observable.create(merge(o1,o2)" (https://github.com/Netflix/RxJava/pull/201/files#L0R409). The create wraps an AtomicObservableSubscription, which defeats the purpose of checking whether OperationMerge is chronologically well behaved itself. This should be "new Observable(merge(o1,o2), false)" (where false = isTrusted) or "Observable.merge(o1,o2)". This occurs in most of the test cases for OperationMerge. By changing the test case to properly test OperationMerge, the test still didn't fail on the original wrong implementation. I don't have a test case that shows the original problem, without resorting to threads with fixed sleep delays to orchestrate an interleaving. |
The test case in this commit forces concurrent onNext invocations and demonstrates how the changed code no longer allows concurrent onNext calls. When you say "interleaving" perhaps you're not referring to concurrent execution but instead how merge does not sequentially concatenate sequences? If that's what you mean then Note that
whereas
The As for the comments about The isTrusted piece has nothing to do with concurrent execution - the Thus with merge you can have these sequences:
If the two are asynchronously and concurrently executing then they will both begin emitting immediately but onNext will not concurrently execute (which was the bug this fixes). You can then end up with this:
But no longer end up with this with concurrent onNext calls like this which were possible before this commit:
If you're trying to get output like this:
then you want the |
You are right, I am confusing AtomicObservableSubscription with SynchronizedObserver, which indeed refer to two completely different things. Sorry for that. I do not mean the concat operation. With interleaving I meant the interleaving of the calls to onNext and countDown in the test case. When I revert the change on line https://github.com/Netflix/RxJava/pull/201/files#L0L121 and run the test case, the test case doesn't fail:
|
- return AtomicSubscription not MergeSubscription which I was accidentally still returning - try/finally in unit test so threads are released even if assertion is thrown
That's rather odd as I see the unit test doing what I expect: With this code: sequences.subscribe(new ParentObserver(actualObserver)); I get the failure:
With this code: sequences.subscribe(new ParentObserver(synchronizedObserver)); the unit test passes.
The I use a CountDownLatch in the I then use the If onNext is not synchronized then both threads will block on Once I perform the assertion the Does this unit test correctly test the bug you originally were reporting about concurrent execution of onNext, or is there something else that I am missing? I just committed a few tweaks to improve the code. On line 136 it is now: sequences.subscribe(new ParentObserver(synchronizedObserver)); If I change this to the following the unit test breaks for me: sequences.subscribe(new ParentObserver(actualObserver)); |
RxJava-pull-requests #43 SUCCESS |
Just to re-iterate: I think the fix is good. This is now about the test case included in the fix.
I don't always see FAILED, sometimes the test does succeed.
Output:
The test succeeding while it shouldn't doesn't happen always. Therefore I think that the test doesn't account for this possible interleaving of the different threads executing:
|
This was on an Ubuntu machine. On a mac book I needed more samples for this result:
Something to consider when trying to reproduce the results. I don't have detailed specs of the test machines available at the moment. |
Hmm, not sure yet what's wrong the test case but obviously it is not deterministic. Thank you for the detailed testing - I'll explore what I can do. If you can offer a better unit test that is deterministic (not reliant on Thread.sleep) for this I'll gladly accept it. |
I'm holding off committing until I understand the non-determinism and can fix it ... unless you would prefer I just commit the fix and worry about the unit test later. |
The merge operation is also used to implement mapMany (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/operators/OperationMap.java#L71), which is even more important to get right. I haven't looked into the impact of the proposed fix of merge on mapMany. |
That should work as expected since mapMany is just a composition of |
- not sure of a way other than putting Thread.sleep in here to give time after each CountDownLatch triggers for the process scheduler to execute the next line of each thread See ReactiveX#201 for more information.
RxJava-pull-requests #70 SUCCESS |
I see in the code how the non-deterministic behavior could occur. I'm not quite sure of a way to solve it other than using Thread.sleep to hopefully give each thread time to complete the I can't use another CountDownLatch for after the call - as we want it in the middle of onNext. I can't use it inside onNext as the whole point of this test is that one of them should be blocked on the synchronized onNext call, thus I have no idea how to know if a thread has actually invoked but is blocked on a method ... other than a thread dump. I can't replicate the non-deterministic behavior on my machine so @thegeez could you see if the fix I just committed helps on your machine? I'm going to proceed with the merge instead of waiting as I want to get this fix in since it's passing unit tests (and manual review by both you and me) and if it's still non-deterministic on your machine or elsewhere we can continue to figure out a better way to do this test. |
…ization Synchronize Observer on OperationMerge
The fix in benjchristensen@169e7e0 indeed removes the false positives I saw previously for the test case. |
Great, thank you for confirming. |
- not sure of a way other than putting Thread.sleep in here to give time after each CountDownLatch triggers for the process scheduler to execute the next line of each thread See ReactiveX#201 for more information.
…-synchronization Synchronize Observer on OperationMerge
fix for Merge serialization bug reported in #200
This is necessary because by definition Merge is subscribing to multiple sequences in parallel and is supposed to serialize them into a single Observable.