Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a recurring collect function and associated unit tests #619

Merged
merged 14 commits into from
Apr 16, 2018
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# master
*Please add new entries at the top.*

1. New method `chunk(_:on:ignoreEmptyChunks:)` which delivers all values that occurred during a time chunk (#619, kudos to @Qata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated.

1. Result now interoperates with SignalProducer n-ary operators as a constant producer (#606, kudos to @Qata)
1. New property operator: `filter` (#586, kudos to @iv-mexx)
1. New operator `merge(with:)` (#600, kudos to @ra1028)
Expand Down
36 changes: 36 additions & 0 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,42 @@ extension Signal.Event {
}
}
}

internal static func collect(_ interval: DispatchTimeInterval, on scheduler: DateScheduler, ignoreWhenEmpty: Bool) -> Transformation<[Value], Error> {
return { action, lifetime in
let values = Atomic<[Value]>([])
let d = SerialDisposable()

d.inner = scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(interval), interval: interval, leeway: interval * 0.1, action: {
let currentValues: [Value]? = values.modify { values in
guard !(values.isEmpty && ignoreWhenEmpty) else { return nil }
defer { values = [] }
return values
}
if let values = currentValues {
action(.value(values))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be better written like so:

let currentValues = values.swap([])
if !currentValues.isEmpty {
    action(.value(currentValues))
}

})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use trailing closure syntax.


lifetime.observeEnded {
d.dispose()
scheduler.schedule { action(.interrupted) }
}

return { event in
switch event {
case let .value(value):
values.modify { $0.append(value) }
case let .failed(error):
d.inner = scheduler.schedule { action(.failed(error)) }
case .completed:
d.inner = scheduler.schedule { action(.completed) }
case .interrupted:
d.inner = scheduler.schedule { action(.interrupted) }
}
}
}
}
}

private struct ThrottleState<Value> {
Expand Down
19 changes: 19 additions & 0 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1506,6 +1506,25 @@ extension Signal {
public func debounce(_ interval: TimeInterval, on scheduler: DateScheduler) -> Signal<Value, Error> {
return flatMapEvent(Signal.Event.debounce(interval, on: scheduler))
}

/// Forward the latest values on `scheduler` every `interval`.
///
/// - seealso: `throttle`
/// - seealso: `debounce`
///
/// - note: If the input signal terminates, the returned signal will
/// terminate immediately without forwarding the values
/// currently being accumulated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the spirit of #605, maybe there should be an option to forward the values first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking into this I can't find a way to make it work consistently, given:
When you complete the Signal when you'd expect is for it to still delay up to the supplied interval, and only then deliver the values followed immediately by a completed event.
In the event of interrupted or failed, you'd need to forward those events immediately, so the values would also need to be delivered immediately.
This would probably create tricky behaviour for a lot of people consuming this function, since you'd expect values to be delivered exactly interval apart (or a multiple of interval if you're ignoring when empty).

So should it only not discard in the case of .completed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, values should only be forwarded in the case of .completed.

///
/// - parameters:
/// - interval: A repetition interval.
/// - scheduler: A scheduler to send values on.
///
/// - returns: A signal that sends all values that are sent from `self` at
/// `interval` seconds apart.
public func collect(_ interval: DispatchTimeInterval, on scheduler: DateScheduler, ignoreWhenEmpty: Bool = false) -> Signal<[Value], Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've standardized on skip for values that aren't sent. We should use that here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should name the interval parameter. Maybe every:?

return flatMapEvent(Signal.Event.collect(interval, on: scheduler, ignoreWhenEmpty: ignoreWhenEmpty))
}
}

