Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add FlattenStrategy.race #233

Merged
merged 2 commits into from
Apr 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
144 changes: 144 additions & 0 deletions Sources/Flatten.swift
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ public enum FlattenStrategy: Equatable {
/// and the latest producer has completed.
case latest

/// Only the events from the "first input producer to send an event" (winning producer)
/// should be considered for the output.
/// Any other producers that already started (but not sending an event yet)
/// will be disposed.
///
/// The resulting producer will complete when:
/// 1. The producer-of-producers and the first "alive" producer has completed.
/// 2. The producer-of-producers has completed without inner producer being "alive".
case race

public static func ==(left: FlattenStrategy, right: FlattenStrategy) -> Bool {
switch (left, right) {
case (.latest, .latest):
Expand Down Expand Up @@ -78,6 +88,9 @@ extension Signal where Value: SignalProducerProtocol, Error == Value.Error {

case .latest:
return self.switchToLatest()

case .race:
return self.race()
}
}
}
Expand Down Expand Up @@ -117,6 +130,9 @@ extension Signal where Value: SignalProducerProtocol, Error == NoError, Value.Er

case .latest:
return self.switchToLatest()

case .race:
return self.race()
}
}
}
Expand Down Expand Up @@ -157,6 +173,9 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Err

case .latest:
return self.switchToLatest()

case .race:
return self.race()
}
}
}
Expand Down Expand Up @@ -196,6 +215,9 @@ extension SignalProducer where Value: SignalProducerProtocol, Error == NoError,

case .latest:
return self.switchToLatest()

case .race:
return self.race()
}
}
}
Expand Down Expand Up @@ -764,6 +786,128 @@ private struct LatestState<Value, Error: Swift.Error> {
var replacingInnerSignal: Bool = false
}

extension Signal where Value: SignalProducerProtocol, Error == Value.Error {
/// Returns a signal that forwards values from the "first input signal to send an event"
/// (winning signal) that is sent on `self`, ignoring values sent from other inner signals.
///
/// An error sent on `self` or the winning inner signal will be sent on the
/// returned signal.
///
/// The returned signal completes when `self` and the winning inner signal have both completed.
fileprivate func race() -> Signal<Value.Value, Error> {
return Signal<Value.Value, Error> { observer in
let composite = CompositeDisposable()
let relayDisposable = CompositeDisposable()

composite += relayDisposable
composite += self.observeRace(observer, relayDisposable)

return composite
}
}

fileprivate func observeRace(_ observer: ReactiveSwift.Observer<Value.Value, Error>, _ relayDisposable: CompositeDisposable) -> Disposable? {
let state = Atomic(RaceState<Value.Value, Error>())

return self.observe { event in
switch event {
case let .value(innerProducer):
// Ignore consecutive `innerProducer`s if any `innerSignal` already sent an event.
guard !relayDisposable.isDisposed else {
return
}

innerProducer.producer.startWithSignal { innerSignal, innerDisposable in

state.modify {
$0.innerSignalComplete = false
}

let disposableHandle = relayDisposable.add(innerDisposable)

innerSignal.observe { [unowned innerSignal] event in

let isWinningSignal: Bool = state.modify { state in
if state.active == nil {
state.active = innerSignal
}
return state.active === innerSignal
}

// Ignore non-winning signals.
guard isWinningSignal else { return }

// Dispose all running innerSignals except winning one.
if !relayDisposable.isDisposed {
disposableHandle.remove()
relayDisposable.dispose()
}

switch event {
case .completed:
let shouldComplete: Bool = state.modify { state in
state.active = nil
state.innerSignalComplete = true
return state.outerSignalComplete
}

if shouldComplete {
observer.sendCompleted()
}

case .value, .failed, .interrupted:
observer.action(event)
}
}
}

case let .failed(error):
observer.send(error: error)

case .completed:
let shouldComplete: Bool = state.modify { state in
state.outerSignalComplete = true
return state.innerSignalComplete
}

if shouldComplete {
observer.sendCompleted()
}

case .interrupted:
observer.sendInterrupted()
}
}
}
}

extension SignalProducer where Value: SignalProducerProtocol, Error == Value.Error {
/// Returns a producer that forwards values from the "first input producer to send an event"
/// (winning producer) that is sent on `self`, ignoring values sent from other inner producers.
///
/// An error sent on `self` or the winning inner producer will be sent on the
/// returned producer.
///
/// The returned producer completes when `self` and the winning inner producer have both completed.
fileprivate func race() -> SignalProducer<Value.Value, Error> {
return SignalProducer<Value.Value, Error> { observer, disposable in
let relayDisposable = CompositeDisposable()
disposable += relayDisposable

self.startWithSignal { signal, signalDisposable in
disposable += signalDisposable
disposable += signal.observeRace(observer, relayDisposable)
}
}
}
}

