Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rxcpp: Fix data race in composite_subscription #481

Merged
merged 1 commit into from
Feb 13, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 102 additions & 7 deletions Rx/v2/src/rxcpp/rx-subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,14 @@ class subscription : public subscription_base
std::terminate();
}
}

explicit subscription(std::shared_ptr<base_subscription_state> s)
: state(std::move(s))
{
if (!state) {
std::terminate();
}
}
public:

subscription()
Expand Down Expand Up @@ -178,9 +186,23 @@ class subscription : public subscription_base
weak_state_type get_weak() {
return state;
}

// Atomically promote weak subscription to strong.
// Calls std::terminate if w has already expired.
static subscription lock(weak_state_type w) {
return subscription(w);
}

// Atomically try to promote weak subscription to strong.
// Returns an empty maybe<> if w has already expired.
static rxu::maybe<subscription> maybe_lock(weak_state_type w) {
auto strong_subscription = w.lock();
if (!strong_subscription) {
return rxu::detail::maybe<subscription>{};
} else {
return rxu::detail::maybe<subscription>{subscription{std::move(strong_subscription)}};
}
}
};

inline bool operator<(const subscription& lhs, const subscription& rhs) {
Expand Down Expand Up @@ -223,8 +245,14 @@ class composite_subscription_inner
typedef subscription::weak_state_type weak_subscription;
struct composite_subscription_state : public std::enable_shared_from_this<composite_subscription_state>
{
// invariant: cannot access this data without the lock held.
std::set<subscription> subscriptions;
// double checked locking:
// issubscribed must be loaded again after each lock acquisition.
// invariant:
// never call subscription::unsubscribe with lock held.
std::mutex lock;
// invariant: transitions from 'true' to 'false' exactly once, at any time.
std::atomic<bool> issubscribed;

~composite_subscription_state()
Expand All @@ -242,41 +270,108 @@ class composite_subscription_inner
{
}

// Atomically add 's' to the set of subscriptions.
//
// If unsubscribe() has already occurred, this immediately
// calls s.unsubscribe().
//
// cs.unsubscribe() [must] happens-before s.unsubscribe()
//
// Due to the un-atomic nature of calling 's.unsubscribe()',
// it is possible to observe the unintuitive
// add(s)=>s.unsubscribe() prior
// to any of the unsubscribe()=>sN.unsubscribe().
inline weak_subscription add(subscription s) {
if (!issubscribed) {
if (!issubscribed) { // load.acq [seq_cst]
s.unsubscribe();
} else if (s.is_subscribed()) {
std::unique_lock<decltype(lock)> guard(lock);
subscriptions.insert(s);
if (!issubscribed) { // load.acq [seq_cst]
// unsubscribe was called concurrently.
guard.unlock();
// invariant: do not call unsubscribe with lock held.
s.unsubscribe();
} else {
subscriptions.insert(s);
}
}
return s.get_weak();
}

// Atomically remove 'w' from the set of subscriptions.
//
// This does nothing if 'w' was already previously removed,
// or refers to an expired value.
inline void remove(weak_subscription w) {
if (issubscribed && !w.expired()) {
auto s = subscription::lock(w);
if (issubscribed) { // load.acq [seq_cst]
rxu::maybe<subscription> maybe_subscription = subscription::maybe_lock(w);

if (maybe_subscription.empty()) {
// Do nothing if the subscription has already expired.
return;
}
iam marked this conversation as resolved.
Show resolved Hide resolved

std::unique_lock<decltype(lock)> guard(lock);
subscriptions.erase(std::move(s));
// invariant: subscriptions must be accessed under the lock.

if (issubscribed) { // load.acq [seq_cst]
subscription& s = maybe_subscription.get();
subscriptions.erase(std::move(s));
} // else unsubscribe() was called concurrently; this becomes a no-op.
}
}

// Atomically clear all subscriptions that were observably added
// (and not subsequently observably removed).
//
// Un-atomically call unsubscribe on those subscriptions.
//
// forall subscriptions in {add(s1),add(s2),...}
// - {remove(s3), remove(s4), ...}:
// cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
//
// cs.unsubscribe() observed-before cs.clear ==> do nothing.
inline void clear() {
if (issubscribed) {
if (issubscribed) { // load.acq [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);

if (!issubscribed) { // load.acq [seq_cst]
// unsubscribe was called concurrently.
return;
}

std::set<subscription> v(std::move(subscriptions));
// invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
s.unsubscribe(); });
}
}

// Atomically clear all subscriptions that were observably added
// (and not subsequently observably removed).
//
// Un-atomically call unsubscribe on those subscriptions.
//
// Switches to an 'unsubscribed' state, all subsequent
// adds are immediately unsubscribed.
//
// cs.unsubscribe() [must] happens-before
// cs.add(s) ==> s.unsubscribe()
//
// forall subscriptions in {add(s1),add(s2),...}
// - {remove(s3), remove(s4), ...}:
// cs.unsubscribe() || cs.clear() happens before s.unsubscribe()
inline void unsubscribe() {
if (issubscribed.exchange(false)) {
if (issubscribed.exchange(false)) { // cas.acq_rel [seq_cst]
std::unique_lock<decltype(lock)> guard(lock);

// is_subscribed can only transition to 'false' once,
// does not need an extra atomic access here.

std::set<subscription> v(std::move(subscriptions));
// invariant: do not call unsubscribe with lock held.
guard.unlock();
std::for_each(v.begin(), v.end(),
[](const subscription& s) {
Expand Down