Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a recurring collect function and associated unit tests #619

Merged
merged 14 commits into from
Apr 16, 2018

Conversation

Qata
Copy link
Contributor

@Qata Qata commented Mar 4, 2018

Checklist

  • Updated CHANGELOG.md.

This PR introduces a new function that collects values from self and delivers them to the subscriber every interval.
Solves issue #614.

Copy link
Member

@NachoSoto NachoSoto left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

///
/// - note: If the input signal terminates, the returned signal will
/// terminate immediately without forwarding the values
/// currently being accumulated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the spirit of #605, maybe there should be an option to forward the values first?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After looking into this I can't find a way to make it work consistently, given:
When you complete the Signal when you'd expect is for it to still delay up to the supplied interval, and only then deliver the values followed immediately by a completed event.
In the event of interrupted or failed, you'd need to forward those events immediately, so the values would also need to be delivered immediately.
This would probably create tricky behaviour for a lot of people consuming this function, since you'd expect values to be delivered exactly interval apart (or a multiple of interval if you're ignoring when empty).

So should it only not discard in the case of .completed?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, values should only be forwarded in the case of .completed.

///
/// - returns: A signal that sends all values that are sent from `self` at
/// `interval` seconds apart.
public func chunk(_ interval: DispatchTimeInterval, on scheduler: DateScheduler, ignoreEmptyChunks: Bool = false) -> Signal<[Value], Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm very hesitant to introduce complete new operator names. I think our grouping of operators by base name has been very successful and would like for us to continue it.

While this is similar to debounce/throttle, I'm not sure it's really throttling them. I don't think a throttle is always regular.

I think this could fit under collect though.

func collect(every interval: DispatchTimeInterval, on scheduler: DataScheduler) -> Signal<[Value], Error> {

collect serves as the base name for operators that go from A to [A].

Thoughts @ReactiveCocoa/reactiveswift?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yep I forgot to say that bikeshedding the name is very welcome, I have no attachments to the one I chose.

@Qata Qata changed the title Add the chunk function and associated unit tests Add a recurring collect function and associated unit tests Mar 9, 2018
CHANGELOG.md Outdated
@@ -1,6 +1,7 @@
# master
*Please add new entries at the top.*

1. New method `chunk(_:on:ignoreEmptyChunks:)` which delivers all values that occurred during a time chunk (#619, kudos to @Qata)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be updated.

///
/// - note: If the input signal terminates, the returned signal will
/// terminate immediately without forwarding the values
/// currently being accumulated.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, values should only be forwarded in the case of .completed.

///
/// - returns: A signal that sends all values that are sent from `self` at
/// `interval` seconds apart.
public func collect(_ interval: DispatchTimeInterval, on scheduler: DateScheduler, ignoreWhenEmpty: Bool = false) -> Signal<[Value], Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We've standardized on skip for values that aren't sent. We should use that here.

///
/// - returns: A signal that sends all values that are sent from `self` at
/// `interval` seconds apart.
public func collect(_ interval: DispatchTimeInterval, on scheduler: DateScheduler, ignoreWhenEmpty: Bool = false) -> Signal<[Value], Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should name the interval parameter. Maybe every:?

}
if let values = currentValues {
action(.value(values))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be better written like so:

let currentValues = values.swap([])
if !currentValues.isEmpty {
    action(.value(currentValues))
}

if let values = currentValues {
action(.value(values))
}
})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use trailing closure syntax.

@mdiep mdiep requested a review from andersio March 20, 2018 14:19
@Qata
Copy link
Contributor Author

Qata commented Mar 22, 2018

@mdiep Issues addressed 👍

@mdiep
Copy link
Contributor

mdiep commented Mar 28, 2018

I'll try to get to this in the next day or two

case .completed:
let currentValues = values.swap([])
if !discardsWhenCompleted, !(currentValues.isEmpty && skipEmpty) {
d.inner = scheduler.schedule(after: lastDelivered.addingTimeInterval(interval)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This schedule call makes me a little uneasy. It's difficult to reason about how this will interact with the other scheduling call. (e.g. it's possible that the above schedule could run in-between this values.swap and this scheduler.schedule).

I think we should solve this by removing this schedule and sending the complete through the top schedule. This would be best done by adding a CollectEveryState struct and putting that inside an Atomic instead of a Atomic<[Value]>.

Something like this:

state CollectEveryState<Value> {
    var skipEmpty: Bool
    var values: [Value] = []
    var isCompleted: Bool

    var hasValues: Bool {
       return !values.isEmpty || !skipEmpty
    }

    func collect() -> [Value]? {
        if !hasValues {
            return nil
        }
        defer { values.removeAll() }
        return values
    }
}

That should clean up the implementation a bit too, which is nice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdiep Done 👍

action(.value(currentValues))
}
if isCompleted {
action(.completed)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mdiep Should I call d.dispose() manually here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. Otherwise the timer would continue firing.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d is disposed already as part of the lifetime observer, so repeating it here is not necessary.

action(.value(currentValues))
}
if isCompleted {
action(.completed)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. Otherwise the timer would continue firing.


internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardsWhenCompleted: Bool) -> Transformation<[Value], Error> {
return { action, lifetime in
let values = Atomic<CollectEveryState<Value>>(.init(skipEmpty: skipEmpty))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also rename values to state since we have to add the dispose call below?

@mdiep
Copy link
Contributor

mdiep commented Apr 4, 2018

I think this cleaned up rather nicely. 👏 After the minor fixes noted above, we should be good to go.

@@ -861,6 +861,63 @@ extension Signal.Event {
}
}
}

internal static func collect(every interval: DispatchTimeInterval, on scheduler: DateScheduler, skipEmpty: Bool, discardsWhenCompleted: Bool) -> Transformation<[Value], Error> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, let's make this discardWhenCompleted (no s) since we're doing that in #605 as well.

@mdiep
Copy link
Contributor

mdiep commented Apr 6, 2018

Unfortunately, the linux build is failing on CI and I don't see why. 😐

@mdiep
Copy link
Contributor

mdiep commented Apr 6, 2018

This will need a merge now that #605 has been merged.

@Qata
Copy link
Contributor Author

Qata commented Apr 6, 2018

@andersio Could you have a look when you have time

Copy link
Member

@andersio andersio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM otherwise

action(.value(currentValues))
}
if isCompleted {
action(.completed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

d is disposed already as part of the lifetime observer, so repeating it here is not necessary.

@andersio andersio added this to the 4.0 milestone Apr 15, 2018
@Qata
Copy link
Contributor Author

Qata commented Apr 16, 2018

@andersio Done 👍

@mdiep mdiep merged commit 34a7750 into ReactiveCocoa:master Apr 16, 2018
@mdiep
Copy link
Contributor

mdiep commented Apr 16, 2018

Thanks @Qata!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants