Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a recurring collect function and associated unit tests #619

Merged
merged 14 commits into from
Apr 16, 2018
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# master
*Please add new entries at the top.*

1. New method `collect(every:on:skipEmpty:discardWhenCompleted:)` which delivers all values that occurred during a time interval (#619, kudos to @Qata)
1. Result now interoperates with SignalProducer n-ary operators as a constant producer (#606, kudos to @Qata)
1. New property operator: `filter` (#586, kudos to @iv-mexx)
1. New operator `merge(with:)` (#600, kudos to @ra1028)
Expand Down
42 changes: 42 additions & 0 deletions Sources/Event.swift
Original file line number Diff line number Diff line change
Expand Up @@ -861,6 +861,48 @@ extension Signal.Event {
}
}
}

internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardsWhenCompleted: Bool) -> Transformation<[Value], Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, let's make this discardWhenCompleted (no s) since we're doing that in #605 as well.

return { action, lifetime in
let values = Atomic<[Value]>([])
let d = SerialDisposable()
var lastDelivered: Date = scheduler.currentDate

d.inner = scheduler.schedule(after: scheduler.currentDate.addingTimeInterval(interval), interval: interval, leeway: interval * 0.1) {
let currentValues = values.swap([])
if !(currentValues.isEmpty && skipEmpty) {
action(.value(currentValues))
lastDelivered = scheduler.currentDate
}
}

lifetime.observeEnded {
d.dispose()
scheduler.schedule { action(.interrupted) }
}

return { event in
switch event {
case let .value(value):
values.modify { $0.append(value) }
case let .failed(error):
d.inner = scheduler.schedule { action(.failed(error)) }
case .completed:
let currentValues = values.swap([])
if !discardsWhenCompleted, !(currentValues.isEmpty && skipEmpty) {
d.inner = scheduler.schedule(after: lastDelivered.addingTimeInterval(interval)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This schedule call makes me a little uneasy. It's difficult to reason about how this will interact with the other scheduling call. (e.g. it's possible that the above schedule could run in-between this values.swap and this scheduler.schedule).

I think we should solve this by removing this schedule and sending the complete through the top schedule. This would be best done by adding a CollectEveryState struct and putting that inside an Atomic instead of a Atomic<[Value]>.

Something like this:

state CollectEveryState<Value> {
    var skipEmpty: Bool
    var values: [Value] = []
    var isCompleted: Bool

    var hasValues: Bool {
       return !values.isEmpty || !skipEmpty
    }

    func collect() -> [Value]? {
        if !hasValues {
            return nil
        }
        defer { values.removeAll() }
        return values
    }
}

That should clean up the implementation a bit too, which is nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdiep Done 👍

action(.value(currentValues))
action(.completed)
}
} else {
d.inner = scheduler.schedule { action(.completed) }
}
case .interrupted:
d.inner = scheduler.schedule { action(.interrupted) }
}
}
}
}
}

private struct ThrottleState<Value> {
Expand Down
22 changes: 22 additions & 0 deletions Sources/Signal.swift
Original file line number Diff line number Diff line change
Expand Up @@ -765,6 +765,28 @@ extension Signal {
return flatMapEvent(Signal.Event.collect(shouldEmit))
}

/// Forward the latest values on `scheduler` every `interval`.
///
/// - note: If `self` terminates while values are being accumulated,
/// the behaviour will be determined by `discardsWhenCompleted`.
/// If `true`, the values will be discarded and the returned signal
/// will terminate immediately.
/// If `false`, that values will be delivered at the next interval.
///
/// - parameters:
/// - interval: A repetition interval.
/// - scheduler: A scheduler to send values on.
/// - skipEmpty: Whether empty arrays should be sent if no values were
/// accumulated during the interval.
/// - discardsWhenCompleted: A boolean to indicate if the latest unsent
/// values should be discarded on completion.
///
/// - returns: A signal that sends all values that are sent from `self` at
/// `interval` seconds apart.
public func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool = false, discardsWhenCompleted: Bool = true) -> Signal<[Value], Error> {
return flatMapEvent(Signal.Event.collect(every: interval, on: scheduler, skipEmpty: skipEmpty, discardsWhenCompleted: discardsWhenCompleted))
}

/// Forward all events onto the given scheduler, instead of whichever
/// scheduler they originally arrived upon.
///
Expand Down
22 changes: 22 additions & 0 deletions Sources/SignalProducer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,28 @@ extension SignalProducer {
return core.flatMapEvent(Signal.Event.collect(shouldEmit))
}

/// Forward the latest values on `scheduler` every `interval`.
///
/// - note: If `self` terminates while values are being accumulated,
/// the behaviour will be determined by `discardsWhenCompleted`.
/// If `true`, the values will be discarded and the returned producer
/// will terminate immediately.
/// If `false`, that values will be delivered at the next interval.
///
/// - parameters:
/// - interval: A repetition interval.
/// - scheduler: A scheduler to send values on.
/// - skipEmpty: Whether empty arrays should be sent if no values were
/// accumulated during the interval.
/// - discardsWhenCompleted: A boolean to indicate if the latest unsent
/// values should be discarded on completion.
///
/// - returns: A producer that sends all values that are sent from `self`
/// at `interval` seconds apart.
public func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool = false, discardsWhenCompleted: Bool = true) -> SignalProducer<[Value], Error> {
return core.flatMapEvent(Signal.Event.collect(every: interval, on: scheduler, skipEmpty: skipEmpty, discardsWhenCompleted: discardsWhenCompleted))
}

/// Forward all events onto the given scheduler, instead of whichever
/// scheduler they originally arrived upon.
///
Expand Down
Loading