Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixed a scenario of downstream interruptions being dropped. #577

Merged
merged 3 commits into from
Dec 22, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,26 @@
# master
*Please add new entries at the top.*

1. Fixed a scenario of downstream interruptions being dropped. (#577, kudos to @andersio)

Manual interruption of time shifted producers, including `delay`, `observe(on:)`, `throttle`, `debounce` and `lazyMap`, should discard outstanding events at best effort ASAP.

But in ReactiveSwift 2.0 to 3.0, the manual interruption is ignored if the upstream producer has terminated. For example:

```swift
// Completed upstream + `delay`.
SignalProducer.empty
.delay(10.0, on: QueueScheduler.main)
.startWithCompleted { print("Value should have been discarded!") }
.dispose()

// Console(t+10): Value should have been discarded!
```

The expected behavior has now been restored.

Please note that, since ReactiveSwift 2.0, while the interruption is handled immediately, the `interrupted` event delivery is not synchronous — it generally respects the closest asynchronous operator applied, and delivers on that scheduler.

1. `concat` for `SignalProducer` now has an overload that accepts an error.

1. Fix some documentation errors (#560, kudos to @ikesyo)
Expand Down
82 changes: 54 additions & 28 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -203,10 +203,10 @@ extension Signal.Event: EventProtocol {
// This operator performs side effect upon interruption.

extension Signal.Event {
internal typealias Transformation<U, E: Swift.Error> = (@escaping Signal<U, E>.Observer.Action) -> (Signal<Value, Error>.Event) -> Void
internal typealias Transformation<U, E: Swift.Error> = (@escaping Signal<U, E>.Observer.Action, Lifetime) -> Signal<Value, Error>.Observer.Action

internal static func filter(_ isIncluded: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(value):
Expand All @@ -228,7 +228,7 @@ extension Signal.Event {
}

internal static func filterMap<U>(_ transform: @escaping (Value) -> U?) -> Transformation<U, Error> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(value):
Expand All @@ -250,7 +250,7 @@ extension Signal.Event {
}

internal static func map<U>(_ transform: @escaping (Value) -> U) -> Transformation<U, Error> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(value):
Expand All @@ -270,7 +270,7 @@ extension Signal.Event {
}

internal static func mapError<E>(_ transform: @escaping (Error) -> E) -> Transformation<Value, E> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(value):
Expand All @@ -290,7 +290,7 @@ extension Signal.Event {
}

internal static var materialize: Transformation<Signal<Value, Error>.Event, NoError> {
return { action in
return { action, _ in
return { event in
action(.value(event))

Expand All @@ -309,7 +309,7 @@ extension Signal.Event {
}

internal static func attemptMap<U>(_ transform: @escaping (Value) -> Result<U, Error>) -> Transformation<U, Error> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(value):
Expand Down Expand Up @@ -356,7 +356,7 @@ extension Signal.Event {
internal static func take(first count: Int) -> Transformation<Value, Error> {
assert(count >= 1)

return { action in
return { action, _ in
var taken = 0

return { event in
Expand All @@ -378,7 +378,7 @@ extension Signal.Event {
}

internal static func take(last count: Int) -> Transformation<Value, Error> {
return { action in
return { action, _ in
var buffer: [Value] = []
buffer.reserveCapacity(count)

Expand Down Expand Up @@ -406,7 +406,7 @@ extension Signal.Event {
}

internal static func take(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action in
return { action, _ in
return { event in
if let value = event.value, !shouldContinue(value) {
action(.completed)
Expand All @@ -420,7 +420,7 @@ extension Signal.Event {
internal static func skip(first count: Int) -> Transformation<Value, Error> {
precondition(count > 0)

return { action in
return { action, _ in
var skipped = 0

return { event in
Expand All @@ -434,7 +434,7 @@ extension Signal.Event {
}

internal static func skip(while shouldContinue: @escaping (Value) -> Bool) -> Transformation<Value, Error> {
return { action in
return { action, _ in
var isSkipping = true

return { event in
Expand All @@ -455,7 +455,7 @@ extension Signal.Event {

extension Signal.Event where Value: EventProtocol {
internal static var dematerialize: Transformation<Value.Value, Value.Error> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(innerEvent):
Expand Down Expand Up @@ -524,7 +524,7 @@ extension Signal.Event {
}

internal static func collect(_ shouldEmit: @escaping (_ collectedValues: [Value]) -> Bool) -> Transformation<[Value], Error> {
return { action in
return { action, _ in
let state = CollectState<Value>()

return { event in
Expand All @@ -550,7 +550,7 @@ extension Signal.Event {
}

internal static func collect(_ shouldEmit: @escaping (_ collected: [Value], _ latest: Value) -> Bool) -> Transformation<[Value], Error> {
return { action in
return { action, _ in
let state = CollectState<Value>()

return { event in
Expand Down Expand Up @@ -580,7 +580,7 @@ extension Signal.Event {
/// `nil` literal would be materialized as `Optional<Value>.none` instead of `Value`,
/// thus changing the semantic.
internal static func combinePrevious(initial: Value?) -> Transformation<(Value, Value), Error> {
return { action in
return { action, _ in
var previous = initial

return { event in
Expand All @@ -602,7 +602,7 @@ extension Signal.Event {
}

internal static func skipRepeats(_ isEquivalent: @escaping (Value, Value) -> Bool) -> Transformation<Value, Error> {
return { action in
return { action, _ in
var previous: Value?

return { event in
Expand All @@ -621,7 +621,7 @@ extension Signal.Event {
}

internal static func uniqueValues<Identity: Hashable>(_ transform: @escaping (Value) -> Identity) -> Transformation<Value, Error> {
return { action in
return { action, _ in
var seenValues: Set<Identity> = []

return { event in
Expand All @@ -641,7 +641,7 @@ extension Signal.Event {
}

internal static func scan<U>(into initialResult: U, _ nextPartialResult: @escaping (inout U, Value) -> Void) -> Transformation<U, Error> {
return { action in
return { action, _ in
var accumulator = initialResult

return { event in
Expand All @@ -658,7 +658,7 @@ extension Signal.Event {
}

internal static func reduce<U>(into initialResult: U, _ nextPartialResult: @escaping (inout U, Value) -> Void) -> Transformation<U, Error> {
return { action in
return { action, _ in
var accumulator = initialResult

return { event in
Expand All @@ -682,10 +682,18 @@ extension Signal.Event {
}

internal static func observe(on scheduler: Scheduler) -> Transformation<Value, Error> {
return { action in
return { action, lifetime in
lifetime.observeEnded {
scheduler.schedule {
action(.interrupted)
}
}

return { event in
scheduler.schedule {
action(event)
if !lifetime.hasEnded {
action(event)
}
}
}
}
Expand All @@ -694,7 +702,13 @@ extension Signal.Event {
internal static func delay(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
precondition(interval >= 0)

return { action in
return { action, lifetime in
lifetime.observeEnded {
scheduler.schedule {
action(.interrupted)
}
}

return { event in
switch event {
case .failed, .interrupted:
Expand All @@ -705,7 +719,9 @@ extension Signal.Event {
case .value, .completed:
let date = scheduler.currentDate.addingTimeInterval(interval)
scheduler.schedule(after: date) {
action(event)
if !lifetime.hasEnded {
action(event)
}
}
}
}
Expand All @@ -715,10 +731,15 @@ extension Signal.Event {
internal static func throttle(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
precondition(interval >= 0)

return { action in
return { action, lifetime in
let state: Atomic<ThrottleState<Value>> = Atomic(ThrottleState())
let schedulerDisposable = SerialDisposable()

lifetime.observeEnded {
schedulerDisposable.dispose()
scheduler.schedule { action(.interrupted) }
}

return { event in
guard let value = event.value else {
schedulerDisposable.inner = scheduler.schedule {
Expand Down Expand Up @@ -769,9 +790,14 @@ extension Signal.Event {
internal static func debounce(_ interval: TimeInterval, on scheduler: DateScheduler) -> Transformation<Value, Error> {
precondition(interval >= 0)

return { action in
return { action, lifetime in
let d = SerialDisposable()

lifetime.observeEnded {
d.dispose()
scheduler.schedule { action(.interrupted) }
}

return { event in
switch event {
case let .value(value):
Expand All @@ -797,7 +823,7 @@ private struct ThrottleState<Value> {

extension Signal.Event where Error == NoError {
internal static func promoteError<F>(_: F.Type) -> Transformation<Value, F> {
return { action in
return { action, _ in
return { event in
switch event {
case let .value(value):
Expand All @@ -816,7 +842,7 @@ extension Signal.Event where Error == NoError {

extension Signal.Event where Value == Never {
internal static func promoteValue<U>(_: U.Type) -> Transformation<U, Error> {
return { action in
return { action, _ in
return { event in
switch event {
case .value:
Expand Down
56 changes: 8 additions & 48 deletions Sources/Observer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,55 +13,9 @@ extension Signal {
public typealias Action = (Event) -> Void
private let _send: Action

/// An action that will be performed upon arrival of the event.
@available(*, deprecated: 2.0, renamed:"send(_:)")
public var action: Action {
guard !interruptsOnDeinit && wrapped == nil else {
return { self._send($0) }
}
return _send
}

/// Whether the observer should send an `interrupted` event as it deinitializes.
private let interruptsOnDeinit: Bool

/// The target observer of `self`.
private let wrapped: AnyObject?

/// An initializer that transforms the action of the given observer with the
/// given transform.
///
/// If the given observer would perform side effect on deinitialization, the
/// created observer would retain it.
///
/// - parameters:
/// - observer: The observer to transform.
/// - transform: The transform.
/// - disposable: The disposable to be disposed of when the `TransformerCore`
/// yields any terminal event. If `observer` is a `Signal` input
/// observer, this can be omitted.
internal init<U, E>(
_ observer: Signal<U, E>.Observer,
_ transform: @escaping Event.Transformation<U, E>,
_ disposable: Disposable? = nil
) {
var hasDeliveredTerminalEvent = false

self._send = transform { event in
if !hasDeliveredTerminalEvent {
observer._send(event)

if event.isTerminating {
hasDeliveredTerminalEvent = true
disposable?.dispose()
}
}
}

self.wrapped = observer.interruptsOnDeinit ? observer : nil
self.interruptsOnDeinit = false
}

/// An initializer that accepts a closure accepting an event for the
/// observer.
///
Expand All @@ -71,7 +25,6 @@ extension Signal {
/// event as it deinitializes. `false` otherwise.
internal init(action: @escaping Action, interruptsOnDeinit: Bool) {
self._send = action
self.wrapped = nil
self.interruptsOnDeinit = interruptsOnDeinit
}

Expand All @@ -82,7 +35,6 @@ extension Signal {
/// - action: A closure to lift over received event.
public init(_ action: @escaping Action) {
self._send = action
self.wrapped = nil
self.interruptsOnDeinit = false
}

Expand Down Expand Up @@ -171,3 +123,11 @@ extension Signal {
}
}
}

/// FIXME: Cannot be placed in `Deprecations+Removal.swift` if compiling with
/// Xcode 9.2.
extension Signal.Observer {
/// An action that will be performed upon arrival of the event.
@available(*, unavailable, renamed:"send(_:)")
public var action: Action { fatalError() }
}
8 changes: 6 additions & 2 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -538,8 +538,12 @@ extension Signal {
///
/// - returns: A signal that forwards events yielded by the action.
internal func flatMapEvent<U, E>(_ transform: @escaping Event.Transformation<U, E>) -> Signal<U, E> {
return Signal<U, E> { observer, lifetime in
lifetime += self.observe(Signal.Observer(observer, transform))
return Signal<U, E> { output, lifetime in
// Create an input sink whose events would go through the given
// event transformation, and have the resulting events propagated
// to the resulting `Signal`.
let input = transform(output.send, lifetime)
lifetime += self.observe(input)
}
}

Expand Down
Loading