extension Signal {
Expand Down
19 changes: 19 additions & 0 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1526,6 +1526,25 @@ extension SignalProducer {
public func debounce(_ interval: TimeInterval, on scheduler: DateScheduler) -> SignalProducer<Value, Error> {
return core.flatMapEvent(Signal.Event.debounce(interval, on: scheduler))
}

/// Forward the latest values on `scheduler` every `interval`.
///
/// - seealso: `throttle`
/// - seealso: `debounce`
///
/// - note: If the input signal terminates, the returned signal will
/// terminate immediately without forwarding the values
/// currently being accumulated.
///
/// - parameters:
/// - interval: A repetition interval.
/// - scheduler: A scheduler to send values on.
///
/// - returns: A producer that sends all values that are sent from `self`
/// at `interval` seconds apart.
public func collect(_ interval: DispatchTimeInterval, on scheduler: DateScheduler, ignoreWhenEmpty: Bool = false) -> SignalProducer<[Value], Error> {
return core.flatMapEvent(Signal.Event.collect(interval, on: scheduler, ignoreWhenEmpty: ignoreWhenEmpty))
}

/// Forward events from `self` until `interval`. Then if producer isn't
/// completed yet, fails with `error` on `scheduler`.
Expand Down
184 changes: 184 additions & 0 deletions Tests/ReactiveSwiftTests/SignalSpec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1953,6 +1953,190 @@ class SignalSpec: QuickSpec {
expect(completed) == true
}
}

describe("collect(_:on:ignoreWhenEmpty:) where ignoreWhenEmpty is false") {
var scheduler: TestScheduler!
var observer: Signal<Int, NoError>.Observer!
var signal: Signal<[Int], NoError>!

beforeEach {
scheduler = TestScheduler()

let (baseSignal, baseObserver) = Signal<Int, NoError>.pipe()
observer = baseObserver

signal = baseSignal.collect(.seconds(1), on: scheduler, ignoreWhenEmpty: false)
expect(signal).notTo(beNil())
}

it("should send accumulated values on the given scheduler every interval") {
var values: [[Int]] = []
signal.observeValues { value in
values.append(value)
}

expect(values.count) == 0

observer.send(value: 0)
expect(values.count) == 0

scheduler.advance()
expect(values.count) == 0

observer.send(value: 1)
observer.send(value: 2)
expect(values.count) == 0

scheduler.advance(by: .milliseconds(1500))
expect(values.count) == 1
expect(values[0]) == [ 0, 1, 2 ]

scheduler.advance(by: .seconds(2))
expect(values.count) == 3
expect(values[0]) == [ 0, 1, 2 ]
expect(values[1]) == [ ]
expect(values[2]) == [ ]

observer.send(value: 3)
expect(values.count) == 3

scheduler.advance()
expect(values.count) == 3

observer.send(value: 4)
observer.send(value: 5)
scheduler.advance()
expect(values.count) == 3

scheduler.advance(by: .milliseconds(500))
expect(values.count) == 4
expect(values.first) == [ 0, 1, 2 ]
expect(values.last) == [ 3, 4, 5 ]
}

it("should schedule completion immediately") {
var values: [[Int]] = []
var completed = false

signal.observe { event in
switch event {
case let .value(value):
values.append(value)
case .completed:
completed = true
default:
break
}
}

observer.send(value: 0)
scheduler.advance()
expect(values.count) == 0

observer.send(value: 1)
observer.sendCompleted()
expect(completed) == false

scheduler.advance()
expect(values.count) == 0
expect(completed) == true

scheduler.run()
expect(values.count) == 0
expect(completed) == true
}
}

describe("collect(_:on:ignoreWhenEmpty:) where ignoreWhenEmpty is true") {
var scheduler: TestScheduler!
var observer: Signal<Int, NoError>.Observer!
var signal: Signal<[Int], NoError>!

beforeEach {
scheduler = TestScheduler()

let (baseSignal, baseObserver) = Signal<Int, NoError>.pipe()
observer = baseObserver

signal = baseSignal.collect(.seconds(1), on: scheduler, ignoreWhenEmpty: true)
expect(signal).notTo(beNil())
}

it("should send accumulated values on the given scheduler every interval") {
var values: [[Int]] = []
signal.observeValues { value in
values.append(value)
}

expect(values.count) == 0

observer.send(value: 0)
expect(values.count) == 0

scheduler.advance()
expect(values.count) == 0

observer.send(value: 1)
observer.send(value: 2)
expect(values.count) == 0

scheduler.advance(by: .milliseconds(1500))
expect(values.count) == 1
expect(values[0]) == [ 0, 1, 2 ]

scheduler.advance(by: .seconds(2))
expect(values.count) == 1
expect(values[0]) == [ 0, 1, 2 ]

observer.send(value: 3)
expect(values.count) == 1

scheduler.advance()
expect(values.count) == 1

observer.send(value: 4)
observer.send(value: 5)
scheduler.advance()
expect(values.count) == 1

scheduler.advance(by: .seconds(100))
expect(values.count) == 2
expect(values[0]) == [ 0, 1, 2 ]
expect(values[1]) == [ 3, 4, 5 ]
}

it("should schedule completion immediately") {
var values: [[Int]] = []
var completed = false

signal.observe { event in
switch event {
case let .value(value):
values.append(value)
case .completed:
completed = true
default:
break
}
}

observer.send(value: 0)
scheduler.advance()
expect(values.count) == 0

observer.send(value: 1)
observer.sendCompleted()
expect(completed) == false

scheduler.advance()
expect(values.count) == 0
expect(completed) == true

scheduler.run()
expect(values.count) == 0
expect(completed) == true
}
}

describe("sampleWith") {
var sampledSignal: Signal<(Int, String), NoError>!
Expand Down