Skip to content

Commit

Permalink
Optimized producer lifting.
Browse files Browse the repository at this point in the history
  • Loading branch information
andersio committed Dec 7, 2016
1 parent 9030845 commit 1a48d8f
Showing 1 changed file with 98 additions and 76 deletions.
174 changes: 98 additions & 76 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import Dispatch
import Foundation
import Result

/// A SignalProducer creates Signals that can produce values of type `Value`
/// A SignalProducer creates Signals that can produce values of type `Value`
/// and/or fail with errors of type `Error`. If no failure should be possible,
/// `NoError` can be specified for `Error`.
///
Expand All @@ -19,7 +19,7 @@ import Result
public struct SignalProducer<Value, Error: Swift.Error> {
public typealias ProducedSignal = Signal<Value, Error>

private let startHandler: (Signal<Value, Error>.Observer, CompositeDisposable) -> Void
fileprivate let startHandler: () -> (signal: Signal<Value, Error>, () -> Void, Disposable)

/// Initializes a `SignalProducer` that will emit the same events as the
/// given signal.
Expand Down Expand Up @@ -49,6 +49,29 @@ public struct SignalProducer<Value, Error: Swift.Error> {
/// - parameters:
/// - startHandler: A closure that accepts observer and a disposable.
public init(_ startHandler: @escaping (Signal<Value, Error>.Observer, CompositeDisposable) -> Void) {
self.init {
let disposable = CompositeDisposable()
let (signal, observer) = Signal<Value, Error>.pipe(disposable: disposable)
let start = { startHandler(observer, disposable) }
let interrupter = ActionDisposable(action: observer.sendInterrupted)

return (signal, start, interrupter)
}
}

/// Create a SignalProducer that will invoke the given factory once for each
/// invocation of `start()`.
///
/// - warning: `Signal` should not escape the factory unles absolutely
/// necessary.
///
/// - note: When nesting factories, remember to call the upstream's starting
/// side effect.
///
/// - parameters:
/// - startHandler: A factory that produces a `Signal`, its associated
/// starting side effects and an interrupter.
internal init(_ startHandler: @escaping () -> (Signal<Value, Error>, () -> Void, Disposable)) {
self.startHandler = startHandler
}

Expand Down Expand Up @@ -188,10 +211,22 @@ public struct SignalProducer<Value, Error: Swift.Error> {
return
}

startHandler(observer, producerDisposable)
let (upstream, start, interrupter) = startHandler()
producerDisposable += interrupter
upstream.observe(observer)
start()
}
}

/// The start order of `SignalProducer`s in a binary lifted operator.
internal enum StartOrder {
/// Start the left producer first.
case leftRight

/// Start the right producer first.
case rightLeft
}