private struct RaceState<Value, Error: Swift.Error> {
var outerSignalComplete: Bool = false
var innerSignalComplete: Bool = true

var active: Signal<Value, Error>? = nil
}

extension Signal {
/// Maps each event from `signal` to a new signal, then flattens the
Expand Down
4 changes: 3 additions & 1 deletion Tests/ReactiveSwiftTests/FlattenSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class FlattenSpec: QuickSpec {
describeSignalFlattenDisposal(.merge, name: "merge")
describeSignalFlattenDisposal(.concat, name: "concat")
describeSignalFlattenDisposal(.concurrent(limit: 1024), name: "concurrent(limit: 1024)")
describeSignalFlattenDisposal(.race, name: "race")
}

func describeSignalProducerFlattenDisposal(_ flattenStrategy: FlattenStrategy, name: String) {
Expand All @@ -97,8 +98,9 @@ class FlattenSpec: QuickSpec {
describeSignalProducerFlattenDisposal(.merge, name: "merge")
describeSignalProducerFlattenDisposal(.concat, name: "concat")
describeSignalProducerFlattenDisposal(.concurrent(limit: 1024), name: "concurrent(limit: 1024)")
describeSignalProducerFlattenDisposal(.race, name: "race")
}

describe("Signal.flatten()") {
it("works with TestError and a TestError Signal") {
typealias Inner = Signal<Int, TestError>
Expand Down
107 changes: 107 additions & 0 deletions Tests/ReactiveSwiftTests/SignalProducerSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,113 @@ class SignalProducerSpec: QuickSpec {
}
}

describe("FlattenStrategy.race") {
it("should forward values from the first inner producer to send an event") {
let (outer, outerObserver) = SignalProducer<SignalProducer<Int, TestError>, TestError>.pipe()
let (firstInner, firstInnerObserver) = SignalProducer<Int, TestError>.pipe()
let (secondInner, secondInnerObserver) = SignalProducer<Int, TestError>.pipe()

var receivedValues: [Int] = []
var errored = false
var completed = false

outer.flatten(.race).start { event in
switch event {
case let .value(value):
receivedValues.append(value)
case .completed:
completed = true
case .failed:
errored = true
case .interrupted:
break
}
}

outerObserver.send(value: firstInner)
outerObserver.send(value: secondInner)
firstInnerObserver.send(value: 1)
secondInnerObserver.send(value: 2)
outerObserver.sendCompleted()

expect(receivedValues) == [ 1 ]
expect(errored) == false
expect(completed) == false

secondInnerObserver.send(value: 3)
secondInnerObserver.sendCompleted()

expect(receivedValues) == [ 1 ]
expect(errored) == false
expect(completed) == false

firstInnerObserver.send(value: 4)
firstInnerObserver.sendCompleted()

expect(receivedValues) == [ 1, 4 ]
expect(errored) == false
expect(completed) == true
}

it("should forward an error from the first inner producer to send an error") {
let inner = SignalProducer<Int, TestError>(error: .default)
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)

let result = outer.flatten(.race).first()
expect(result?.error) == TestError.default
}

it("should forward an error from the outer producer") {
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(error: .default)

let result = outer.flatten(.race).first()
expect(result?.error) == TestError.default
}

it("should complete when the 'outer producer' and 'first inner producer to send an event' have completed") {
let inner = SignalProducer<Int, TestError>.empty
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)

var completed = false
outer.flatten(.race).startWithCompleted {
completed = true
}

expect(completed) == true
}

it("should complete when the outer producer completes before sending any inner producers") {
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>.empty

var completed = false
outer.flatten(.race).startWithCompleted {
completed = true
}

expect(completed) == true
}

it("should not complete when the outer producer completes after sending an inner producer but it doesn't send an event") {
let inner = SignalProducer<Int, TestError>.never
let outer = SignalProducer<SignalProducer<Int, TestError>, TestError>(value: inner)

var completed = false
outer.flatten(.race).startWithCompleted {
completed = true
}

expect(completed) == false
}

it("should not deadlock") {
let producer = SignalProducer<Int, NoError>(value: 1)
.flatMap(.race) { _ in SignalProducer(value: 10) }

let result = producer.take(first: 1).last()
expect(result?.value) == 10
}
}

describe("interruption") {
var innerObserver: Signal<(), NoError>.Observer!
var outerObserver: Signal<SignalProducer<(), NoError>, NoError>.Observer!
Expand Down