Skip to content

Commit

Permalink
Signal resource management with Lifetime.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed May 23, 2017
1 parent 1fa0a89 commit 6edbfb4
Show file tree
Hide file tree
Showing 9 changed files with 255 additions and 212 deletions.
8 changes: 8 additions & 0 deletions Sources/Deprecations+Removals.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,14 @@ import Dispatch
import Result

// MARK: Unavailable methods in ReactiveSwift 2.0.
extension Signal {
@available(*, unavailable, message:"Use the `Signal.init` that accepts a two-argument generator.")
public convenience init(_ generator: (Observer) -> Disposable?) { fatalError() }

@available(*, unavailable, message:"Use the `Signal.pipe` variant that returns a `Lifetime` instead.")
public static func pipe(disposable: Disposable?) -> (output: Signal, input: Observer) { fatalError() }
}

extension Lifetime {
@available(*, unavailable, renamed:"hasEnded")
public var isDisposed: Bool { fatalError() }
Expand Down
48 changes: 19 additions & 29 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -427,18 +427,13 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error {
fileprivate func concurrent(limit: UInt) -> Signal<Value.Value, Error> {
precondition(limit > 0, "The concurrent limit must be greater than zero.")

return Signal<Value.Value, Error> { relayObserver in
let disposable = CompositeDisposable()
let relayDisposable = CompositeDisposable()

disposable += relayDisposable
disposable += self.observeConcurrent(relayObserver, limit, relayDisposable)

return disposable
return Signal<Value.Value, Error> { relayObserver, lifetime in
let disposable = self.observeConcurrent(relayObserver, limit, lifetime)
_ = (disposable?.dispose).map(lifetime.observeEnded)
}
}

fileprivate func observeConcurrent(_ observer: Signal<Value.Value, Error>.Observer, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? {
fileprivate func observeConcurrent(_ observer: Signal<Value.Value, Error>.Observer, _ limit: UInt, _ lifetime: Lifetime) -> Disposable? {
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))

func startNextIfNeeded() {
Expand All @@ -447,7 +442,7 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error {
let deinitializer = ScopedDisposable(ActionDisposable(action: producerState.deinitialize))

producer.startWithSignal { signal, inner in
let handle = disposable.add(inner)
let handle = lifetime.observeEnded(inner.dispose)

signal.observe { event in
switch event {
Expand Down Expand Up @@ -510,12 +505,10 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
precondition(limit > 0, "The concurrent limit must be greater than zero.")

return SignalProducer<Value.Value, Error> { relayObserver, lifetime in
self.startWithSignal { signal, signalDisposable in
let disposables = CompositeDisposable()
lifetime.observeEnded(signalDisposable.dispose)
lifetime.observeEnded(disposables.dispose)
self.startWithSignal { signal, interruptHandle in
lifetime.observeEnded(interruptHandle.dispose)

_ = signal.observeConcurrent(relayObserver, limit, disposables)
_ = signal.observeConcurrent(relayObserver, limit, lifetime)
}
}
}
Expand Down Expand Up @@ -675,14 +668,12 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error {
/// - note: The returned signal completes when `signal` and the latest inner
/// signal have both completed.
fileprivate func switchToLatest() -> Signal<Value.Value, Error> {
return Signal<Value.Value, Error> { observer in
let composite = CompositeDisposable()
return Signal<Value.Value, Error> { observer, lifetime in
let serial = SerialDisposable()
lifetime.observeEnded(serial.dispose)

composite += serial
composite += self.observeSwitchToLatest(observer, serial)

return composite
let disposable = self.observeSwitchToLatest(observer, serial)
_ = (disposable?.dispose).map(lifetime.observeEnded)
}
}

Expand Down Expand Up @@ -800,14 +791,12 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error {
///
/// The returned signal completes when `self` and the winning inner signal have both completed.
fileprivate func race() -> Signal<Value.Value, Error> {
return Signal<Value.Value, Error> { observer in
let composite = CompositeDisposable()
return Signal<Value.Value, Error> { observer, lifetime in
let relayDisposable = CompositeDisposable()
lifetime.observeEnded(relayDisposable.dispose)

composite += relayDisposable
composite += self.observeRace(observer, relayDisposable)

return composite
let disposable = self.observeRace(observer, relayDisposable)
_ = (disposable?.dispose).map(lifetime.observeEnded)
}
}

Expand Down Expand Up @@ -1192,8 +1181,9 @@ extension Signal {
/// - transform: A closure that accepts emitted error and returns a signal
/// producer with a different type of error.
public func flatMapError<F>(_ transform: @escaping (Error) -> SignalProducer<Value, F>) -> Signal<Value, F> {
return Signal<Value, F> { observer in
self.observeFlatMapError(transform, observer, SerialDisposable())
return Signal<Value, F> { observer, lifetime in
let disposable = self.observeFlatMapError(transform, observer, SerialDisposable())
_ = (disposable?.dispose).map(lifetime.observeEnded)
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/FoundationExtensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,12 @@ extension Reactive where Base: NotificationCenter {
/// - note: The signal does not terminate naturally. Observers must be
/// explicitly disposed to avoid leaks.
public func notifications(forName name: Notification.Name?, object: AnyObject? = nil) -> Signal<Notification, NoError> {
return Signal { [base = self.base] observer in
return Signal { [base = self.base] observer, lifetime in
let notificationObserver = base.addObserver(forName: name, object: object, queue: nil) { notification in
observer.send(value: notification)
}

return ActionDisposable {
lifetime.observeEnded {
base.removeObserver(notificationObserver)
}
}
Expand Down
5 changes: 3 additions & 2 deletions Sources/Lifetime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ public final class Lifetime {
/// - note: Consider using `Lifetime.observeEnded` if only a closure observer
/// is to be attached.
public var ended: Signal<Never, NoError> {
return Signal { observer in
return disposables += observer.sendCompleted
return Signal { observer, lifetime in
let disposable = (disposables += observer.sendCompleted)
_ = (disposable?.dispose).map(lifetime.observeEnded)
}
}

Expand Down
4 changes: 3 additions & 1 deletion Sources/Property.swift
Original file line number Diff line number Diff line change
Expand Up @@ -527,7 +527,9 @@ public final class Property<Value>: PropertyProtocol {
unsafeProducer.startWithSignal { upstream, interruptHandle in
// A composed property tracks its active consumers through its relay signal, and
// interrupts `unsafeProducer` if the relay signal terminates.
let (signal, _observer) = Signal<Value, NoError>.pipe(disposable: interruptHandle)
let (signal, _observer, lifetime) = Signal<Value, NoError>.pipe()
lifetime.observeEnded(interruptHandle.dispose)

let observer = transform?(_observer) ?? _observer
relay = signal

Expand Down
Loading

0 comments on commit 6edbfb4

Please sign in to comment.