Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add throttle(while:on:) #58

Merged
merged 8 commits into from
Nov 29, 2016
Merged
105 changes: 105 additions & 0 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1552,6 +1552,96 @@ extension SignalProtocol {
}
}

/// Conditionally throttles values sent on the receiver whenever
/// `shouldThrottle` is true, forwarding values on the given scheduler.
///
/// - note: While `shouldThrottle` remains false, values are forwarded on the
/// given scheduler. If multiple values are received while
/// `shouldThrottle` is true, the latest value is the one that will
/// be passed on.
///
/// - note: If the input signal terminates while a value is being throttled,
/// that value will be discarded and the returned signal will
/// terminate immediately.
///
/// - note: If `shouldThrottle` completes before the receiver, and its last
/// value is `true`, the returned signal will remain in the throttled
/// state, emitting no further values until it terminates.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is 4 notes excessive? 😅

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps just two notes (shouldThrottle + termination behavior), or no notes at all?

///
/// - parameters:
/// - shouldThrottle: A boolean property that controls whether values
/// should be throttled.
/// - scheduler: A scheduler to deliver events on.
///
/// - returns: A signal that sends values only while `shouldThrottle` is false.
public func throttle<P: PropertyProtocol>(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> Signal<Value, Error>
where P.Value == Bool
{
return Signal { observer in
let initial: ThrottleWhileState<Value> = .resumed
let state = Atomic(initial)
let schedulerDisposable = SerialDisposable()

let disposable = CompositeDisposable()
disposable += schedulerDisposable

disposable += shouldThrottle.producer
.skipRepeats()
.startWithValues { shouldThrottle in
let valueToSend = state.modify { state -> Value? in
guard !state.isTerminated else { return nil }

if shouldThrottle {
state = .throttled(nil)
} else {
defer { state = .resumed }

if case let .throttled(value?) = state {
return value
}
}

return nil
}
Copy link
Member

@ikesyo ikesyo Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The returned value from an action of Atomic.modify is not a new value for the Atomic instance since ReactiveCocoa/ReactiveCocoa#2984. So this should be written as follows:

let valueToSend = state.modify { state -> Value? in
    guard !state.isTerminated else { return nil }

    if shouldThrottle {
        state = .throttled(nil)
        return nil
    } else {
        state = .resumed
        if case let .throttled(value) = state {
            return value
        } else {
            return nil
        }
    }
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call! I've pushed 58e5bed. I ended up having a single return nil after the conditional block, but if everyone prefers this style (a return at every branch) I can change it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I ended up having a single return nil after the conditional block

👍


if let value = valueToSend {
schedulerDisposable.innerDisposable = scheduler.schedule {
observer.send(value: value)
}
}
}

disposable += self.observe { event in
let eventToSend = state.modify { state -> Event<Value, Error>? in
switch event {
case let .value(value):
switch state {
case .throttled:
state = .throttled(value)
return nil
case .resumed:
return event
case .terminated:
return nil
}

case .completed, .interrupted, .failed:
state = .terminated
return event
}
}
Copy link
Member

@ikesyo ikesyo Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as https://github.com/ReactiveCocoa/ReactiveSwift/pull/58/files#r82397528:

let eventToSend = state.modify { state -> Event<Value, Error>? in
    switch event {
    case let .value(value):
        switch state {
        case .throttled:
            state = .throttled(value)
            return nil
        case .resumed:
            return event
        case .terminated:
            return nil
        }

    case .completed, .interrupted, .failed:
        state = .terminated
        return event
    }
}


if let event = eventToSend {
schedulerDisposable.innerDisposable = scheduler.schedule {
observer.action(event)
}
}
}

return disposable
}
}

/// Debounce values sent by the receiver, such that at least `interval`
/// seconds pass after the receiver has last sent a value, then forward the
/// latest value on the given scheduler.
Expand Down Expand Up @@ -1638,6 +1728,21 @@ private struct ThrottleState<Value> {
var pendingValue: Value? = nil
}

private enum ThrottleWhileState<Value> {
case resumed
case throttled(Value?)
case terminated

var isTerminated: Bool {
switch self {
case .terminated:
return true
case .resumed, .throttled:
return false
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could just be return self == .terminated.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That requires explicit Equatable conformance of ThrottleWhileState since case throttled has an associated value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stylistically, this could also become a switch statement if you prefer, which would add exhaustively checking.

Copy link
Member

@NachoSoto NachoSoto Oct 7, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which would add exhaustively checking.

And fail to compile if one adds a new case, which is a good thing

}

extension SignalProtocol {
/// Combines the values of all the given signals, in the manner described by
/// `combineLatestWith`.
Expand Down
32 changes: 32 additions & 0 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1040,6 +1040,38 @@ extension SignalProducerProtocol {
return lift { $0.throttle(interval, on: scheduler) }
}

/// Conditionally throttles values sent on the receiver whenever
/// `shouldThrottle` is true, forwarding values on the given scheduler.
///
/// - note: While `shouldThrottle` remains false, values are forwarded on the
/// given scheduler. If multiple values are received while
/// `shouldThrottle` is true, the latest value is the one that will
/// be passed on.
///
/// - note: If the input signal terminates while a value is being throttled,
/// that value will be discarded and the returned signal will
/// terminate immediately.
///
/// - note: If `shouldThrottle` completes before the receiver, and its last
/// value is `true`, the returned signal will remain in the throttled
/// state, emitting no further values until it terminates.
///
/// - parameters:
/// - shouldThrottle: A boolean property that controls whether values
/// should be throttled.
/// - scheduler: A scheduler to deliver events on.
///
/// - returns: A producer that sends values only while `shouldThrottle` is false.
public func throttle<P: PropertyProtocol>(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> SignalProducer<Value, Error>
where P.Value == Bool
{
// Using `Property.init(_:)` avoids capturing a strong reference
// to `shouldThrottle`, so that we don't extend its lifetime.
let shouldThrottle = Property(shouldThrottle)

return lift { $0.throttle(while: shouldThrottle, on: scheduler) }
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This keeps a strong reference to shouldThrottle for SignalProducers. @andersio does this need to change as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The first thing I tried was to wrap shouldThrottle in an existential:

let shouldThrottle = Property(shouldThrottle)
return lift { $0.throttle(while: shouldThrottle, on: scheduler) }

But that also increases the lifetime of the underlying property. Is there any way to capture a reference to this property that won't extend its lifetime? It looks like composing properties fundamentally alters the lifetime of the source property, extending it to be the union of all transitive property lifetimes. Is that intentional?

Copy link
Member

@andersio andersio Nov 24, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is intentional IIRC. All composed properties derived from a property is considered just a reduced/transformed view of that property, so that the behaviour is in line with existentials which retain the wrapped property (like the one in your example).

That's said it might be worth discussing if the source capturing should be dropped. It could be an oversight (especially with the arguments I made in #58 (comment)... ehm), since in ReactiveCocoa/ReactiveCocoa#2922 @mdiep and I focused just on how signal and producer should behave.

Edit: Edited many times.

Edit 2: It seems the capturing is originated from ReactiveCocoa/ReactiveCocoa#2788. The argument to capture was different though, and it was actually invalidated by ReactiveCocoa/ReactiveCocoa#2922...

Edit 3: Opened a PR. #117

Copy link
Member

@andersio andersio Nov 28, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#117 is merged. Now this is the last stop before RC1 (hopefully). 🎉

}

/// Debounce values sent by the receiver, such that at least `interval`
/// seconds pass after the receiver has last sent a value, then
/// forward the latest value on the given scheduler.
Expand Down
30 changes: 30 additions & 0 deletions Tests/ReactiveSwiftTests/SignalProducerSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -851,6 +851,36 @@ class SignalProducerSpec: QuickSpec {
}
}

describe("throttle while") {
var scheduler: ImmediateScheduler!
var shouldThrottle: MutableProperty<Bool>!
var observer: Signal<Int, NoError>.Observer!
var producer: SignalProducer<Int, NoError>!

beforeEach {
scheduler = ImmediateScheduler()
shouldThrottle = MutableProperty(false)

let (baseSignal, baseObserver) = Signal<Int, NoError>.pipe()
observer = baseObserver

producer = SignalProducer(signal: baseSignal)
.throttle(while: shouldThrottle, on: scheduler)

expect(producer).notTo(beNil())
}

it("doesn't extend the lifetime of the throttle property") {
var completed = false
shouldThrottle.lifetime.ended.observeCompleted { completed = true }

observer.send(value: 1)
shouldThrottle = nil

expect(completed) == true
}
}

describe("on") {
it("should attach event handlers to each started signal") {
let (baseProducer, observer) = SignalProducer<Int, TestError>.pipe()
Expand Down
92 changes: 92 additions & 0 deletions Tests/ReactiveSwiftTests/SignalSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1445,6 +1445,98 @@ class SignalSpec: QuickSpec {
}
}

describe("throttle while") {
var scheduler: ImmediateScheduler!
var shouldThrottle: MutableProperty<Bool>!
var observer: Signal<Int, NoError>.Observer!
var signal: Signal<Int, NoError>!

beforeEach {
scheduler = ImmediateScheduler()
shouldThrottle = MutableProperty(false)

let (baseSignal, baseObserver) = Signal<Int, NoError>.pipe()
observer = baseObserver

signal = baseSignal.throttle(while: shouldThrottle, on: scheduler)
expect(signal).notTo(beNil())
}

it("passes through unthrottled values") {
var values: [Int] = []
signal.observeValues { values.append($0) }

observer.send(value: 1)
observer.send(value: 2)
observer.send(value: 3)

expect(values) == [1, 2, 3]
}

it("emits the latest throttled value when resumed") {
var values: [Int] = []
signal.observeValues { values.append($0) }

shouldThrottle.value = true
observer.send(value: 1)
observer.send(value: 2)
shouldThrottle.value = false

expect(values) == [2]
}

it("continues sending values after being resumed") {
var values: [Int] = []
signal.observeValues { values.append($0) }

shouldThrottle.value = true
observer.send(value: 1)
shouldThrottle.value = false
observer.send(value: 2)
observer.send(value: 3)

expect(values) == [1, 2, 3]
}

it("stays throttled if the property completes while throttled") {
var values: [Int] = []
signal.observeValues { values.append($0) }

shouldThrottle.value = false
observer.send(value: 1)
shouldThrottle.value = true
observer.send(value: 2)
shouldThrottle = nil
observer.send(value: 3)

expect(values) == [1]
}

it("stays resumed if the property completes while resumed") {
var values: [Int] = []
signal.observeValues { values.append($0) }

shouldThrottle.value = true
observer.send(value: 1)
shouldThrottle.value = false
observer.send(value: 2)
shouldThrottle = nil
observer.send(value: 3)

expect(values) == [1, 2, 3]
}

it("doesn't extend the lifetime of the throttle property") {
var completed = false
shouldThrottle.lifetime.ended.observeCompleted { completed = true }

observer.send(value: 1)
shouldThrottle = nil

expect(completed) == true
}
}

describe("debounce") {
var scheduler: TestScheduler!
var observer: Signal<Int, NoError>.Observer!
Expand Down