Skip to content

Commit

Permalink
Removed the explicit locking for interruptedState.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Nov 26, 2016
1 parent d1155d6 commit 712afcc
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ public final class Signal<Value, Error: Swift.Error> {
state = Atomic(SignalState())

/// Holds the final signal state captured by an `interrupted` event. If it
/// is set, the Signal should interrupt as soon as possible.
let interruptedState = Atomic<SignalState<Value, Error>?>(nil)
/// is set, the Signal should interrupt as soon as possible. Implicitly
/// protected by `state` and `sendLock`.
var interruptedState: SignalState<Value, Error>? = nil

/// Used to track if the signal has terminated. Protected by `sendLock`.
var terminated = false
Expand All @@ -56,6 +57,15 @@ public final class Signal<Value, Error: Swift.Error> {
return
}

@inline(__always)
func interrupt(_ observers: Bag<Observer>) {
for observer in observers {
observer.sendInterrupted()
}
terminated = true
interruptedState = nil
}

if case .interrupted = event {
// Recursive events are generally disallowed. But `interrupted` is kind
// of a special snowflake, since it can inadvertently be sent by
Expand All @@ -66,14 +76,23 @@ public final class Signal<Value, Error: Swift.Error> {
// the disposal would be delegated to the current sender, or
// occasionally one of the senders waiting on `sendLock`.
if let state = signal.state.swap(nil) {
interruptedState.value = state
// Writes to `interruptedState` are implicitly synchronized. So we do
// not need to guard it with locks.
//
// Specifically, senders serialized by `sendLock` can react to and
// clear `interruptedState` only if they see the write made below.
// This write can happen only once, since `state` being swapped with
// `nil` is a point of no return.
//
// Even in a case that both previous sender and its successor sees
// the write, the delegation of interruption is still bound to the
// `sendLock`, and we also acively block all events coming after
// the signal termination using `terminated`.
interruptedState = state

if sendLock.try() {
if !terminated, let state = interruptedState.swap(nil) {
for observer in state.observers {
observer.sendInterrupted()
}
terminated = true
if !terminated, let state = interruptedState {
interrupt(state.observers)
}
sendLock.unlock()
signal.generatorDisposable?.dispose()
Expand All @@ -94,11 +113,8 @@ public final class Signal<Value, Error: Swift.Error> {

// Check if a downstream consumer or a concurrent sender has
// interrupted the signal.
if !isTerminating, let state = interruptedState.swap(nil) {
for observer in state.observers {
observer.sendInterrupted()
}
terminated = true
if !isTerminating, let state = interruptedState {
interrupt(state.observers)
shouldDispose = true
}

Expand All @@ -114,15 +130,12 @@ public final class Signal<Value, Error: Swift.Error> {
// `interruptedState` should always be visible after `sendLock` is
// released. So we check it again and handle the interruption if
// it has not been taken over.
if !shouldDispose && !terminated && !isTerminating, let state = interruptedState.swap(nil) {
if !shouldDispose && !terminated && !isTerminating, let state = interruptedState {
sendLock.lock()

if !terminated {
for observer in state.observers {
observer.sendInterrupted()
}
interrupt(state.observers)
shouldDispose = true
terminated = true
}

sendLock.unlock()
Expand Down

0 comments on commit 712afcc

Please sign in to comment.