Skip to content

Commit

Permalink
Split the event emitter implementation into two.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Dec 2, 2016
1 parent ccba3f5 commit b12d6bc
Showing 1 changed file with 109 additions and 92 deletions.
201 changes: 109 additions & 92 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -52,127 +52,144 @@ public final class Signal<Value, Error: Swift.Error> {
/// Used to track if the signal has terminated. Protected by `sendLock`.
var terminated = false

/// Used to ensure that events are serialized during delivery to observers.
let sendLock: NSLock?
/// The event emitter of the signal that would be passed to `generator`.
let observer: Observer

switch serialization {
case .inherit:
// If the `Signal` inherits serialization from its owner, the `sendLock`
// is elided.
//
// The recursive interrupt handling under `inherit` would be disabled.
//
// While the only asychronous or recursive source of `interrupted` is
// `SignalProducer`'s interruption mechanism, its produced signal still
// uses the default serialization at the moment, and it is also where the
// `interrupted` event would be sent to.
// The recursive `interrupted` event handling would also be elided. So
// `SignalProducer` should not use `inherit` for the produced signal in
// `startWithSignal`.
//
// https://github.com/ReactiveCocoa/ReactiveSwift/pull/129
sendLock = nil

case .serialize:
sendLock = NSLock()
sendLock!.name = "org.reactivecocoa.ReactiveSwift.Signal"
}

let observer = Observer { [weak self] event in
guard let signal = self else {
return
}

@inline(__always)
func interrupt(_ observers: Bag<Observer>) {
for observer in observers {
observer.sendInterrupted()
observer = Observer { [weak self] event in
guard let signal = self else {
return
}
terminated = true
interruptedState = nil
}

if case .interrupted = event {
// Recursive events are generally disallowed. But `interrupted` is kind
// of a special snowflake, since it can inadvertently be sent by
// downstream consumers.
//
// So we would treat `interrupted` events specially. If it happens
// to occur while the `sendLock` is acquired, the observer call-out and
// the disposal would be delegated to the current sender, or
// occasionally one of the senders waiting on `sendLock`.
if let state = signal.state.swap(nil) {
// Writes to `interruptedState` are implicitly synchronized. So we do
// not need to guard it with locks.
//
// Specifically, senders serialized by `sendLock` can react to and
// clear `interruptedState` only if they see the write made below.
// The write can happen only once, since `state` being swapped with
// `nil` is a point of no return.
//
// Even in the case that both a previous sender and its successor see
// the write (the `interruptedState` check before & after the unlock
// of `sendLock`), the senders are still bound to the `sendLock`.
// So whichever sender loses the battle of acquring `sendLock` is
// guaranteed to be blocked.
interruptedState = state

if sendLock?.try() ?? true {
if !terminated, let state = interruptedState {
interrupt(state.observers)
}
sendLock?.unlock()
signal.generatorDisposable?.dispose()
}
}
} else {
let isTerminating = event.isTerminating

if let observers = (isTerminating ? signal.state.swap(nil)?.observers : signal.state.value?.observers) {
var shouldDispose = false

sendLock?.lock()

if !terminated {
for observer in observers {
observer.action(event)
}

// Check if a downstream consumer or a concurrent sender has
// interrupted the signal.
if !isTerminating, let state = interruptedState {
interrupt(state.observers)
shouldDispose = true
}

if isTerminating {
terminated = true
shouldDispose = true
signal.generatorDisposable?.dispose()
}
}
}
}

sendLock?.unlock()
case .serialize:
/// Used to ensure that events are serialized during delivery to observers.
let sendLock = NSLock()
sendLock.name = "org.reactivecocoa.ReactiveSwift.Signal"

// Based on the implicit memory order, any updates to the
// `interruptedState` should always be visible after `sendLock` is
// released. So we check it again and handle the interruption if
// it has not been taken over.
if !shouldDispose && !terminated && !isTerminating, let state = interruptedState {
sendLock?.lock()
observer = Observer { [weak self] event in
guard let signal = self else {
return
}

@inline(__always)
func interrupt(_ observers: Bag<Observer>) {
for observer in observers {
observer.sendInterrupted()
}
terminated = true
interruptedState = nil
}

if case .interrupted = event {
// Recursive events are generally disallowed. But `interrupted` is kind
// of a special snowflake, since it can inadvertently be sent by
// downstream consumers.
//
// So we would treat `interrupted` events specially. If it happens
// to occur while the `sendLock` is acquired, the observer call-out and
// the disposal would be delegated to the current sender, or
// occasionally one of the senders waiting on `sendLock`.
if let state = signal.state.swap(nil) {
// Writes to `interruptedState` are implicitly synchronized. So we do
// not need to guard it with locks.
//
// Specifically, senders serialized by `sendLock` can react to and
// clear `interruptedState` only if they see the write made below.
// The write can happen only once, since `state` being swapped with
// `nil` is a point of no return.
//
// Even in the case that both a previous sender and its successor see
// the write (the `interruptedState` check before & after the unlock
// of `sendLock`), the senders are still bound to the `sendLock`.
// So whichever sender loses the battle of acquring `sendLock` is
// guaranteed to be blocked.
interruptedState = state

if sendLock.try() {
if !terminated, let state = interruptedState {
interrupt(state.observers)
}
sendLock.unlock()
signal.generatorDisposable?.dispose()
}
}
} else {
let isTerminating = event.isTerminating

if let observers = (isTerminating ? signal.state.swap(nil)?.observers : signal.state.value?.observers) {
var shouldDispose = false

sendLock.lock()

// `terminated` before acquring the lock could be a false negative,
// since it might race against other concurrent senders until the
// lock acquisition above succeeds. So we have to check again if the
// signal is really still alive.
if !terminated {
interrupt(state.observers)
shouldDispose = true
for observer in observers {
observer.action(event)
}

// Check if a downstream consumer or a concurrent sender has
// interrupted the signal.
if !isTerminating, let state = interruptedState {
interrupt(state.observers)
shouldDispose = true
}

if isTerminating {
terminated = true
shouldDispose = true
}
}

sendLock?.unlock()
}
sendLock.unlock()

// Based on the implicit memory order, any updates to the
// `interruptedState` should always be visible after `sendLock` is
// released. So we check it again and handle the interruption if
// it has not been taken over.
if !shouldDispose && !terminated && !isTerminating, let state = interruptedState {
sendLock.lock()

// `terminated` before acquring the lock could be a false negative,
// since it might race against other concurrent senders until the
// lock acquisition above succeeds. So we have to check again if the
// signal is really still alive.
if !terminated {
interrupt(state.observers)
shouldDispose = true
}

if shouldDispose {
// Dispose only after notifying observers, so disposal
// logic is consistently the last thing to run.
signal.generatorDisposable?.dispose()
sendLock.unlock()
}

if shouldDispose {
// Dispose only after notifying observers, so disposal
// logic is consistently the last thing to run.
signal.generatorDisposable?.dispose()
}
}
}
}
Expand Down

0 comments on commit b12d6bc

Please sign in to comment.