feat: output at time #84

Closed
jordanrfrazier opened this issue Mar 7, 2023 · 4 comments · Fixed by #86

@jordanrfrazier
Collaborator

No description provided.

@jordanrfrazier jordanrfrazier self-assigned this Mar 7, 2023
@jordanrfrazier
Collaborator Author

jordanrfrazier commented Mar 7, 2023

Yeah, so unfortunately the existing final_at_time behaves as I expected: it can't produce the output at the given time. It can only filter rows to that time and then tick after all input rows have been processed. This means that while the values are correct for the given time, the final rows are produced at the max input timestamp rather than at the given time.
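
A minimal sketch of the behavior described above, assuming rows are `(time, key, value)` tuples and that results are emitted once after all input has been processed. The function `final_at_time_current` is hypothetical; it only models the observable timing, not the engine's actual code.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str, float]  # (time, key, value)


def final_at_time_current(rows: List[Row], at_time: int) -> Tuple[int, Dict[str, float]]:
    """Hypothetical model of the existing behavior: filter, then tick after all input."""
    latest: Dict[str, float] = {}
    for time, key, value in sorted(rows, key=lambda r: r[0]):
        if time <= at_time:  # rows after at_time are filtered out
            latest[key] = value
    # The final tick only fires once every input row has been seen, so the
    # output is stamped with the max input timestamp, not with at_time.
    emit_time = max(time for time, _, _ in rows)
    return emit_time, latest


rows = [(1, "a", 1.0), (3, "b", 2.0), (5, "a", 7.0), (9, "b", 4.0)]
emit_time, values = final_at_time_current(rows, at_time=4)
print(emit_time, values)  # 9 {'a': 1.0, 'b': 2.0}
```

The values are the ones valid at time 4, but they are emitted at time 9.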

This is because the finished() tick operation needs to continually produce empty batches up to the upper bound of the incoming batch so that downstream merge operations can progress. If one side is not progressing, the incoming channel for the other side fills up and blocks the sender from producing more messages, which stops execution. I tried to be sneaky and change final_tick's behavior to track only existing row times rather than the upper bound. That works in most cases, but it fails in the way described above if filtering occurs in the input batches.
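
To illustrate why progress matters, here is a sketch of a two-input merge that can only release rows up to the minimum of its inputs' progress times. It assumes each batch carries an `upper_bound` that is not reduced when rows are filtered out; `Batch`, `watermark`, and `merge_releasable` are hypothetical names, not the engine's types.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Batch:
    row_times: List[int]  # times of rows that survived filtering
    upper_bound: int      # time the input has progressed to


def watermark(batches: List[Batch], use_upper_bound: bool) -> int:
    """Progress reported downstream after the given batches."""
    wm = 0
    for batch in batches:
        if use_upper_bound:
            wm = max(wm, batch.upper_bound)
        elif batch.row_times:
            # Tracking only row times: a fully filtered batch advances nothing.
            wm = max(wm, max(batch.row_times))
    return wm


def merge_releasable(left_wm: int, right_wm: int) -> int:
    """A merge can only emit rows up to the slower input's progress."""
    return min(left_wm, right_wm)


# Left side: a filter removed every row after time 3.
left = [Batch([1, 3], upper_bound=3), Batch([], upper_bound=10), Batch([], upper_bound=20)]
right_wm = 20

print(merge_releasable(watermark(left, True), right_wm))   # 20
print(merge_releasable(watermark(left, False), right_wm))  # 3
```

With row-time tracking, the merge stays stuck at time 3 while the right side keeps producing, which is the situation in which the other side's bounded channel fills up and blocks its sender.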

I do not see an easy way around this using the existing finished() operation. Even if we plumbed final_at_time through to the scan operations and stopped reading at that time, we'd still have to contend with shift_to, which flushes its output batch, and the shifted times in that batch may be greater than final_at_time.
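
A sketch of why stopping the scan at final_at_time is not enough, assuming shift_to buffers each row until its shifted time and flushes whatever remains when input ends. `shift_to_flush` and its `(time, shifted_time)` input are hypothetical simplifications.

```python
from typing import List, Tuple


def shift_to_flush(rows: List[Tuple[int, int]], final_at_time: int) -> List[int]:
    """Return output times of shifted rows when the input is cut at final_at_time.

    Each row is (time, shifted_time) with shifted_time >= time.
    """
    # The scan stops reading after final_at_time.
    scanned = [r for r in rows if r[0] <= final_at_time]
    # At end of input, shift_to flushes its whole buffer; each row keeps its
    # shifted time, which may be later than final_at_time.
    return sorted(shifted for _, shifted in scanned)


out = shift_to_flush([(1, 2), (3, 8), (6, 7)], final_at_time=4)
print(out)           # [2, 8]
print(max(out) > 4)  # True: the output upper bound passes final_at_time
```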

@jordanrfrazier
Collaborator Author

I believe the solution, then, is to create a tick operation that can tick at a specific time and use the following decoration:

result | last() | when(tick_at(__final_at_time__))
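
The intended semantics of that decoration can be modeled as: for each entity, take the latest value at or before the tick, and emit it stamped at the tick time. This sketch assumes tick_at produces exactly one tick at the given time; `last_when_tick_at` is a hypothetical helper, not the proposed operator.

```python
from typing import Dict, List, Tuple

Row = Tuple[int, str, float]  # (time, entity key, value)


def last_when_tick_at(rows: List[Row], tick_time: int) -> List[Row]:
    """Emit one row per entity at tick_time holding its last value so far."""
    latest: Dict[str, float] = {}
    for time, key, value in sorted(rows, key=lambda r: r[0]):
        if time > tick_time:
            break  # rows after the tick cannot affect last() at the tick
        latest[key] = value
    # Unlike the existing final tick, the output time is the tick time itself.
    return [(tick_time, key, value) for key, value in sorted(latest.items())]


rows = [(1, "a", 1.0), (3, "b", 2.0), (5, "a", 7.0), (9, "b", 4.0)]
print(last_when_tick_at(rows, tick_time=4))
# [(4, 'a', 1.0), (4, 'b', 2.0)]
```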

@jordanrfrazier
Collaborator Author

jordanrfrazier commented Mar 7, 2023

Actually, there's an ugly solution. Assume we re-add the filter in the table_reader to stop reading after the specified time. I can then use finished() as is, leaving it to output up to the upper bound. The only time this becomes a problem is when shift_to shifts rows past the at_time: even if those rows are filtered out, we still see their upper_bound and advance the final time accordingly.

So, the ugly solution: the final tick operation can buffer a batch. If it knows that the buffered batch is the last one, it can use the current time instead of the upper bound. This covers the shift_to case.

This is ugly for several reasons:

  1. It depends entirely on the table scan stopping the read at the specified time AND on shift_to's behavior of flushing its batch.
  2. It requires the final tick operation to buffer a batch.
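
The buffering workaround could be sketched as follows, assuming each batch reports its surviving row times and an `upper_bound`, that the operator only learns a batch was the last one when input ends, and interpreting "current time" as the latest surviving row time. `Batch` and `BufferedFinalTick` are hypothetical names. Intermediate batches still report progress up to their upper bound so merges keep moving.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Batch:
    row_times: List[int]  # times of rows that survived filtering
    upper_bound: int


class BufferedFinalTick:
    """Hypothetical final tick that holds back one batch."""

    def __init__(self) -> None:
        self.buffered: Optional[Batch] = None
        self.current_time = 0          # progress already reported downstream
        self.progress: List[int] = []  # progress times emitted so far

    def push(self, batch: Batch) -> None:
        # A new batch arrived, so the buffered one was not the last:
        # it is safe to report progress up to its upper bound.
        if self.buffered is not None:
            self._advance(self.buffered.upper_bound)
        self.buffered = batch

    def finish(self) -> int:
        # The buffered batch is the last one. Use its row times instead of
        # its upper bound, which a shift_to flush may have pushed forward.
        if self.buffered is not None and self.buffered.row_times:
            self._advance(max(self.buffered.row_times))
        self.buffered = None
        return self.current_time  # time at which the final tick fires

    def _advance(self, time: int) -> None:
        if time > self.current_time:
            self.current_time = time
            self.progress.append(time)


tick = BufferedFinalTick()
tick.push(Batch([1, 2], upper_bound=3))
tick.push(Batch([4], upper_bound=4))
tick.push(Batch([4], upper_bound=8))  # shift_to flush: rows past 4 filtered out
print(tick.progress)  # [3, 4]
print(tick.finish())  # 4, not 8
```

This only works because the last batch is the shift_to flush and every earlier batch stays within the specified time, which is exactly the first drawback in the list above.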

@jordanrfrazier
Collaborator Author

Okay, new idea: can I update the final tick operation to simply know when it should produce output? That would essentially make it a tick_at operation.
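
A sketch of that idea, assuming the operator is constructed with the output time and only observes input progress (upper bounds). It keeps forwarding progress for downstream merges and fires exactly once at the configured time, either when progress reaches that time or at end of input. `TickAt`, `on_progress`, and `finish` are hypothetical names.

```python
from typing import List, Tuple

Event = Tuple[str, int]  # ("progress", time) or ("tick", time)


class TickAt:
    """Hypothetical tick operator that fires once at a fixed time."""

    def __init__(self, at_time: int) -> None:
        self.at_time = at_time
        self.fired = False
        self.events: List[Event] = []

    def on_progress(self, upper_bound: int) -> None:
        # Fire as soon as input has progressed to (or past) the tick time.
        if not self.fired and upper_bound >= self.at_time:
            self._fire()
        # Keep forwarding progress so downstream merges are not starved.
        self.events.append(("progress", upper_bound))

    def finish(self) -> None:
        # If input ended before reaching at_time, still tick at at_time.
        if not self.fired:
            self._fire()

    def _fire(self) -> None:
        self.fired = True
        self.events.append(("tick", self.at_time))


op = TickAt(at_time=5)
for bound in [2, 4, 8, 12]:
    op.on_progress(bound)
op.finish()
print(op.events)
# [('progress', 2), ('progress', 4), ('tick', 5), ('progress', 8), ('progress', 12)]
```

Unlike the buffered final tick, the tick time here is always exactly `at_time`, regardless of how far the upper bound advances.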
