diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 90eea8862..39d1046a0 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -24,8 +24,26 @@ public final class Signal { /// when the signal terminates. private var generatorDisposable: Disposable? - /// The state of the signal. `nil` if the signal has terminated. - private let state: Atomic?> + /// The state of the signal. + /// + /// `state` synchronizes using Read-Copy-Update. Reads on the event delivery + /// routine are thus wait-free. But modifications, e.g. inserting observers, + /// still have to be serialized, and are required not to mutate in place. + /// + /// This suits `Signal` as reads to `status` happens on the critical path of + /// event delivery, while observers bag manipulation or termination generally + /// has a constant occurrence. + /// + /// As `SignalState` is a packed object reference (a tagged pointer) that is + /// naturally aligned, reads to are guaranteed to be atomic on all supported + /// hardware architectures of Swift (ARM and x86). + private var state: SignalState + + /// Used to ensure that state updates are serialized. + private let updateLock: NSLock + + /// Used to ensure that events are serialized during delivery to observers. + private let sendLock: NSLock /// Initialize a Signal that will immediately invoke the given generator, /// then forward events sent to the given observer. @@ -38,119 +56,177 @@ public final class Signal { /// - generator: A closure that accepts an implicitly created observer /// that will act as an event emitter for the signal. public init(_ generator: (Observer) -> Disposable?) { - state = Atomic(SignalState()) - - /// Holds the final signal state captured by an `interrupted` event. If it - /// is set, the Signal should interrupt as soon as possible. Implicitly - /// protected by `state` and `sendLock`. - var interruptedState: SignalState? = nil - - /// Used to track if the signal has terminated. Protected by `sendLock`. - var terminated = false - - /// Used to ensure that events are serialized during delivery to observers. - let sendLock = NSLock() - sendLock.name = "org.reactivecocoa.ReactiveSwift.Signal" + state = .alive(AliveState()) + updateLock = NSLock() + updateLock.name = "org.reactivecocoa.ReactiveSwift.Signal.updateLock" + sendLock = NSLock() + sendLock.name = "org.reactivecocoa.ReactiveSwift.Signal.sendLock" let observer = Observer { [weak self] event in guard let signal = self else { return } + // Thread Safety Notes on `Signal.state`. + // + // - Check if the signal is at a specific state. + // + // Read directly. + // + // - Deliver `value` events with the alive state. + // + // `sendLock` must be acquired. + // + // - Replace the alive state with another. + // (e.g. observers bag manipulation) + // + // `updateLock` must be acquired. + // + // - Transition from `alive` to `terminating` as a result of receiving + // a termination event. + // + // `updateLock` must be acquired, and should fail gracefully if the + // signal has terminated. + // + // - Check if the signal is terminating. If it is, invoke `tryTerminate` + // which transitions the state from `terminating` to `terminated`, and + // delivers the termination event. + // + // Both `sendLock` and `updateLock` must be acquired. The check can be + // relaxed, but the state must be checked again after the locks are + // acquired. Fail gracefully if the state has changed since the relaxed + // read, i.e. a concurrent sender has already handled the termination + // event. + // + // Exploiting the relaxation of reads, please note that false positives + // are intentionally allowed in the `terminating` checks below. As a + // result, normal event deliveries need not acquire `updateLock`. + // Nevertheless, this should not cause the termination event being + // sent multiple times, since `tryTerminate` would not respond to false + // positives. + + /// Try to terminate the signal. + /// + /// If the signal is alive or has terminated, it fails gracefully. In + /// other words, calling this method as a result of a false positive + /// `terminating` check is permitted. + /// + /// - note: The `updateLock` would be acquired. + /// + /// - returns: `true` if the attempt succeeds. `false` otherwise. @inline(__always) - func interrupt(_ observers: Bag) { - for observer in observers { - observer.sendInterrupted() + func tryTerminate() -> Bool { + // Acquire `updateLock`. If the termination has still not yet been + // handled, take it over and bump the status to `terminated`. + signal.updateLock.lock() + + if case let .terminating(state) = signal.state { + signal.state = .terminated + signal.updateLock.unlock() + + for observer in state.observers { + observer.action(state.event) + } + + return true } - terminated = true - interruptedState = nil + + signal.updateLock.unlock() + return false } - if case .interrupted = event { - // Recursive events are generally disallowed. But `interrupted` is kind - // of a special snowflake, since it can inadvertently be sent by - // downstream consumers. + if event.isTerminating { + // Recursive events are disallowed for `value` events, but are permitted + // for termination events. Specifically: + // + // - `interrupted` + // It can inadvertently be sent by downstream consumers as part of the + // `SignalProducer` mechanics. // - // So we would treat `interrupted` events specially. If it happens - // to occur while the `sendLock` is acquired, the observer call-out and + // - `completed` + // If a downstream consumer weakly references an object, invocation of + // such consumer may cause a race condition with its weak retain against + // the last strong release of the object. If the `Lifetime` of the + // object is being referenced by an upstream `take(during:)`, a + // signal recursion might occur. + // + // So we would treat termination events specially. If it happens to + // occur while the `sendLock` is acquired, the observer call-out and // the disposal would be delegated to the current sender, or // occasionally one of the senders waiting on `sendLock`. - if let state = signal.state.swap(nil) { - // Writes to `interruptedState` are implicitly synchronized. So we do - // not need to guard it with locks. - // - // Specifically, senders serialized by `sendLock` can react to and - // clear `interruptedState` only if they see the write made below. - // The write can happen only once, since `state` being swapped with - // `nil` is a point of no return. - // - // Even in the case that both a previous sender and its successor see - // the write (the `interruptedState` check before & after the unlock - // of `sendLock`), the senders are still bound to the `sendLock`. - // So whichever sender loses the battle of acquring `sendLock` is - // guaranteed to be blocked. - interruptedState = state - - if sendLock.try() { - if !terminated, let state = interruptedState { - interrupt(state.observers) + signal.updateLock.lock() + + if case let .alive(state) = signal.state { + let newSnapshot = TerminatingState(observers: state.observers, + event: event) + signal.state = .terminating(newSnapshot) + signal.updateLock.unlock() + + if signal.sendLock.try() { + // Check whether the terminating state has been handled by a + // concurrent sender. If not, handle it. + let shouldDispose = tryTerminate() + signal.sendLock.unlock() + + if shouldDispose { + signal.swapDisposable()?.dispose() } - sendLock.unlock() - signal.generatorDisposable?.dispose() } + } else { + signal.updateLock.unlock() } } else { - let isTerminating = event.isTerminating + var shouldDispose = false - if let observers = (isTerminating ? signal.state.swap(nil)?.observers : signal.state.value?.observers) { - var shouldDispose = false - - sendLock.lock() + // The `terminating` status check is performed twice for two different + // purposes: + // + // 1. Within the main protected section + // It guarantees that a recursive termination event sent by a + // downstream consumer, is immediately processed and need not compete + // with concurrent pending senders (if any). + // + // Termination events sent concurrently may also be caught here, but + // not necessarily all of them due to data races. + // + // 2. After the main protected section + // It ensures the termination event sent concurrently that are not + // caught by (1) due to data races would still be processed. + // + // The related PR on the race conditions: + // https://github.com/ReactiveCocoa/ReactiveSwift/pull/112 - if !terminated { - for observer in observers { - observer.action(event) - } + signal.sendLock.lock() + // Start of the main protected section. - // Check if a downstream consumer or a concurrent sender has - // interrupted the signal. - if !isTerminating, let state = interruptedState { - interrupt(state.observers) - shouldDispose = true - } + if case let .alive(state) = signal.state { + for observer in state.observers { + observer.action(event) + } - if isTerminating { - terminated = true - shouldDispose = true - } + // Check if the status has been bumped to `terminating` due to a + // concurrent or a recursive termination event. + if case .terminating = signal.state { + shouldDispose = tryTerminate() } + } - sendLock.unlock() - - // Based on the implicit memory order, any updates to the - // `interruptedState` should always be visible after `sendLock` is - // released. So we check it again and handle the interruption if - // it has not been taken over. - if !shouldDispose && !terminated && !isTerminating, let state = interruptedState { - sendLock.lock() - - // `terminated` before acquring the lock could be a false negative, - // since it might race against other concurrent senders until the - // lock acquisition above succeeds. So we have to check again if the - // signal is really still alive. - if !terminated { - interrupt(state.observers) - shouldDispose = true - } + // End of the main protected section. + signal.sendLock.unlock() - sendLock.unlock() - } + // Check if the status has been bumped to `terminating` due to a + // concurrent termination event that has not been caught in the main + // protected section. + if !shouldDispose, case .terminating = signal.state { + signal.sendLock.lock() + shouldDispose = tryTerminate() + signal.sendLock.unlock() + } - if shouldDispose { - // Dispose only after notifying observers, so disposal - // logic is consistently the last thing to run. - signal.generatorDisposable?.dispose() - } + if shouldDispose { + // Dispose only after notifying observers, so disposal + // logic is consistently the last thing to run. + signal.swapDisposable()?.dispose() } } } @@ -158,12 +234,22 @@ public final class Signal { generatorDisposable = generator(observer) } - deinit { - if state.swap(nil) != nil { - // As the signal can deinitialize only when it has no observers attached, - // only the generator disposable has to be disposed of at this point. - generatorDisposable?.dispose() + /// Swap the generator disposable with `nil`. + /// + /// - returns: + /// The generator disposable, or `nil` if it has been disposed of. + private func swapDisposable() -> Disposable? { + if let d = generatorDisposable { + generatorDisposable = nil + return d } + return nil + } + + deinit { + // A signal can deinitialize only when it is not retained and has no + // active observers. So `state` need not be swapped. + swapDisposable()?.dispose() } /// A Signal that never sends any events to its observers. @@ -214,20 +300,25 @@ public final class Signal { @discardableResult public func observe(_ observer: Observer) -> Disposable? { var token: RemovalToken? - state.modify { - $0?.retainedSignal = self - token = $0?.observers.insert(observer) + updateLock.lock() + if case let .alive(snapshot) = state { + var observers = snapshot.observers + token = observers.insert(observer) + state = .alive(AliveState(observers: observers, retaining: self)) } + updateLock.unlock() if let token = token { return ActionDisposable { [weak self] in - if let strongSelf = self { - strongSelf.state.modify { state in - state?.observers.remove(using: token) - if state?.observers.isEmpty ?? false { - state!.retainedSignal = nil - } + if let s = self { + s.updateLock.lock() + if case let .alive(snapshot) = s.state { + var observers = snapshot.observers + observers.remove(using: token) + s.state = .alive(AliveState(observers: observers, + retaining: observers.isEmpty ? nil : self)) } + s.updateLock.unlock() } } } else { @@ -237,9 +328,71 @@ public final class Signal { } } -private struct SignalState { - var observers: Bag.Observer> = Bag() - var retainedSignal: Signal? +/// The state of a `Signal`. +/// +/// `SignalState` is guaranteed to be laid out as a tagged pointer by the Swift +/// compiler in the support targets of the Swift 3.0.1 ABI. +/// +/// The Swift compiler has also an optimization for enums with payloads that are +/// all reference counted, and at most one no-payload case. +private enum SignalState { + /// The `Signal` is alive. + case alive(AliveState) + + /// The `Signal` has received a termination event, and is about to be + /// terminated. + case terminating(TerminatingState) + + /// The `Signal` has terminated. + case terminated +} + +// As the amount of state would definitely span over a cache line, +// `AliveState` and `TerminatingState` is set to be a reference type so +// that we can atomically update the reference instead. +// +// Note that in-place mutation should not be introduced to `AliveState` and +// `TerminatingState`. Copy the states and create a new instance. + +/// The state of a `Signal` that is alive. It contains a bag of observers and +/// an optional self-retaining reference. +private final class AliveState { + /// The observers of the `Signal`. + fileprivate let observers: Bag.Observer> + + /// A self-retaining reference. It is set when there are one or more active + /// observers. + fileprivate let retaining: Signal? + + /// Create an alive state. + /// + /// - parameters: + /// - observers: The latest bag of observers. + /// - retaining: The self-retaining reference of the `Signal`, if necessary. + init(observers: Bag.Observer> = Bag(), retaining: Signal? = nil) { + self.observers = observers + self.retaining = retaining + } +} + +/// The state of a terminating `Signal`. It contains a bag of observers and the +/// termination event. +private final class TerminatingState { + /// The observers of the `Signal`. + fileprivate let observers: Bag.Observer> + + /// The termination event. + fileprivate let event: Event + + /// Create a terminating state. + /// + /// - parameters: + /// - observers: The latest bag of observers. + /// - event: The termination event. + init(observers: Bag.Observer>, event: Event) { + self.observers = observers + self.event = event + } } public protocol SignalProtocol { diff --git a/Tests/ReactiveSwiftTests/PropertySpec.swift b/Tests/ReactiveSwiftTests/PropertySpec.swift index 33f0bd7ee..66b25b42e 100644 --- a/Tests/ReactiveSwiftTests/PropertySpec.swift +++ b/Tests/ReactiveSwiftTests/PropertySpec.swift @@ -7,7 +7,7 @@ // import Foundation - +import Dispatch import Result import Nimble import Quick @@ -265,6 +265,39 @@ class PropertySpec: QuickSpec { property = nil expect(isEnded) == true } + + it("should not deadlock") { + let queue: DispatchQueue + + if #available(macOS 10.10, *) { + queue = DispatchQueue.global(qos: .userInteractive) + } else { + queue = DispatchQueue.global(priority: .high) + } + + let group = DispatchGroup() + + DispatchQueue.concurrentPerform(iterations: 500) { _ in + let source = MutableProperty(1) + var target = Optional(MutableProperty(1)) + + let semaphore = DispatchSemaphore(value: 0) + + target! <~ source + + queue.async(group: group) { + semaphore.wait() + target = nil + } + + queue.async(group: group) { + semaphore.signal() + source.value = 2 + } + } + + group.wait() + } } describe("Property") {