WIP Support real time timestamping with timely aggregate. #2647
I think the code as written makes sense, but the PR needs a bit of documentation love to explain what is going on, and potentially why it is appropriate for the function that houses it. Otherwise, the intent seems totally fine, under the premise that `decode_key_values_inner` is correctly interpreted as `prepare_upsert_by_key` or something like that.
There are some spot comments about specific ambiguities or potential surprises. I think they can be made less ambiguous or surprising, potentially just by explaining the intended contract the function makes.
src/dataflow/decode/mod.rs
Outdated
New side of the diff:

    stream
        .aggregate::<_, (Vec<u8>, Option<i64>), _, _, _>(
            |_key, val, agg| {
                if let Some(new_offset) = val.1 {
                    if let Some(offset) = agg.1 {
                        if offset < new_offset {
                            *agg = val;
                        }
                    } else {
                        *agg = val;
                    }
                }
            },
            |key, agg| (key, agg),
            |key| key.hashed(),
        )

Old side of the diff (shown interleaved with the lines above in the split view):

    match key_decoder_state.decode_key(key) {
        Ok(key) => {
            if payload.is_empty() {
                session.give((key, None, *cap.time()));
            } else {
                value_decoder_state.give_key_value(
                    key,
                    payload,
                    *aux_num,
                    &mut session,
                    *cap.time(),
                );
            }
This certainly does a thing, but I think we need to be clear about what it does. That should probably take the form of comments that explain what this does, what `decode_key_values_inner` does, or both, because I think this changes its semantics. It's probably a "good change" for upserts, but if this method is only meant to be used for upserts we should give it an appropriate name.
For example, this first operator seems to drop records that do not have the optional `i64`, which isn't what was done before. It would be great to explain the intent of the operator, as I'm not currently sure it does what it needs to do (and neither, likely, will future people who need to tweak or understand it).
If it shouldn't be discarding, you should just be able to keep every record and compare by the offset, for which `None` is the least element.
The other potentially weird thing it does is collapse values down by capability rather than by logical timestamp. This might be OK for common uses of upserts, but it means that, should we receive inputs that look like
(key, val1, time1)
(key, val2, time2)
(key, val3, time2)
we may lose the ability to recover the fine distinction of going from `val1` to `val3`. I understand that at the moment there is a 1:1 correspondence between timely capabilities and materialized logical timestamps, but differential doesn't require this and we may eventually want to remove it.
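To make the concern concrete, here is a small standalone sketch in plain Rust, with no timely or differential types. It assumes a single capability covers both `time1` and `time2`, so all three updates arrive in one batch. The names `Update`, `collapse_by_key`, and `collapse_by_key_and_time` are hypothetical and exist only for this illustration.

```rust
use std::collections::BTreeMap;

/// One upsert: (key, value, logical timestamp). Hypothetical alias for this sketch.
type Update = (&'static str, &'static str, u64);

/// Collapse a batch to one value per key, ignoring logical timestamps.
/// This approximates aggregating per capability; later entries win.
fn collapse_by_key(batch: &[Update]) -> BTreeMap<&'static str, &'static str> {
    let mut out = BTreeMap::new();
    for &(key, val, _time) in batch {
        out.insert(key, val);
    }
    out
}

/// Collapse to one value per (key, logical timestamp), so the final value
/// at each timestamp survives.
fn collapse_by_key_and_time(
    batch: &[Update],
) -> BTreeMap<(&'static str, u64), &'static str> {
    let mut out = BTreeMap::new();
    for &(key, val, time) in batch {
        out.insert((key, time), val);
    }
    out
}
```

With the batch `[("key", "val1", 1), ("key", "val2", 2), ("key", "val3", 2)]`, collapsing by key alone keeps only `val3`, whereas collapsing by key and timestamp keeps `val1` at time 1 and `val3` at time 2, which preserves the transition from `val1` to `val3`.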
@@ -591,6 +597,42 @@ pub(crate) fn build_dataflow<A: Allocate>(
    })
}

/// `arrange_from_upsert` may not behave as intended if multiple rows
Moved this out of dataflow/decode since no decoding is actually taking place in this part of the code.
In contrast to #2646, this version supports real-time timestamping using the aggregate operator.