From 6edbfb41ff57ae388faf46df513d7f9a8bcc0302 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Tue, 23 May 2017 15:24:18 +0800 Subject: [PATCH] Signal resource management with `Lifetime`. --- Sources/Deprecations+Removals.swift | 8 + Sources/Flatten.swift | 48 ++- Sources/FoundationExtensions.swift | 4 +- Sources/Lifetime.swift | 5 +- Sources/Property.swift | 4 +- Sources/Signal.swift | 304 ++++++++++-------- Sources/SignalProducer.swift | 8 +- .../SignalLifetimeSpec.swift | 34 +- Tests/ReactiveSwiftTests/SignalSpec.swift | 52 ++- 9 files changed, 255 insertions(+), 212 deletions(-) diff --git a/Sources/Deprecations+Removals.swift b/Sources/Deprecations+Removals.swift index a208b0815..0a4e04fef 100644 --- a/Sources/Deprecations+Removals.swift +++ b/Sources/Deprecations+Removals.swift @@ -3,6 +3,14 @@ import Dispatch import Result // MARK: Unavailable methods in ReactiveSwift 2.0. +extension Signal { + @available(*, unavailable, message:"Use the `Signal.init` that accepts a two-argument generator.") + public convenience init(_ generator: (Observer) -> Disposable?) { fatalError() } + + @available(*, unavailable, message:"Use the `Signal.pipe` variant that returns a `Lifetime` instead.") + public static func pipe(disposable: Disposable?) -> (output: Signal, input: Observer) { fatalError() } +} + extension Lifetime { @available(*, unavailable, renamed:"hasEnded") public var isDisposed: Bool { fatalError() } diff --git a/Sources/Flatten.swift b/Sources/Flatten.swift index b1e12962d..b7f28e63a 100644 --- a/Sources/Flatten.swift +++ b/Sources/Flatten.swift @@ -427,18 +427,13 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error { fileprivate func concurrent(limit: UInt) -> Signal { precondition(limit > 0, "The concurrent limit must be greater than zero.") - return Signal { relayObserver in - let disposable = CompositeDisposable() - let relayDisposable = CompositeDisposable() - - disposable += relayDisposable - disposable += self.observeConcurrent(relayObserver, limit, relayDisposable) - - return disposable + return Signal { relayObserver, lifetime in + let disposable = self.observeConcurrent(relayObserver, limit, lifetime) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } - fileprivate func observeConcurrent(_ observer: Signal.Observer, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? { + fileprivate func observeConcurrent(_ observer: Signal.Observer, _ limit: UInt, _ lifetime: Lifetime) -> Disposable? { let state = Atomic(ConcurrentFlattenState(limit: limit)) func startNextIfNeeded() { @@ -447,7 +442,7 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error { let deinitializer = ScopedDisposable(ActionDisposable(action: producerState.deinitialize)) producer.startWithSignal { signal, inner in - let handle = disposable.add(inner) + let handle = lifetime.observeEnded(inner.dispose) signal.observe { event in switch event { @@ -510,12 +505,10 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err precondition(limit > 0, "The concurrent limit must be greater than zero.") return SignalProducer { relayObserver, lifetime in - self.startWithSignal { signal, signalDisposable in - let disposables = CompositeDisposable() - lifetime.observeEnded(signalDisposable.dispose) - lifetime.observeEnded(disposables.dispose) + self.startWithSignal { signal, interruptHandle in + lifetime.observeEnded(interruptHandle.dispose) - _ = signal.observeConcurrent(relayObserver, limit, disposables) + _ = signal.observeConcurrent(relayObserver, limit, lifetime) } } } @@ -675,14 +668,12 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error { /// - note: The returned signal completes when `signal` and the latest inner /// signal have both completed. fileprivate func switchToLatest() -> Signal { - return Signal { observer in - let composite = CompositeDisposable() + return Signal { observer, lifetime in let serial = SerialDisposable() + lifetime.observeEnded(serial.dispose) - composite += serial - composite += self.observeSwitchToLatest(observer, serial) - - return composite + let disposable = self.observeSwitchToLatest(observer, serial) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -800,14 +791,12 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error { /// /// The returned signal completes when `self` and the winning inner signal have both completed. fileprivate func race() -> Signal { - return Signal { observer in - let composite = CompositeDisposable() + return Signal { observer, lifetime in let relayDisposable = CompositeDisposable() + lifetime.observeEnded(relayDisposable.dispose) - composite += relayDisposable - composite += self.observeRace(observer, relayDisposable) - - return composite + let disposable = self.observeRace(observer, relayDisposable) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1192,8 +1181,9 @@ extension Signal { /// - transform: A closure that accepts emitted error and returns a signal /// producer with a different type of error. public func flatMapError(_ transform: @escaping (Error) -> SignalProducer) -> Signal { - return Signal { observer in - self.observeFlatMapError(transform, observer, SerialDisposable()) + return Signal { observer, lifetime in + let disposable = self.observeFlatMapError(transform, observer, SerialDisposable()) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } diff --git a/Sources/FoundationExtensions.swift b/Sources/FoundationExtensions.swift index 7781a5cd4..a88c37682 100644 --- a/Sources/FoundationExtensions.swift +++ b/Sources/FoundationExtensions.swift @@ -30,12 +30,12 @@ extension Reactive where Base: NotificationCenter { /// - note: The signal does not terminate naturally. Observers must be /// explicitly disposed to avoid leaks. public func notifications(forName name: Notification.Name?, object: AnyObject? = nil) -> Signal { - return Signal { [base = self.base] observer in + return Signal { [base = self.base] observer, lifetime in let notificationObserver = base.addObserver(forName: name, object: object, queue: nil) { notification in observer.send(value: notification) } - return ActionDisposable { + lifetime.observeEnded { base.removeObserver(notificationObserver) } } diff --git a/Sources/Lifetime.swift b/Sources/Lifetime.swift index 9d0635610..b1856cf3d 100644 --- a/Sources/Lifetime.swift +++ b/Sources/Lifetime.swift @@ -11,8 +11,9 @@ public final class Lifetime { /// - note: Consider using `Lifetime.observeEnded` if only a closure observer /// is to be attached. public var ended: Signal { - return Signal { observer in - return disposables += observer.sendCompleted + return Signal { observer, lifetime in + let disposable = (disposables += observer.sendCompleted) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } diff --git a/Sources/Property.swift b/Sources/Property.swift index 4aff988ae..29e074ff7 100644 --- a/Sources/Property.swift +++ b/Sources/Property.swift @@ -527,7 +527,9 @@ public final class Property: PropertyProtocol { unsafeProducer.startWithSignal { upstream, interruptHandle in // A composed property tracks its active consumers through its relay signal, and // interrupts `unsafeProducer` if the relay signal terminates. - let (signal, _observer) = Signal.pipe(disposable: interruptHandle) + let (signal, _observer, lifetime) = Signal.pipe() + lifetime.observeEnded(interruptHandle.dispose) + let observer = transform?(_observer) ?? _observer relay = signal diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 86904ea5d..24e1a90ee 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -48,7 +48,7 @@ public final class Signal { } /// The disposable associated with the signal. - private let disposable: SerialDisposable + private let disposable: CompositeDisposable /// The state of the signal. /// @@ -106,17 +106,17 @@ public final class Signal { /// Used to indicate if the `Signal` has deinitialized. private var hasDeinitialized: Bool - fileprivate init(_ generator: (Observer) -> Disposable?) { + fileprivate init(_ generator: (Observer, Lifetime) -> Void) { state = .alive(AliveState()) updateLock = Lock.make() sendLock = Lock.make() hasDeinitialized = false - disposable = SerialDisposable() + disposable = CompositeDisposable() // The generator observer retains the `Signal` core. - disposable.inner = generator(Observer(self.send)) + generator(Observer(self.send), Lifetime(disposable)) } private func send(_ event: Event) { @@ -394,7 +394,7 @@ public final class Signal { /// - parameters: /// - generator: A closure that accepts an implicitly created observer /// that will act as an event emitter for the signal. - public init(_ generator: (Observer) -> Disposable?) { + public init(_ generator: (Observer, Lifetime) -> Void) { core = Core(generator) } @@ -479,14 +479,13 @@ public final class Signal { extension Signal { /// A Signal that never sends any events to its observers. public static var never: Signal { - return self.init { _ in nil } + return self.init { _ in } } /// A Signal that completes immediately without emitting any value. public static var empty: Signal { - return self.init { observer in + return self.init { observer, _ in observer.sendCompleted() - return nil } } @@ -501,16 +500,36 @@ extension Signal { /// - disposable: An optional disposable to associate with the signal, and /// to be disposed of when the signal terminates. /// - /// - returns: A tuple of `output: Signal`, the output end of the pipe, - /// and `input: Observer`, the input end of the pipe. - public static func pipe(disposable: Disposable? = nil) -> (output: Signal, input: Observer) { + /// - returns: A 2-tuple of the output end of the pipe as `Signal` and the input end + /// of the pipe as `Signal.Observer`. + public static func pipe() -> (output: Signal, input: Observer) { + let (o, i, _) = pipe() + return (o, i) + } + + /// Create a `Signal` that will be controlled by sending events to an + /// input observer. + /// + /// - note: The `Signal` will remain alive until a terminating event is sent + /// to the input observer, or until it has no observers and there + /// are no strong references to it. + /// + /// - parameters: + /// - disposable: An optional disposable to associate with the signal, and + /// to be disposed of when the signal terminates. + /// + /// - returns: A 3-tuple of the output end of the pipe as `Signal`, the input end of + /// the pipe as `Signal.Observer`, and the lifetime of the pipe. + public static func pipe() -> (output: Signal, input: Observer, lifetime: Lifetime) { var observer: Observer! - let signal = self.init { innerObserver in + var lifetime: Lifetime! + + let signal = self.init { innerObserver, innerLifetime in observer = innerObserver - return disposable + lifetime = innerLifetime } - return (signal, observer) + return (signal, observer, lifetime) } } @@ -630,10 +649,12 @@ extension Signal { /// /// - returns: A signal that will send new values. public func map(_ transform: @escaping (Value) -> U) -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in observer.action(event.map(transform)) } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -645,10 +666,12 @@ extension Signal { /// /// - returns: A signal that will send new type of errors. public func mapError(_ transform: @escaping (Error) -> F) -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in observer.action(event.mapError(transform)) } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -682,8 +705,8 @@ extension Signal { /// /// - returns: A signal that forwards the values passing the given closure. public func filter(_ isIncluded: @escaping (Value) -> Bool) -> Signal { - return Signal { observer in - return self.observe { (event: Event) -> Void in + return Signal { observer, lifetime in + let disposable = self.observe { (event: Event) -> Void in guard let value = event.value else { observer.action(event) return @@ -693,6 +716,8 @@ extension Signal { observer.send(value: value) } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -703,8 +728,8 @@ extension Signal { /// /// - returns: A signal that will send new values, that are non `nil` after the transformation. public func filterMap(_ transform: @escaping (Value) -> U?) -> Signal { - return Signal { observer in - return self.observe { (event: Event) -> Void in + return Signal { observer, lifetime in + let disposable = self.observe { (event: Event) -> Void in switch event { case let .value(value): if let mapped = transform(value) { @@ -718,6 +743,8 @@ extension Signal { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -744,15 +771,15 @@ extension Signal { public func take(first count: Int) -> Signal { precondition(count >= 0) - return Signal { observer in + return Signal { observer, lifetime in if count == 0 { observer.sendCompleted() - return nil + return } var taken = 0 - return self.observe { event in + let disposable = self.observe { event in guard let value = event.value else { observer.action(event) return @@ -767,6 +794,8 @@ extension Signal { observer.sendCompleted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -871,10 +900,10 @@ extension Signal { /// - returns: A signal of arrays of values, as instructed by the `shouldEmit` /// closure. public func collect(_ shouldEmit: @escaping (_ collectedValues: [Value]) -> Bool) -> Signal<[Value], Error> { - return Signal<[Value], Error> { observer in + return Signal<[Value], Error> { observer, lifetime in let state = CollectState() - return self.observe { event in + let disposable = self.observe { event in switch event { case let .value(value): state.append(value) @@ -893,6 +922,8 @@ extension Signal { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -934,10 +965,10 @@ extension Signal { /// - returns: A signal of arrays of values, as instructed by the `shouldEmit` /// closure. public func collect(_ shouldEmit: @escaping (_ collected: [Value], _ latest: Value) -> Bool) -> Signal<[Value], Error> { - return Signal<[Value], Error> { observer in + return Signal<[Value], Error> { observer, lifetime in let state = CollectState() - return self.observe { event in + let disposable = self.observe { event in switch event { case let .value(value): if shouldEmit(state.values, value) { @@ -956,6 +987,8 @@ extension Signal { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -967,12 +1000,14 @@ extension Signal { /// /// - returns: A signal that will yield `self` values on provided scheduler. public func observe(on scheduler: Scheduler) -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in scheduler.schedule { observer.action(event) } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1016,8 +1051,8 @@ extension Signal { public func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Signal { precondition(interval >= 0) - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in switch event { case .failed, .interrupted: scheduler.schedule { @@ -1031,6 +1066,8 @@ extension Signal { } } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1050,16 +1087,18 @@ extension Signal { return signal } - return Signal { observer in + return Signal { observer, lifetime in var skipped = 0 - return self.observe { event in + let disposable = self.observe { event in if case .value = event, skipped < count { skipped += 1 } else { observer.action(event) } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1075,8 +1114,8 @@ extension Signal { /// /// - returns: A signal that sends events as its values. public func materialize() -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in observer.send(value: event) switch event { @@ -1090,6 +1129,8 @@ extension Signal { break } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1100,8 +1141,8 @@ extension Signal where Value: EventProtocol, Error == NoError { /// /// - returns: A signal that sends values carried by `self` events. public func dematerialize() -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in switch event { case let .value(innerEvent): observer.action(innerEvent.event) @@ -1116,6 +1157,8 @@ extension Signal where Value: EventProtocol, Error == NoError { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1144,12 +1187,10 @@ extension Signal { disposed: (() -> Void)? = nil, value: ((Value) -> Void)? = nil ) -> Signal { - return Signal { observer in - let disposable = CompositeDisposable() - - _ = disposed.map(disposable.add) + return Signal { observer, lifetime in + _ = disposed.map(lifetime.observeEnded) - disposable += signal.observe { receivedEvent in + let disposable = signal.observe { receivedEvent in event?(receivedEvent) switch receivedEvent { @@ -1173,7 +1214,7 @@ extension Signal { observer.action(receivedEvent) } - return disposable + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1200,11 +1241,10 @@ extension Signal { /// once both input signals have completed, or interrupt if /// either input signal is interrupted. public func sample(with sampler: Signal) -> Signal<(Value, T), Error> { - return Signal<(Value, T), Error> { observer in + return Signal<(Value, T), Error> { observer, lifetime in let state = Atomic(SampleState()) - let disposable = CompositeDisposable() - disposable += self.observe { event in + let disposable = self.observe { event in switch event { case let .value(value): state.modify { @@ -1229,7 +1269,7 @@ extension Signal { } } - disposable += sampler.observe { event in + let samplerDisposable = sampler.observe { event in switch event { case .value(let samplerValue): if let value = state.value.latestValue { @@ -1254,7 +1294,8 @@ extension Signal { } } - return disposable + _ = (disposable?.dispose).map(lifetime.observeEnded) + _ = (samplerDisposable?.dispose).map(lifetime.observeEnded) } } @@ -1293,15 +1334,14 @@ extension Signal { /// once `self` has terminated. **`samplee`'s terminated events /// are ignored**. public func withLatest(from samplee: Signal) -> Signal<(Value, U), Error> { - return Signal<(Value, U), Error> { observer in + return Signal<(Value, U), Error> { observer, lifetime in let state = Atomic(nil) - let disposable = CompositeDisposable() - disposable += samplee.observeValues { value in + let sampleeDisposable = samplee.observeValues { value in state.value = value } - disposable += self.observe { event in + let disposable = self.observe { event in switch event { case let .value(value): if let value2 = state.value { @@ -1316,7 +1356,8 @@ extension Signal { } } - return disposable + _ = (disposable?.dispose).map(lifetime.observeEnded) + _ = (sampleeDisposable?.dispose).map(lifetime.observeEnded) } } @@ -1336,13 +1377,13 @@ extension Signal { /// once `self` has terminated. **`samplee`'s terminated events /// are ignored**. public func withLatest(from samplee: SignalProducer) -> Signal<(Value, U), Error> { - return Signal<(Value, U), Error> { observer in - let d = CompositeDisposable() + return Signal<(Value, U), Error> { observer, lifetime in samplee.startWithSignal { signal, disposable in - d += disposable - d += self.withLatest(from: signal).observe(observer) + lifetime.observeEnded(disposable.dispose) + + let disposable = self.withLatest(from: signal).observe(observer) + _ = (disposable?.dispose).map(lifetime.observeEnded) } - return d } } } @@ -1357,11 +1398,12 @@ extension Signal { /// /// - returns: A signal that will deliver events until `lifetime` ends. public func take(during lifetime: Lifetime) -> Signal { - return Signal { observer in - let disposable = CompositeDisposable() - disposable += self.observe(observer) - disposable += lifetime.observeEnded(observer.sendCompleted) - return disposable + return Signal { observer, innerLifetime in + let disposable = self.observe(observer) + let lifetimeDisposable = lifetime.observeEnded(observer.sendCompleted) + + _ = (disposable?.dispose).map(innerLifetime.observeEnded) + _ = (lifetimeDisposable?.dispose).map(innerLifetime.observeEnded) } } @@ -1375,11 +1417,10 @@ extension Signal { /// - returns: A signal that will deliver events until `trigger` sends /// `value` or `completed` events. public func take(until trigger: Signal<(), NoError>) -> Signal { - return Signal { observer in - let disposable = CompositeDisposable() - disposable += self.observe(observer) + return Signal { observer, lifetime in + let disposable = self.observe(observer) - disposable += trigger.observe { event in + let triggerDisposable = trigger.observe { event in switch event { case .value, .completed: observer.sendCompleted() @@ -1389,7 +1430,8 @@ extension Signal { } } - return disposable + _ = (disposable?.dispose).map(lifetime.observeEnded) + _ = (triggerDisposable?.dispose).map(lifetime.observeEnded) } } @@ -1404,8 +1446,9 @@ extension Signal { /// - returns: A signal that will deliver events once the `trigger` sends /// `value` or `completed` events. public func skip(until trigger: Signal<(), NoError>) -> Signal { - return Signal { observer in + return Signal { observer, lifetime in let disposable = SerialDisposable() + lifetime.observeEnded(disposable.dispose) disposable.inner = trigger.observe { event in switch event { @@ -1416,8 +1459,6 @@ extension Signal { break } } - - return disposable } } @@ -1525,15 +1566,17 @@ extension Signal { /// - returns: A signal that sends the partial results of the accumuation, and the /// final result as `self` completes. public func scan(into initialResult: U, _ nextPartialResult: @escaping (inout U, Value) -> Void) -> Signal { - return Signal { observer in + return Signal { observer, lifetime in var accumulator = initialResult - return self.observe { event in + let disposable = self.observe { event in observer.action(event.map { value in nextPartialResult(&accumulator, value) return accumulator }) } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1585,10 +1628,10 @@ extension Signal { /// /// - returns: A signal which conditionally forwards values from `self`. public func skip(while shouldContinue: @escaping (Value) -> Bool) -> Signal { - return Signal { observer in + return Signal { observer, lifetime in var isSkipping = true - return self.observe { event in + let disposable = self.observe { event in switch event { case let .value(value): isSkipping = isSkipping && shouldContinue(value) @@ -1600,6 +1643,8 @@ extension Signal { observer.action(event) } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1616,9 +1661,7 @@ extension Signal { /// instead, regardless of whether `self` has sent events /// already. public func take(untilReplacement signal: Signal) -> Signal { - return Signal { observer in - let disposable = CompositeDisposable() - + return Signal { observer, lifetime in let signalDisposable = self.observe { event in switch event { case .completed: @@ -1629,13 +1672,13 @@ extension Signal { } } - disposable += signalDisposable - disposable += signal.observe { event in + let replacementDisposable = signal.observe { event in signalDisposable?.dispose() observer.action(event) } - return disposable + _ = (signalDisposable?.dispose).map(lifetime.observeEnded) + _ = (replacementDisposable?.dispose).map(lifetime.observeEnded) } } @@ -1648,11 +1691,11 @@ extension Signal { /// - returns: A signal that receives up to `count` values from `self` /// after `self` completes. public func take(last count: Int) -> Signal { - return Signal { observer in + return Signal { observer, lifetime in var buffer: [Value] = [] buffer.reserveCapacity(count) - return self.observe { event in + let disposable = self.observe { event in switch event { case let .value(value): // To avoid exceeding the reserved capacity of the buffer, @@ -1673,6 +1716,8 @@ extension Signal { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1685,14 +1730,16 @@ extension Signal { /// /// - returns: A signal which conditionally forwards values from `self`. public func take(while shouldContinue: @escaping (Value) -> Bool) -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in if let value = event.value, !shouldContinue(value) { observer.sendCompleted() } else { observer.action(event) } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1741,14 +1788,12 @@ extension Signal { public func throttle(_ interval: TimeInterval, on scheduler: DateScheduler) -> Signal { precondition(interval >= 0) - return Signal { observer in + return Signal { observer, lifetime in let state: Atomic> = Atomic(ThrottleState()) let schedulerDisposable = SerialDisposable() + lifetime.observeEnded(schedulerDisposable.dispose) - let disposable = CompositeDisposable() - disposable += schedulerDisposable - - disposable += self.observe { event in + let disposable = self.observe { event in guard let value = event.value else { schedulerDisposable.inner = scheduler.schedule { observer.action(event) @@ -1794,7 +1839,7 @@ extension Signal { } } - return disposable + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1823,15 +1868,13 @@ extension Signal { public func throttle(while shouldThrottle: P, on scheduler: Scheduler) -> Signal where P.Value == Bool { - return Signal { observer in + return Signal { observer, lifetime in let initial: ThrottleWhileState = .resumed let state = Atomic(initial) let schedulerDisposable = SerialDisposable() + lifetime.observeEnded(schedulerDisposable.dispose) - let disposable = CompositeDisposable() - disposable += schedulerDisposable - - disposable += shouldThrottle.producer + let propertyDisposable = shouldThrottle.producer .skipRepeats() .startWithValues { shouldThrottle in let valueToSend = state.modify { state -> Value? in @@ -1857,7 +1900,7 @@ extension Signal { } } - disposable += self.observe { event in + let disposable = self.observe { event in let eventToSend = state.modify { state -> Event? in switch event { case let .value(value): @@ -1884,7 +1927,8 @@ extension Signal { } } - return disposable + lifetime.observeEnded(propertyDisposable.dispose) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -1919,8 +1963,8 @@ extension Signal { let d = SerialDisposable() - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in switch event { case let .value(value): let date = scheduler.currentDate.addingTimeInterval(interval) @@ -1934,6 +1978,8 @@ extension Signal { } } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -1951,23 +1997,24 @@ extension Signal { /// /// - returns: A signal that sends unique values during its lifetime. public func uniqueValues(_ transform: @escaping (Value) -> Identity) -> Signal { - return Signal { observer in + return Signal { observer, lifetime in var seenValues: Set = [] - return self - .observe { event in - switch event { - case let .value(value): - let identity = transform(value) - if !seenValues.contains(identity) { - seenValues.insert(identity) - fallthrough - } - - case .failed, .completed, .interrupted: - observer.action(event) + let disposable = self.observe { event in + switch event { + case let .value(value): + let identity = transform(value) + if !seenValues.contains(identity) { + seenValues.insert(identity) + fallthrough } + + case .failed, .completed, .interrupted: + observer.action(event) } + } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } @@ -2171,7 +2218,7 @@ extension Signal { } private convenience init(_ builder: AggregateBuilder, _ transform: @escaping (ContiguousArray) -> Value) { - self.init { observer in + self.init { observer, lifetime in let disposables = CompositeDisposable() let strategy = Atomic(Strategy(count: builder.startHandlers.count) { observer.send(value: transform($0)) }) @@ -2179,7 +2226,7 @@ extension Signal { disposables += action(index, strategy) { observer.action($0.map { _ in fatalError() }) } } - return ActionDisposable { + lifetime.observeEnded { strategy.modify { _ in disposables.dispose() } @@ -2379,16 +2426,17 @@ extension Signal { public func timeout(after interval: TimeInterval, raising error: Error, on scheduler: DateScheduler) -> Signal { precondition(interval >= 0) - return Signal { observer in - let disposable = CompositeDisposable() + return Signal { observer, lifetime in let date = scheduler.currentDate.addingTimeInterval(interval) - disposable += scheduler.schedule(after: date) { + let timeoutDisposable = scheduler.schedule(after: date) { observer.send(error: error) } - disposable += self.observe(observer) - return disposable + let disposable = self.observe(observer) + + _ = (disposable?.dispose).map(lifetime.observeEnded) + _ = (timeoutDisposable?.dispose).map(lifetime.observeEnded) } } } @@ -2406,8 +2454,8 @@ extension Signal where Error == NoError { /// /// - returns: A signal that has an instantiatable `ErrorType`. public func promoteErrors(_: F.Type) -> Signal { - return Signal { observer in - return self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in switch event { case let .value(value): observer.send(value: value) @@ -2419,6 +2467,8 @@ extension Signal where Error == NoError { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } @@ -2508,8 +2558,8 @@ extension Signal { /// /// - returns: A signal which forwards the transformed values. public func attemptMap(_ transform: @escaping (Value) -> Result) -> Signal { - return Signal { observer in - self.observe { event in + return Signal { observer, lifetime in + let disposable = self.observe { event in switch event { case let .value(value): transform(value).analysis( @@ -2524,6 +2574,8 @@ extension Signal { observer.sendInterrupted() } } + + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index ebf9187eb..8b7458f95 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -185,11 +185,7 @@ public struct SignalProducer { /// `Signal` commences. Both the produced `Signal` and an interrupt handle /// of the signal would be passed to the closure. public func startWithSignal(_ setup: (_ signal: Signal, _ interruptHandle: Disposable) -> Void) { - // Disposes of the work associated with the SignalProducer and any - // upstream producers. - let producerDisposable = CompositeDisposable() - - let (signal, observer) = Signal.pipe(disposable: producerDisposable) + let (signal, observer, lifetime) = Signal.pipe() // Directly disposed of when `start()` or `startWithSignal()` is // disposed. @@ -201,7 +197,7 @@ public struct SignalProducer { return } - startHandler(observer, Lifetime(producerDisposable)) + startHandler(observer, lifetime) } } diff --git a/Tests/ReactiveSwiftTests/SignalLifetimeSpec.swift b/Tests/ReactiveSwiftTests/SignalLifetimeSpec.swift index fbfa3f03f..bd8ab1df3 100644 --- a/Tests/ReactiveSwiftTests/SignalLifetimeSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalLifetimeSpec.swift @@ -26,7 +26,7 @@ class SignalLifetimeSpec: QuickSpec { var isDisposed = false weak var signal: Signal? = { - let signal: Signal = Signal { _ in nil } + let signal: Signal = .never return signal.on(disposed: { isDisposed = true }) }() expect(signal).to(beNil()) @@ -35,7 +35,7 @@ class SignalLifetimeSpec: QuickSpec { it("should be disposed of if no one retains it") { var isDisposed = false - var signal: Signal? = Signal { _ in nil }.on(disposed: { isDisposed = true }) + var signal: Signal? = Signal.never.on(disposed: { isDisposed = true }) weak var weakSignal = signal expect(weakSignal).toNot(beNil()) @@ -56,9 +56,9 @@ class SignalLifetimeSpec: QuickSpec { var isDisposed = false weak var signal: Signal? = { - let signal: Signal = Signal { innerObserver in + let signal: Signal = Signal { innerObserver, _ in observer = innerObserver - return nil + } return signal.on(disposed: { isDisposed = true }) }() @@ -72,7 +72,7 @@ class SignalLifetimeSpec: QuickSpec { var disposable: Disposable? = nil weak var signal: Signal? = { - let signal: Signal = Signal { _ in nil } + let signal: Signal = Signal.never disposable = signal.on(disposed: { isDisposed = true }).observe(Signal.Observer()) return signal }() @@ -89,11 +89,10 @@ class SignalLifetimeSpec: QuickSpec { var isDisposed = false weak var signal: Signal? = { - let signal = Signal { observer in + let signal = Signal { observer, _ in testScheduler.schedule { observer.send(error: TestError.default) } - return nil } signal.on(disposed: { isDisposed = true }).observeFailed { _ in errored = true } return signal @@ -115,11 +114,10 @@ class SignalLifetimeSpec: QuickSpec { var isDisposed = false weak var signal: Signal? = { - let signal = Signal { observer in + let signal = Signal { observer, _ in testScheduler.schedule { observer.sendCompleted() } - return nil } signal.on(disposed: { isDisposed = true }).observeCompleted { completed = true } return signal @@ -141,12 +139,10 @@ class SignalLifetimeSpec: QuickSpec { var isDisposed = false weak var signal: Signal? = { - let signal = Signal { observer in + let signal = Signal { observer, _ in testScheduler.schedule { observer.sendInterrupted() } - - return nil } signal.on(disposed: { isDisposed = true }).observeInterrupted { interrupted = true } return signal @@ -166,7 +162,10 @@ class SignalLifetimeSpec: QuickSpec { describe("Signal.pipe") { it("should deallocate") { - weak var signal = Signal<(), NoError>.pipe().0 + weak var signal: AnyObject? = { + let (signal, _) = Signal<(), NoError>.pipe() + return signal + }() expect(signal).to(beNil()) } @@ -265,7 +264,7 @@ class SignalLifetimeSpec: QuickSpec { describe("testTransform") { it("should be disposed of") { var isDisposed = false - weak var signal: Signal? = Signal { _ in nil } + weak var signal: Signal? = Signal.never .testTransform() .on(disposed: { isDisposed = true }) @@ -278,7 +277,7 @@ class SignalLifetimeSpec: QuickSpec { var isDisposed = false weak var signal: Signal? = { - let signal: Signal = Signal { _ in nil }.testTransform() + let signal: Signal = Signal.never.testTransform() disposable = signal.on(disposed: { isDisposed = true }).observe(Signal.Observer()) return signal }() @@ -434,8 +433,9 @@ class SignalLifetimeSpec: QuickSpec { private extension Signal { func testTransform() -> Signal { - return Signal { observer in - return self.observe(observer.action) + return Signal { observer, lifetime in + let disposable = self.observe(observer.action) + _ = (disposable?.dispose).map(lifetime.observeEnded) } } } diff --git a/Tests/ReactiveSwiftTests/SignalSpec.swift b/Tests/ReactiveSwiftTests/SignalSpec.swift index b7aac6ba6..4c5590b2c 100755 --- a/Tests/ReactiveSwiftTests/SignalSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalSpec.swift @@ -44,9 +44,8 @@ class SignalSpec: QuickSpec { it("should run the generator immediately") { var didRunGenerator = false - _ = Signal { observer in + _ = Signal { observer, _ in didRunGenerator = true - return nil } expect(didRunGenerator) == true @@ -55,14 +54,13 @@ class SignalSpec: QuickSpec { it("should forward events to observers") { let numbers = [ 1, 2, 5 ] - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, _ in testScheduler.schedule { for number in numbers { observer.send(value: number) } observer.sendCompleted() } - return nil } var fromSignal: [Int] = [] @@ -91,11 +89,11 @@ class SignalSpec: QuickSpec { it("should dispose of returned disposable upon error") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in testScheduler.schedule { observer.send(error: TestError.default) } - return disposable + lifetime.observeEnded(disposable.dispose) } var errored = false @@ -114,11 +112,11 @@ class SignalSpec: QuickSpec { it("should dispose of returned disposable upon completion") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in testScheduler.schedule { observer.sendCompleted() } - return disposable + lifetime.observeEnded(disposable.dispose) } var completed = false @@ -137,11 +135,11 @@ class SignalSpec: QuickSpec { it("should dispose of returned disposable upon interrupted") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in testScheduler.schedule { observer.sendInterrupted() } - return disposable + lifetime.observeEnded(disposable.dispose) } var interrupted = false @@ -161,10 +159,10 @@ class SignalSpec: QuickSpec { it("should dispose of the returned disposable if the signal has interrupted in the generator") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in observer.sendInterrupted() expect(disposable.isDisposed) == false - return disposable + lifetime.observeEnded(disposable.dispose) } expect(disposable.isDisposed) == true @@ -173,10 +171,10 @@ class SignalSpec: QuickSpec { it("should dispose of the returned disposable if the signal has completed in the generator") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in observer.sendCompleted() expect(disposable.isDisposed) == false - return disposable + lifetime.observeEnded(disposable.dispose) } expect(disposable.isDisposed) == true @@ -185,10 +183,10 @@ class SignalSpec: QuickSpec { it("should dispose of the returned disposable if the signal has failed in the generator") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in observer.send(error: .default) expect(disposable.isDisposed) == false - return disposable + lifetime.observeEnded(disposable.dispose) } expect(disposable.isDisposed) == true @@ -250,7 +248,8 @@ class SignalSpec: QuickSpec { it("should dispose the supplied disposable when the signal terminates") { let disposable = SimpleDisposable() - let (signal, observer) = Signal<(), NoError>.pipe(disposable: disposable) + let (signal, observer, lifetime) = Signal<(), NoError>.pipe() + lifetime.observeEnded(disposable.dispose) expect(disposable.isDisposed) == false @@ -385,7 +384,7 @@ class SignalSpec: QuickSpec { it("should stop forwarding events when disposed") { let disposable = SimpleDisposable() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, lifetime in testScheduler.schedule { for number in [ 1, 2 ] { observer.send(value: number) @@ -393,7 +392,7 @@ class SignalSpec: QuickSpec { observer.sendCompleted() observer.send(value: 4) } - return disposable + lifetime.observeEnded(disposable.dispose) } var fromSignal: [Int] = [] @@ -412,9 +411,8 @@ class SignalSpec: QuickSpec { it("should not trigger side effects") { var runCount = 0 - let signal: Signal<(), NoError> = Signal { observer in + let signal: Signal<(), NoError> = Signal { observer, _ in runCount += 1 - return nil } expect(runCount) == 1 @@ -1212,13 +1210,12 @@ class SignalSpec: QuickSpec { let numbers = [ 1, 2, 4, 4, 5 ] let testScheduler = TestScheduler() - var signal: Signal = Signal { observer in + var signal: Signal = Signal { observer, _ in testScheduler.schedule { for number in numbers { observer.send(value: number) } } - return nil } var completed = false @@ -1235,13 +1232,12 @@ class SignalSpec: QuickSpec { let numbers = [ 1, 2, 4, 4, 5 ] let testScheduler = TestScheduler() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, _ in testScheduler.schedule { for number in numbers { observer.send(value: number) } } - return nil } var result: [Int] = [] @@ -1606,7 +1602,7 @@ class SignalSpec: QuickSpec { describe("delay") { it("should send events on the given scheduler after the interval") { let testScheduler = TestScheduler() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, _ in testScheduler.schedule { observer.send(value: 1) } @@ -1614,7 +1610,6 @@ class SignalSpec: QuickSpec { observer.send(value: 2) observer.sendCompleted() } - return nil } var result: [Int] = [] @@ -1647,11 +1642,10 @@ class SignalSpec: QuickSpec { it("should schedule errors immediately") { let testScheduler = TestScheduler() - let signal: Signal = Signal { observer in + let signal: Signal = Signal { observer, _ in testScheduler.schedule { observer.send(error: TestError.default) } - return nil } var errored = false