Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix concurrency bug in ScheduledObserver #269

Merged
merged 1 commit into from
May 11, 2013

Conversation

benjchristensen
Copy link
Member

This is a followup to 1fa6ae3 that fixed one issue (concurrency) and created another (broke Rx contract by allowing concurrent execution of onNext).

I have reverted back to the previous implementatio and then attempted to fix the concurrency issue again.

I think it ended up being a simple fix … just re-ordering the enqueue method to remove the race-condition between the logic protected by the AtomicInteger and adding to the queue.

It's not an atomic operation (adding then processing) so we need to just add to the queue and treat it as an async data structure and keep the AtomicInteger portion to only protecting the "process or not process" logic.

        // this must happen before 'counter' is used to provide synchronization between threads
        queue.offer(notification);

This may still have issues but it's now working in all of my concurrency tests (the ones that broken with the original and then my modified version). The tests are not easy to build unit tests out of as they require running for many seconds and non-deterministically causing a race condition so I have not yet spend the time to try and figure out a deterministic unit test hence them not being committed.

This is a followup to ReactiveX@1fa6ae3 that fixed one issue (concurrency) and created another (broke Rx contract by allowing concurrent execution of onNext).

I have reverted back to the previous implementatio and then attempted to fix the concurrency issue again.

I think it ended up being a simple fix … just re-ordering the `enqueue` method to remove the race-condition between the logic protected by the AtomicInteger and adding to the queue.

It's not an atomic operation (adding then processing) so we need to just add to the queue and treat it as an async data structure and keep the AtomicInteger portion to only protecting the "process or not process" logic.

```java
        // this must happen before 'counter' is used to provide synchronization between threads
        queue.offer(notification);
```

This may still have issues but it's now working in all of my concurrency tests (the ones that broken with the original and then my modified version). The tests are not easy to build unit tests out of as they require running for many seconds and non-deterministically causing a race condition so I have not yet spend the time to try and figure out a deterministic unit test hence them not being committed.
@cloudbees-pull-request-builder

RxJava-pull-requests #141 SUCCESS
This pull request looks good

benjchristensen added a commit that referenced this pull request May 11, 2013
Fix concurrency bug in ScheduledObserver
@benjchristensen benjchristensen merged commit 76592d6 into ReactiveX:master May 11, 2013
neerajrj pushed a commit to neerajrj/Hystrix that referenced this pull request Nov 2, 2013
- pick up concurrency fix to ScheduledObserver ReactiveX/RxJava#269
rickbw pushed a commit to rickbw/RxJava that referenced this pull request Jan 9, 2014
Fix concurrency bug in ScheduledObserver
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants