diff --git a/Sources/Signal.swift b/Sources/Signal.swift index 3cacef96c..90eea8862 100644 --- a/Sources/Signal.swift +++ b/Sources/Signal.swift @@ -1552,6 +1552,96 @@ extension SignalProtocol { } } + /// Conditionally throttles values sent on the receiver whenever + /// `shouldThrottle` is true, forwarding values on the given scheduler. + /// + /// - note: While `shouldThrottle` remains false, values are forwarded on the + /// given scheduler. If multiple values are received while + /// `shouldThrottle` is true, the latest value is the one that will + /// be passed on. + /// + /// - note: If the input signal terminates while a value is being throttled, + /// that value will be discarded and the returned signal will + /// terminate immediately. + /// + /// - note: If `shouldThrottle` completes before the receiver, and its last + /// value is `true`, the returned signal will remain in the throttled + /// state, emitting no further values until it terminates. + /// + /// - parameters: + /// - shouldThrottle: A boolean property that controls whether values + /// should be throttled. + /// - scheduler: A scheduler to deliver events on. + /// + /// - returns: A signal that sends values only while `shouldThrottle` is false. + public func throttle(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> Signal + where P.Value == Bool + { + return Signal { observer in + let initial: ThrottleWhileState = .resumed + let state = Atomic(initial) + let schedulerDisposable = SerialDisposable() + + let disposable = CompositeDisposable() + disposable += schedulerDisposable + + disposable += shouldThrottle.producer + .skipRepeats() + .startWithValues { shouldThrottle in + let valueToSend = state.modify { state -> Value? in + guard !state.isTerminated else { return nil } + + if shouldThrottle { + state = .throttled(nil) + } else { + defer { state = .resumed } + + if case let .throttled(value?) = state { + return value + } + } + + return nil + } + + if let value = valueToSend { + schedulerDisposable.innerDisposable = scheduler.schedule { + observer.send(value: value) + } + } + } + + disposable += self.observe { event in + let eventToSend = state.modify { state -> Event? in + switch event { + case let .value(value): + switch state { + case .throttled: + state = .throttled(value) + return nil + case .resumed: + return event + case .terminated: + return nil + } + + case .completed, .interrupted, .failed: + state = .terminated + return event + } + } + + if let event = eventToSend { + schedulerDisposable.innerDisposable = scheduler.schedule { + observer.action(event) + } + } + } + + return disposable + } + } + /// Debounce values sent by the receiver, such that at least `interval` /// seconds pass after the receiver has last sent a value, then forward the /// latest value on the given scheduler. @@ -1638,6 +1728,21 @@ private struct ThrottleState { var pendingValue: Value? = nil } +private enum ThrottleWhileState { + case resumed + case throttled(Value?) + case terminated + + var isTerminated: Bool { + switch self { + case .terminated: + return true + case .resumed, .throttled: + return false + } + } +} + extension SignalProtocol { /// Combines the values of all the given signals, in the manner described by /// `combineLatestWith`. diff --git a/Sources/SignalProducer.swift b/Sources/SignalProducer.swift index 5decb11c6..4aa0dfc4c 100644 --- a/Sources/SignalProducer.swift +++ b/Sources/SignalProducer.swift @@ -1040,6 +1040,38 @@ extension SignalProducerProtocol { return lift { $0.throttle(interval, on: scheduler) } } + /// Conditionally throttles values sent on the receiver whenever + /// `shouldThrottle` is true, forwarding values on the given scheduler. + /// + /// - note: While `shouldThrottle` remains false, values are forwarded on the + /// given scheduler. If multiple values are received while + /// `shouldThrottle` is true, the latest value is the one that will + /// be passed on. + /// + /// - note: If the input signal terminates while a value is being throttled, + /// that value will be discarded and the returned signal will + /// terminate immediately. + /// + /// - note: If `shouldThrottle` completes before the receiver, and its last + /// value is `true`, the returned signal will remain in the throttled + /// state, emitting no further values until it terminates. + /// + /// - parameters: + /// - shouldThrottle: A boolean property that controls whether values + /// should be throttled. + /// - scheduler: A scheduler to deliver events on. + /// + /// - returns: A producer that sends values only while `shouldThrottle` is false. + public func throttle(while shouldThrottle: P, on scheduler: SchedulerProtocol) -> SignalProducer + where P.Value == Bool + { + // Using `Property.init(_:)` avoids capturing a strong reference + // to `shouldThrottle`, so that we don't extend its lifetime. + let shouldThrottle = Property(shouldThrottle) + + return lift { $0.throttle(while: shouldThrottle, on: scheduler) } + } + /// Debounce values sent by the receiver, such that at least `interval` /// seconds pass after the receiver has last sent a value, then /// forward the latest value on the given scheduler. diff --git a/Tests/ReactiveSwiftTests/SignalProducerSpec.swift b/Tests/ReactiveSwiftTests/SignalProducerSpec.swift index ba29a2c4d..19c86a0a5 100644 --- a/Tests/ReactiveSwiftTests/SignalProducerSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalProducerSpec.swift @@ -851,6 +851,36 @@ class SignalProducerSpec: QuickSpec { } } + describe("throttle while") { + var scheduler: ImmediateScheduler! + var shouldThrottle: MutableProperty! + var observer: Signal.Observer! + var producer: SignalProducer! + + beforeEach { + scheduler = ImmediateScheduler() + shouldThrottle = MutableProperty(false) + + let (baseSignal, baseObserver) = Signal.pipe() + observer = baseObserver + + producer = SignalProducer(signal: baseSignal) + .throttle(while: shouldThrottle, on: scheduler) + + expect(producer).notTo(beNil()) + } + + it("doesn't extend the lifetime of the throttle property") { + var completed = false + shouldThrottle.lifetime.ended.observeCompleted { completed = true } + + observer.send(value: 1) + shouldThrottle = nil + + expect(completed) == true + } + } + describe("on") { it("should attach event handlers to each started signal") { let (baseProducer, observer) = SignalProducer.pipe() diff --git a/Tests/ReactiveSwiftTests/SignalSpec.swift b/Tests/ReactiveSwiftTests/SignalSpec.swift index 2a4223566..87af64e02 100755 --- a/Tests/ReactiveSwiftTests/SignalSpec.swift +++ b/Tests/ReactiveSwiftTests/SignalSpec.swift @@ -1445,6 +1445,98 @@ class SignalSpec: QuickSpec { } } + describe("throttle while") { + var scheduler: ImmediateScheduler! + var shouldThrottle: MutableProperty! + var observer: Signal.Observer! + var signal: Signal! + + beforeEach { + scheduler = ImmediateScheduler() + shouldThrottle = MutableProperty(false) + + let (baseSignal, baseObserver) = Signal.pipe() + observer = baseObserver + + signal = baseSignal.throttle(while: shouldThrottle, on: scheduler) + expect(signal).notTo(beNil()) + } + + it("passes through unthrottled values") { + var values: [Int] = [] + signal.observeValues { values.append($0) } + + observer.send(value: 1) + observer.send(value: 2) + observer.send(value: 3) + + expect(values) == [1, 2, 3] + } + + it("emits the latest throttled value when resumed") { + var values: [Int] = [] + signal.observeValues { values.append($0) } + + shouldThrottle.value = true + observer.send(value: 1) + observer.send(value: 2) + shouldThrottle.value = false + + expect(values) == [2] + } + + it("continues sending values after being resumed") { + var values: [Int] = [] + signal.observeValues { values.append($0) } + + shouldThrottle.value = true + observer.send(value: 1) + shouldThrottle.value = false + observer.send(value: 2) + observer.send(value: 3) + + expect(values) == [1, 2, 3] + } + + it("stays throttled if the property completes while throttled") { + var values: [Int] = [] + signal.observeValues { values.append($0) } + + shouldThrottle.value = false + observer.send(value: 1) + shouldThrottle.value = true + observer.send(value: 2) + shouldThrottle = nil + observer.send(value: 3) + + expect(values) == [1] + } + + it("stays resumed if the property completes while resumed") { + var values: [Int] = [] + signal.observeValues { values.append($0) } + + shouldThrottle.value = true + observer.send(value: 1) + shouldThrottle.value = false + observer.send(value: 2) + shouldThrottle = nil + observer.send(value: 3) + + expect(values) == [1, 2, 3] + } + + it("doesn't extend the lifetime of the throttle property") { + var completed = false + shouldThrottle.lifetime.ended.observeCompleted { completed = true } + + observer.send(value: 1) + shouldThrottle = nil + + expect(completed) == true + } + } + describe("debounce") { var scheduler: TestScheduler! var observer: Signal.Observer!