public protocol SignalProducerProtocol {
/// The type of values being sent on the producer
associatedtype Value
Expand Down Expand Up @@ -365,15 +400,49 @@ extension SignalProducerProtocol {
/// - returns: A signal producer that applies signal's operator to every
/// created signal.
public func lift<U, F>(_ transform: @escaping (Signal<Value, Error>) -> Signal<U, F>) -> SignalProducer<U, F> {
return SignalProducer { observer, outerDisposable in
self.startWithSignal { signal, innerDisposable in
outerDisposable += innerDisposable
return SignalProducer<U, F> {
let (signal, start, interrupter) = self.producer.startHandler()
return (transform(signal), start, interrupter)
}
}

/// Lift a binary Signal operator to operate upon SignalProducers.
///
/// The start order of the producer must be specified. When both producers are
/// synchronous this order can be important depending on the operator to
/// generate correct results.
///
/// - parameters:
/// - order: The start order of the producers.
///
/// - returns: A factory that creates a SignalProducer with the given operator
/// applied. `self` would be the LHS, and the factory input would
/// be the RHS.
fileprivate func lift<U, F, V, G>(_ order: StartOrder, _ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return { otherProducer in
return SignalProducer<V, G> {
let (leftSignal, leftStart, leftInterrupter) = self.producer.startHandler()
let (rightSignal, rightStart, rightInterrupter) = otherProducer.startHandler()

let start = {
switch order {
case .rightLeft:
rightStart()
leftStart()

case .leftRight:
leftStart()
rightStart()
}
}

transform(signal).observe(observer)
return (transform(leftSignal)(rightSignal), start, ActionDisposable {
leftInterrupter.dispose()
rightInterrupter.dispose()
})
}
}
}


/// Lift a binary Signal operator to operate upon SignalProducers instead.
///
Expand All @@ -390,47 +459,7 @@ extension SignalProducerProtocol {
///
/// - returns: A binary operator that operates on two signal producers.
public func lift<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return liftRight(transform)
}

/// Right-associative lifting of a binary signal operator over producers.
/// That is, the argument producer will be started before the receiver. When
/// both producers are synchronous this order can be important depending on
/// the operator to generate correct results.
private func liftRight<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return { otherProducer in
return SignalProducer { observer, outerDisposable in
self.startWithSignal { signal, disposable in
outerDisposable.add(disposable)

otherProducer.startWithSignal { otherSignal, otherDisposable in
outerDisposable += otherDisposable

transform(signal)(otherSignal).observe(observer)
}
}
}
}
}

/// Left-associative lifting of a binary signal operator over producers.
/// That is, the receiver will be started before the argument producer. When
/// both producers are synchronous this order can be important depending on
/// the operator to generate correct results.
private func liftLeft<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (SignalProducer<U, F>) -> SignalProducer<V, G> {
return { otherProducer in
return SignalProducer { observer, outerDisposable in
otherProducer.startWithSignal { otherSignal, otherDisposable in
outerDisposable += otherDisposable

self.startWithSignal { signal, disposable in
outerDisposable.add(disposable)

transform(signal)(otherSignal).observe(observer)
}
}
}
}
return lift(.rightLeft, transform)
}

/// Lift a binary Signal operator to operate upon a Signal and a
Expand All @@ -448,7 +477,7 @@ extension SignalProducerProtocol {
/// `SignalProducer`.
public func lift<U, F, V, G>(_ transform: @escaping (Signal<Value, Error>) -> (Signal<U, F>) -> Signal<V, G>) -> (Signal<U, F>) -> SignalProducer<V, G> {
return { otherSignal in
return self.liftRight(transform)(SignalProducer(signal: otherSignal))
return self.lift(.rightLeft, transform)(SignalProducer(signal: otherSignal))
}
}

Expand Down Expand Up @@ -644,7 +673,7 @@ extension SignalProducerProtocol {
/// - returns: A producer that, when started, will yield a tuple containing
/// values of `self` and given producer.
public func combineLatest<U>(with other: SignalProducer<U, Error>) -> SignalProducer<(Value, U), Error> {
return liftLeft(Signal.combineLatest)(other)
return lift(.leftRight, Signal.combineLatest)(other)
}

/// Combine the latest value of the receiver with the latest value from
Expand Down Expand Up @@ -722,7 +751,7 @@ extension SignalProducerProtocol {
/// once both input producers have completed, or interrupt if
/// either input producer is interrupted.
public func sample<T>(with sampler: SignalProducer<T, NoError>) -> SignalProducer<(Value, T), Error> {
return liftLeft(Signal.sample(with:))(sampler)
return lift(.leftRight, Signal.sample(with:))(sampler)
}

/// Forward the latest value from `self` with the value from `sampler` as a
Expand Down Expand Up @@ -759,7 +788,7 @@ extension SignalProducerProtocol {
/// once both input producers have completed, or interrupt if
/// either input producer is interrupted.
public func sample(on sampler: SignalProducer<(), NoError>) -> SignalProducer<Value, Error> {
return liftLeft(Signal.sample(on:))(sampler)
return lift(.leftRight, Signal.sample(on:))(sampler)
}

/// Forward the latest value from `self` whenever `sampler` sends a `value`
Expand Down Expand Up @@ -796,7 +825,7 @@ extension SignalProducerProtocol {
/// once `self` has terminated. **`samplee`'s terminated events
/// are ignored**.
public func withLatest<U>(from samplee: SignalProducer<U, NoError>) -> SignalProducer<(Value, U), Error> {
return liftRight(Signal.withLatest)(samplee)
return lift(.rightLeft, Signal.withLatest)(samplee)
}

/// Forward the latest value from `samplee` with the value from `self` as a
Expand Down Expand Up @@ -840,7 +869,7 @@ extension SignalProducerProtocol {
/// - returns: A producer that will deliver events until `trigger` sends
/// `value` or `completed` events.
public func take(until trigger: SignalProducer<(), NoError>) -> SignalProducer<Value, Error> {
return liftRight(Signal.take(until:))(trigger)
return lift(.rightLeft, Signal.take(until:))(trigger)
}

/// Forward events from `self` until `trigger` sends a `value` or
Expand All @@ -867,7 +896,7 @@ extension SignalProducerProtocol {
/// - returns: A producer that will deliver events once the `trigger` sends
/// `value` or `completed` events.
public func skip(until trigger: SignalProducer<(), NoError>) -> SignalProducer<Value, Error> {
return liftRight(Signal.skip(until:))(trigger)
return lift(.rightLeft, Signal.skip(until:))(trigger)
}

/// Do not forward any values from `self` until `trigger` sends a `value`
Expand Down Expand Up @@ -965,7 +994,7 @@ extension SignalProducerProtocol {
/// instead, regardless of whether `self` has sent events
/// already.
public func take(untilReplacement signal: SignalProducer<Value, Error>) -> SignalProducer<Value, Error> {
return liftRight(Signal.take(untilReplacement:))(signal)
return lift(.rightLeft, Signal.take(untilReplacement:))(signal)
}

/// Forwards events from `self` until `replacement` begins sending events.
Expand Down Expand Up @@ -1018,7 +1047,7 @@ extension SignalProducerProtocol {
///
/// - returns: A producer that sends tuples of `self` and `otherProducer`.
public func zip<U>(with other: SignalProducer<U, Error>) -> SignalProducer<(Value, U), Error> {
return liftLeft(Signal.zip(with:))(other)
return lift(.leftRight, Signal.zip(with:))(other)
}

/// Zip elements of this producer and a signal into pairs. The elements of
Expand Down Expand Up @@ -1303,24 +1332,17 @@ extension SignalProducerProtocol {
disposed: (() -> Void)? = nil,
value: ((Value) -> Void)? = nil
) -> SignalProducer<Value, Error> {
return SignalProducer { observer, compositeDisposable in
starting?()
defer { started?() }

self.startWithSignal { signal, disposable in
compositeDisposable += disposable
signal
.on(
event: event,
failed: failed,
completed: completed,
interrupted: interrupted,
terminated: terminated,
disposed: disposed,
value: value
)
.observe(observer)
}
return SignalProducer<Value, Error> {
let (signal, start, interrupter) = self.producer.startHandler()
return (signal.on(event: event,
failed: failed,
completed: completed,
interrupted: interrupted,
terminated: terminated,
disposed: disposed,
value: value),
{ starting?(); start(); started?() },
interrupter)
}
}

Expand Down

0 comments on commit 1a48d8f

Please sign in to comment.