Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce FlattenStrategy.concurrent. #298

Merged
merged 7 commits into from
Mar 31, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
270 changes: 111 additions & 159 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,27 @@ public enum FlattenStrategy: Equatable {
///
/// The resulting producer will complete only when all inputs have
/// completed.
case merge
public static let merge = FlattenStrategy.concurrent(limit: .max)

/// The producers should be concatenated, so that their values are sent in
/// the order of the producers themselves.
///
/// The resulting producer will complete only when all inputs have
/// completed.
case concat
public static let concat = FlattenStrategy.concurrent(limit: 1)

/// The producers should be merged, but only up to the given limit at any
/// point of time, so that any value received on any of the input producers
/// will be forwarded immediately to the output producer.
///
/// When the number of active producers reaches the limit, subsequent
/// producers are queued.
///
/// The resulting producer will complete only when all inputs have
/// completed.
///
/// - precondition: `limit > 0`.
case concurrent(limit: UInt)

/// Only the events from the latest input producer should be considered for
/// the output. Any producers received before that point will be disposed
Expand All @@ -31,8 +44,20 @@ public enum FlattenStrategy: Equatable {
/// The resulting producer will complete only when the producer-of-producers
/// and the latest producer has completed.
case latest
}

public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
switch (left, right) {
case (.latest, .latest):
return true

case (.concurrent(let leftLimit), .concurrent(let rightLimit)):
return leftLimit == rightLimit

default:
return false
}
}
}

extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Flattens the inner producers sent upon `signal` (into a single signal of
Expand All @@ -45,11 +70,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
/// `Completed events on inner producers.
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -87,11 +109,8 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == NoError,
/// - strategy: Strategy used when flattening signals.
public func flatten(_ strategy: FlattenStrategy) -> Signal<Value.Value, Value.Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -124,11 +143,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == V
/// `completed` events on inner producers.
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -160,11 +176,8 @@ extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == N
/// `completed` events on inner producers.
public func flatten(_ strategy: FlattenStrategy) -> SignalProducer<Value.Value, Value.Error> {
switch strategy {
case .merge:
return self.merge()

case .concat:
return self.concat()
case .concurrent(let limit):
return self.concurrent(limit: limit)

case .latest:
return self.switchToLatest()
Expand Down Expand Up @@ -341,58 +354,61 @@ extension SignalProducerProtocol where Value: PropertyProtocol {
}

extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Returns a signal which sends all the values from producer signal emitted
/// from `signal`, waiting until each inner producer completes before
/// beginning to send the values from the next inner producer.
///
/// - note: If any of the inner producers fail, the returned signal will
/// forward that failure immediately
///
/// - note: The returned signal completes only when `signal` and all
/// producers emitted from `signal` complete.
fileprivate func concat() -> Signal<Value.Value, Error> {
fileprivate func concurrent(limit: UInt) -> Signal<Value.Value, Error> {
precondition(limit > 0, "The concurrent limit must be greater than zero.")

return Signal<Value.Value, Error> { relayObserver in
let disposable = CompositeDisposable()
let relayDisposable = CompositeDisposable()

disposable += relayDisposable
disposable += self.observeConcat(relayObserver, relayDisposable)
disposable += self.observeConcurrent(relayObserver, limit, relayDisposable)

return disposable
}
}
fileprivate func observeConcat(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable? = nil) -> Disposable? {
let state = Atomic(ConcatState<Value.Value, Error>())

fileprivate func observeConcurrent(_ observer: Observer<Value.Value, Error>, _ limit: UInt, _ disposable: CompositeDisposable) -> Disposable? {
let state = Atomic(ConcurrentFlattenState<Value.Value, Error>(limit: limit))

func startNextIfNeeded() {
while let producer = state.modify({ $0.dequeue() }) {
let producerState = UnsafeAtomicState<ProducerState>(.starting)
let deinitializer = ScopedDisposable(ActionDisposable(action: producerState.deinitialize))

producer.startWithSignal { signal, inner in
let handle = disposable?.add(inner)
let handle = disposable.add(inner)

signal.observe { event in
switch event {
case .completed, .interrupted:
handle?.remove()
let shouldStart: Bool = state.modify {
$0.active = nil
return !$0.isStarting
handle.remove()

let shouldComplete: Bool = state.modify { state in
state.activeCount -= 1
return state.shouldComplete
}

if shouldStart {
startNextIfNeeded()
withExtendedLifetime(deinitializer) {
if shouldComplete {
observer.sendCompleted()
} else if producerState.is(.started) {
startNextIfNeeded()
}
}

case .value, .failed:
observer.action(event)
}
}
}
state.modify { $0.isStarting = false }

withExtendedLifetime(deinitializer) {
producerState.setStarted()
}
}
}

return observe { event in
switch event {
case let .value(value):
Expand All @@ -403,11 +419,15 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
observer.send(error: error)

case .completed:
state.modify { state in
state.queue.append(SignalProducer.empty.on(completed: observer.sendCompleted))
let shouldComplete: Bool = state.modify { state in
state.isOuterCompleted = true
return state.shouldComplete
}
startNextIfNeeded()


if shouldComplete {
observer.sendCompleted()
}

case .interrupted:
observer.sendInterrupted()
}
Expand All @@ -416,20 +436,14 @@ extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Err
}

extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Returns a producer which sends all the values from each producer emitted
/// from `producer`, waiting until each inner producer completes before
/// beginning to send the values from the next inner producer.
///
/// - note: If any of the inner producers emit an error, the returned
/// producer will emit that error.
///
/// - note: The returned producer completes only when `producer` and all
/// producers emitted from `producer` complete.
fileprivate func concat() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
fileprivate func concurrent(limit: UInt) -> SignalProducer<Value.Value, Error> {
precondition(limit > 0, "The concurrent limit must be greater than zero.")

return SignalProducer<Value.Value, Error> { relayObserver, disposable in
self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
_ = signal.observeConcat(observer, disposable)

_ = signal.observeConcurrent(relayObserver, limit, disposable)
}
}
}
Expand Down Expand Up @@ -459,114 +473,52 @@ extension SignalProducerProtocol {
}
}

private final class ConcatState<Value, Error: Swift.Error> {
typealias SignalProducer = ReactiveSwift.SignalProducer<Value, Error>
private final class ConcurrentFlattenState<Value, Error: Swift.Error> {
typealias Producer = ReactiveSwift.SignalProducer<Value, Error>

/// The limit of active producers.
let limit: UInt

/// The active producer, if any.
var active: SignalProducer? = nil
/// The number of active producers.
var activeCount: UInt = 0

/// The producers waiting to be started.
var queue: [SignalProducer] = []

/// Whether the active producer is currently starting.
/// Used to prevent deep recursion.
var isStarting: Bool = false
var queue: [Producer] = []

/// Whether the outer producer has completed.
var isOuterCompleted = false

/// Whether the flattened signal should complete.
var shouldComplete: Bool {
return isOuterCompleted && activeCount == 0 && queue.isEmpty
}

init(limit: UInt) {
self.limit = limit
}

/// Dequeue the next producer if one should be started.
///
/// - note: The caller *must* set `isStarting` to false after the returned
/// producer has been started.
///
/// - returns: The `SignalProducer` to start or `nil` if no producer should
/// be started.
func dequeue() -> SignalProducer? {
if active != nil {
/// - returns: The `Producer` to start or `nil` if no producer should be
/// started.
func dequeue() -> Producer? {
if activeCount < limit, !queue.isEmpty {
activeCount += 1
return queue.removeFirst()
} else {
return nil
}

active = queue.first
if active != nil {
queue.removeFirst()
isStarting = true
}
return active
}
}

extension SignalProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Merges a `signal` of SignalProducers down into a single signal, biased
/// toward the producer added earlier. Returns a Signal that will forward
/// events from the inner producers as they arrive.
fileprivate func merge() -> Signal<Value.Value, Error> {
return Signal<Value.Value, Error> { relayObserver in
let disposable = CompositeDisposable()
let relayDisposable = CompositeDisposable()

disposable += relayDisposable
disposable += self.observeMerge(relayObserver, relayDisposable)

return disposable
}
}

fileprivate func observeMerge(_ observer: Observer<Value.Value, Error>, _ disposable: CompositeDisposable) -> Disposable? {
let inFlight = Atomic(1)
let decrementInFlight = {
let shouldComplete: Bool = inFlight.modify {
$0 -= 1
return $0 == 0
}

if shouldComplete {
observer.sendCompleted()
}
}

return self.observe { event in
switch event {
case let .value(producer):
producer.startWithSignal { innerSignal, innerDisposable in
inFlight.modify { $0 += 1 }
let handle = disposable.add(innerDisposable)

innerSignal.observe { event in
switch event {
case .completed, .interrupted:
handle.remove()
decrementInFlight()

case .value, .failed:
observer.action(event)
}
}
}

case let .failed(error):
observer.send(error: error)

case .completed:
decrementInFlight()

case .interrupted:
observer.sendInterrupted()
}
}
}
private enum ProducerState: Int32 {
case starting
case started
}

extension SignalProducerProtocol where Value: SignalProducerProtocol, Error == Value.Error {
/// Merges a `signal` of SignalProducers down into a single signal, biased
/// toward the producer added earlier. Returns a Signal that will forward
/// events from the inner producers as they arrive.
fileprivate func merge() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { relayObserver, disposable in
self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable

_ = signal.observeMerge(relayObserver, disposable)
}

}
extension AtomicStateProtocol where State == ProducerState {
fileprivate func setStarted() {
precondition(tryTransition(from: .starting, to: .started), "The transition is not supposed to fail.")
}
}

Expand Down
Loading