Skip to content

Commit

Permalink
Introduced SignalStatus. sendLock is now stored by the class.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Dec 1, 2016
1 parent 2bb7d7a commit 60bbd0c
Showing 1 changed file with 48 additions and 38 deletions.
86 changes: 48 additions & 38 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,14 @@ public final class Signal<Value, Error: Swift.Error> {
/// The state of the signal. `nil` if the signal has terminated.
private let state: Atomic<SignalState<Value, Error>?>

/// Holds the status of the signal. Implicitly protected by `state` and
/// `sendLock`.
private var status: SignalStatus<Value, Error>

/// Used to ensure that events are serialized during delivery to observers.
private let sendLock: NSLock


/// Initialize a Signal that will immediately invoke the given generator,
/// then forward events sent to the given observer.
///
Expand All @@ -39,25 +47,10 @@ public final class Signal<Value, Error: Swift.Error> {
/// that will act as an event emitter for the signal.
public init(_ generator: (Observer) -> Disposable?) {
state = Atomic(SignalState())

/// Holds the final signal state captured by a termination event. If it
/// is set, the Signal should terminate as soon as possible. Implicitly
/// protected by `state` and `sendLock`.
var terminationState: SignalTerminationState<Value, Error>? = nil

/// Used to track if the signal has terminated. Protected by `sendLock`.
var terminated = false

/// Used to ensure that events are serialized during delivery to observers.
let sendLock = NSLock()
status = .green
sendLock = NSLock()
sendLock.name = "org.reactivecocoa.ReactiveSwift.Signal"

func terminate(with state: SignalTerminationState<Value, Error>) {
state.send()
terminated = true
terminationState = nil
}

let observer = Observer { [weak self] event in
guard let signal = self else {
return
Expand All @@ -83,66 +76,69 @@ public final class Signal<Value, Error: Swift.Error> {
// the disposal would be delegated to the current sender, or
// occasionally one of the senders waiting on `sendLock`.
if let state = signal.state.swap(nil) {
terminationState = SignalTerminationState(state: state, event: event)
signal.status = .yellow(SignalTerminationSnapshot(state: state, event: event))

// Writes to `terminationState` are implicitly synchronized. So we do
// Writes to `terminationSnapshot` are implicitly synchronized. So we do
// not need to guard it with locks.
//
// Specifically, senders serialized by `sendLock` can react to and
// clear `terminationState` only if they see the write made below.
// clear `terminationSnapshot` only if they see the write made below.
// The write can happen only once, since `state` being swapped with
// `nil` is a point of no return.
//
// Even in the case that both a previous sender and its successor see
// the write (the `terminationState` check before & after the unlock
// the write (the `terminationSnapshot` check before & after the unlock
// of `sendLock`), the senders are still bound to the `sendLock`.
// So whichever sender loses the battle of acquring `sendLock` is
// guaranteed to be blocked.
if sendLock.try() {
if !terminated, let state = terminationState {
terminate(with: state)
if signal.sendLock.try() {
if case let .yellow(state) = signal.status {
state.send()
signal.status = .red
}
sendLock.unlock()
signal.sendLock.unlock()
signal.generatorDisposable?.dispose()
}
}
} else if let observers = signal.state.value?.observers {
var shouldDispose = false

sendLock.lock()
signal.sendLock.lock()

if !terminated {
if case .green = signal.status {
for observer in observers {
observer.action(event)
}

// Check if a downstream consumer or a concurrent sender has
// interrupted the signal.
if let state = terminationState {
terminate(with: state)
if case let .yellow(state) = signal.status {
state.send()
signal.status = .red
shouldDispose = true
}
}

sendLock.unlock()
signal.sendLock.unlock()

// Based on the implicit memory order, any updates to the
// `terminationState` should always be visible after `sendLock` is
// `terminationSnapshot` should always be visible after `sendLock` is
// released. So we check it again and handle the termination if
// it has not been taken over.
if !shouldDispose && !terminated, let state = terminationState {
sendLock.lock()
if !shouldDispose, case let .yellow(state) = signal.status {
signal.sendLock.lock()

// `terminated` before acquring the lock could be a false negative,
// since it might race against other concurrent senders until the
// lock acquisition above succeeds. So we have to check again if the
// signal is really still alive.
if !terminated {
terminate(with: state)
// termination still has not been handled by others.
if case .yellow = signal.status {
state.send()
signal.status = .red
shouldDispose = true
}

sendLock.unlock()
signal.sendLock.unlock()
}

if shouldDispose {
Expand Down Expand Up @@ -235,12 +231,25 @@ public final class Signal<Value, Error: Swift.Error> {
}
}

/// The status of a `Signal`.
private enum SignalStatus<Value, Error: Swift.Error> {
/// The `Signal` is alive.
case green

/// The `Signal` has received a termination event, and is about to be
/// terminated.
case yellow(SignalTerminationSnapshot<Value, Error>)

/// The `Signal` has terminated.
case red
}

/// A snapshot of the state of a `Signal` when it receives a termination event.
///
/// - note: The snapshot must be a reference type, as the total size of all
/// stored properties might span beyond a cache line, resulting in
/// corrupted states being observed by the event emitter.
private final class SignalTerminationState<Value, Error: Swift.Error> {
private final class SignalTerminationSnapshot<Value, Error: Swift.Error> {
private let state: SignalState<Value, Error>
private let event: Event<Value, Error>

Expand All @@ -255,6 +264,7 @@ private final class SignalTerminationState<Value, Error: Swift.Error> {
}

/// Send the termination event to the observers.
@inline(__always)
func send() {
for observer in state.observers {
observer.action(event)
Expand Down

0 comments on commit 60bbd0c

Please sign in to comment.