Skip to content

Commit

Permalink
Merge pull request #334 from ReactiveCocoa/producer-disposable
Browse files Browse the repository at this point in the history
`Lifetime`-based producer resource management
  • Loading branch information
mdiep authored May 22, 2017
2 parents 5d191be + 8fab9ad commit ffb2b47
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 216 deletions.
6 changes: 3 additions & 3 deletions Sources/Action.swift
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public final class Action<Input, Output, Error: Swift.Error> {
/// - returns: A producer that forwards events generated by its started unit of work,
/// or emits `ActionError.disabled` if the execution attempt is failed.
public func apply(_ input: Input) -> SignalProducer<Output, ActionError<Error>> {
return SignalProducer { observer, disposable in
return SignalProducer { observer, lifetime in
let startingState = self.state.modify { state -> Any? in
if state.isEnabled {
state.isExecuting = true
Expand All @@ -179,15 +179,15 @@ public final class Action<Input, Output, Error: Swift.Error> {
}

self.executeClosure(state, input).startWithSignal { signal, signalDisposable in
disposable += signalDisposable
lifetime.observeEnded(signalDisposable.dispose)

signal.observe { event in
observer.action(event.mapError(ActionError.producerFailed))
self.eventsObserver.send(value: event)
}
}

disposable += {
lifetime.observeEnded {
self.state.modify {
$0.isExecuting = false
}
Expand Down
25 changes: 25 additions & 0 deletions Sources/Deprecations+Removals.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,31 @@ import Dispatch
import Result

// MARK: Unavailable methods in ReactiveSwift 2.0.
extension Lifetime {
@available(*, unavailable, renamed:"hasEnded")
public var isDisposed: Bool { fatalError() }

@discardableResult
@available(*, unavailable, renamed:"observeEnded(_:)")
public func add(_ action: () -> Void) -> Disposable? { fatalError() }

@discardableResult
@available(*, unavailable, message:"Use `observeEnded(_:)` instead.")
public static func += (left: Lifetime, right: () -> Void) -> Disposable? { fatalError() }

@discardableResult
@available(*, deprecated, message:"Use `observeEnded(_:)` with a method reference to `dispose()` instead. This method is subject to removal in a future release.")
public func add(_ d: Disposable?) -> Disposable? {
return d.flatMap { observeEnded($0.dispose) }
}

@discardableResult
@available(*, deprecated, message:"Use `observeEnded(_:)` with a method reference to `dispose()` instead. This operator overload is subject to removal in a future release.")
public static func += (left: Lifetime, right: Disposable?) -> Disposable? {
return right.flatMap { left.observeEnded($0.dispose) }
}
}

extension SignalProducerProtocol {
@available(*, unavailable, renamed:"init(_:)")
public static func attempt(_ operation: @escaping () -> Result<Value, Error>) -> SignalProducer<Value, Error> { fatalError() }
Expand Down
34 changes: 21 additions & 13 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -509,11 +509,13 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
fileprivate func concurrent(limit: UInt) -> SignalProducer<Value.Value, Error> {
precondition(limit > 0, "The concurrent limit must be greater than zero.")

return SignalProducer<Value.Value, Error> { relayObserver, disposable in
return SignalProducer<Value.Value, Error> { relayObserver, lifetime in
self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
let disposables = CompositeDisposable()
lifetime.observeEnded(signalDisposable.dispose)
lifetime.observeEnded(disposables.dispose)

_ = signal.observeConcurrent(relayObserver, limit, disposable)
_ = signal.observeConcurrent(relayObserver, limit, disposables)
}
}
}
Expand Down Expand Up @@ -767,13 +769,16 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
/// - returns: A signal that forwards values from the latest signal sent on
/// `signal`, ignoring values sent on previous inner signal.
fileprivate func switchToLatest() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
return SignalProducer<Value.Value, Error> { observer, lifetime in
let latestInnerDisposable = SerialDisposable()
disposable += latestInnerDisposable
lifetime.observeEnded(latestInnerDisposable.dispose)

self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
disposable += signal.observeSwitchToLatest(observer, latestInnerDisposable)
lifetime.observeEnded(signalDisposable.dispose)

if let disposable = signal.observeSwitchToLatest(observer, latestInnerDisposable) {
lifetime.observeEnded(disposable.dispose)
}
}
}
}
Expand Down Expand Up @@ -891,13 +896,16 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err
///
/// The returned producer completes when `self` and the winning inner producer have both completed.
fileprivate func race() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
return SignalProducer<Value.Value, Error> { observer, lifetime in
let relayDisposable = CompositeDisposable()
disposable += relayDisposable
lifetime.observeEnded(relayDisposable.dispose)

self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
disposable += signal.observeRace(observer, relayDisposable)
lifetime.observeEnded(signalDisposable.dispose)

if let disposable = signal.observeRace(observer, relayDisposable) {
lifetime.observeEnded(disposable.dispose)
}
}
}
}
Expand Down Expand Up @@ -1216,9 +1224,9 @@ extension SignalProducer {
/// - transform: A closure that accepts emitted error and returns a signal
/// producer with a different type of error.
public func flatMapError<F>(_ transform: @escaping (Error) -> SignalProducer<Value, F>) -> SignalProducer<Value, F> {
return SignalProducer<Value, F> { observer, disposable in
return SignalProducer<Value, F> { observer, lifetime in
let serialDisposable = SerialDisposable()
disposable += serialDisposable
lifetime.observeEnded(serialDisposable.dispose)

self.startWithSignal { signal, signalDisposable in
serialDisposable.inner = signalDisposable
Expand Down
6 changes: 2 additions & 4 deletions Sources/FoundationExtensions.swift
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ extension Reactive where Base: URLSession {
/// side error (i.e. when a response with status code other than
/// 200...299 is received).
public func data(with request: URLRequest) -> SignalProducer<(Data, URLResponse), AnyError> {
return SignalProducer { [base = self.base] observer, disposable in
return SignalProducer { [base = self.base] observer, lifetime in
let task = base.dataTask(with: request) { data, response, error in
if let data = data, let response = response {
observer.send(value: (data, response))
Expand All @@ -73,9 +73,7 @@ extension Reactive where Base: URLSession {
}
}

disposable += {
task.cancel()
}
lifetime.observeEnded(task.cancel)
task.resume()
}
}
Expand Down
71 changes: 38 additions & 33 deletions Sources/Lifetime.swift
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,29 @@ import enum Result.NoError
/// Represents the lifetime of an object, and provides a hook to observe when
/// the object deinitializes.
public final class Lifetime {
// MARK: Type properties and methods

/// Factory method for creating a `Lifetime` and its associated `Token`.
///
/// - returns: A `(lifetime, token)` tuple.
public static func make() -> (lifetime: Lifetime, token: Token) {
let token = Token()
return (Lifetime(token), token)
}

/// A `Lifetime` that has already ended.
public static var empty: Lifetime {
return Lifetime(ended: .empty)
}

// MARK: Instance properties
private let disposables: CompositeDisposable

/// A signal that sends a `completed` event when the lifetime ends.
///
/// - note: Consider using `Lifetime.observeEnded` if only a closure observer
/// is to be attached.
public let ended: Signal<Never, NoError>
public var ended: Signal<Never, NoError> {
return Signal { observer in
return disposables += observer.sendCompleted
}
}

// MARK: Initializers
/// A flag indicating whether the lifetime has ended.
public var hasEnded: Bool {
return disposables.isDisposed
}

/// Initialize a `Lifetime` object with the supplied ended signal.
/// Initialize a `Lifetime` object with the supplied composite disposable.
///
/// - parameters:
/// - signal: The ended signal.
private init(ended signal: Signal<Never, NoError>) {
ended = signal
/// - signal: The composite disposable.
internal init(_ disposables: CompositeDisposable) {
self.disposables = disposables
}

/// Initialize a `Lifetime` from a lifetime token, which is expected to be
Expand All @@ -47,7 +39,7 @@ public final class Lifetime {
/// - token: A lifetime token for detecting the deinitialization of the
/// associated object.
public convenience init(_ token: Token) {
self.init(ended: token.ended)
self.init(token.disposables)
}

/// Observe the termination of `self`.
Expand All @@ -59,13 +51,28 @@ public final class Lifetime {
/// if `lifetime` has already ended.
@discardableResult
public func observeEnded(_ action: @escaping () -> Void) -> Disposable? {
return ended.observe { event in
if event.isTerminating {
action()
}
}
return disposables += action
}
}

extension Lifetime {
/// Factory method for creating a `Lifetime` and its associated `Token`.
///
/// - returns: A `(lifetime, token)` tuple.
public static func make() -> (lifetime: Lifetime, token: Token) {
let token = Token()
return (Lifetime(token), token)
}

/// A `Lifetime` that has already ended.
public static let empty: Lifetime = {
let disposables = CompositeDisposable()
disposables.dispose()
return Lifetime(disposables)
}()
}

extension Lifetime {
/// A token object which completes its signal when it deinitializes.
///
/// It is generally used in conjuncion with `Lifetime` as a private
Expand All @@ -78,16 +85,14 @@ public final class Lifetime {
/// ```
public final class Token {
/// A signal that sends a Completed event when the lifetime ends.
fileprivate let ended: Signal<Never, NoError>

private let endedObserver: Signal<Never, NoError>.Observer
fileprivate let disposables: CompositeDisposable

public init() {
(ended, endedObserver) = Signal.pipe()
disposables = CompositeDisposable()
}

deinit {
endedObserver.sendCompleted()
disposables.dispose()
}
}
}
19 changes: 12 additions & 7 deletions Sources/Property.swift
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,10 @@ public final class Property<Value>: PropertyProtocol {
/// - values: A producer that will start immediately and send values to
/// the property.
public convenience init(initial: Value, then values: SignalProducer<Value, NoError>) {
self.init(unsafeProducer: SignalProducer { observer, disposables in
self.init(unsafeProducer: SignalProducer { observer, lifetime in
observer.send(value: initial)
disposables += values.start(Signal.Observer(mappingInterruptedToCompleted: observer))
let disposable = values.start(Signal.Observer(mappingInterruptedToCompleted: observer))
lifetime.observeEnded(disposable.dispose)
})
}

Expand Down Expand Up @@ -557,10 +558,12 @@ public final class Property<Value>: PropertyProtocol {
_value = { box.value! }
signal = relay

producer = SignalProducer { [box, signal = relay!] observer, disposable in
producer = SignalProducer { [box, signal = relay!] observer, lifetime in
box.modify { value in
observer.send(value: value!)
disposable += signal.observe(Signal.Observer(mappingInterruptedToCompleted: observer))
if let d = signal.observe(Signal.Observer(mappingInterruptedToCompleted: observer)) {
lifetime.observeEnded(d.dispose)
}
}
}
}
Expand Down Expand Up @@ -617,10 +620,12 @@ public final class MutableProperty<Value>: ComposableMutablePropertyProtocol {
/// followed by all changes over time, then complete when the property has
/// deinitialized.
public var producer: SignalProducer<Value, NoError> {
return SignalProducer { [box, signal] producerObserver, producerDisposable in
return SignalProducer { [box, signal] observer, lifetime in
box.modify { value in
producerObserver.send(value: value)
producerDisposable += signal.observe(Signal.Observer(mappingInterruptedToCompleted: producerObserver))
observer.send(value: value)
if let d = signal.observe(Signal.Observer(mappingInterruptedToCompleted: observer)) {
lifetime.observeEnded(d.dispose)
}
}
}
}
Expand Down
Loading

0 comments on commit ffb2b47

Please sign in to comment.