persistence semantics discussion (WIP) #351
Replies: 7 comments
-
With the log output
------------------- Tick 0 -------------------
symmetric join: (0, (a, x))
gb-streamjoin: (0, (a, x))
------------------- Tick 1 -------------------
symmetric join: (0, (b, x))
symmetric join: (1, (a, x))
symmetric join: (0, (a, y))
symmetric join: (0, (b, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
------------------- Tick 2 -------------------
symmetric join: (1, (b, x))
symmetric join: (2, (a, x))
symmetric join: (1, (a, y))
symmetric join: (1, (b, y))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (1, (a, y))
------------------- Tick 3 -------------------
symmetric join: (2, (b, x))
symmetric join: (3, (a, x))
symmetric join: (2, (a, y))
symmetric join: (2, (b, y))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (1, (a, y))
------------------- Tick 4 -------------------
symmetric join: (3, (b, x))
symmetric join: (4, (a, x))
symmetric join: (3, (a, y))
symmetric join: (3, (b, y))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (4, (a, x))
gb-streamjoin: (3, (a, y))
gb-streamjoin: (3, (b, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (3, (b, x))
gb-streamjoin: (1, (a, y))
gb-streamjoin: (1, (b, y))
------------------- Tick 5 -------------------
symmetric join: (4, (b, x))
symmetric join: (5, (a, x))
symmetric join: (4, (a, y))
symmetric join: (4, (b, y))
gb-streamjoin: (4, (a, y))
gb-streamjoin: (4, (b, y))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (4, (a, x))
gb-streamjoin: (4, (b, x))
gb-streamjoin: (3, (a, y))
gb-streamjoin: (3, (b, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (5, (a, x))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (3, (b, x))
gb-streamjoin: (1, (a, y))
gb-streamjoin: (1, (b, y))
------------------- Tick 6 -------------------
symmetric join: (5, (b, x))
symmetric join: (6, (a, x))
symmetric join: (5, (a, y))
symmetric join: (5, (b, y))
gb-streamjoin: (4, (a, y))
gb-streamjoin: (4, (b, y))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (4, (a, x))
gb-streamjoin: (4, (b, x))
gb-streamjoin: (6, (a, x))
gb-streamjoin: (3, (a, y))
gb-streamjoin: (3, (b, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (5, (b, x))
gb-streamjoin: (5, (a, x))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (5, (b, y))
gb-streamjoin: (5, (a, y))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (3, (b, x))
gb-streamjoin: (1, (a, y))
gb-streamjoin: (1, (b, y))
log output
------------------- Tick 0 -------------------
symmetric join: (0, (a, x))
stream-table join: (0, (a, x))
stream-stream join: (0, (a, x))
------------------- Tick 1 -------------------
symmetric join: (0, (b, x))
symmetric join: (1, (a, x))
symmetric join: (0, (a, y))
symmetric join: (0, (b, y))
stream-table join: (0, (b, x))
stream-table join: (1, (a, x))
stream-table join: (0, (b, y))
stream-stream join: (1, (a, x))
stream-stream join: (0, (b, y))
------------------- Tick 2 -------------------
symmetric join: (1, (b, x))
symmetric join: (2, (a, x))
symmetric join: (1, (a, y))
symmetric join: (1, (b, y))
stream-table join: (1, (b, x))
stream-table join: (2, (a, x))
stream-table join: (1, (b, y))
stream-stream join: (2, (a, x))
stream-stream join: (1, (b, y))
------------------- Tick 3 -------------------
symmetric join: (2, (b, x))
symmetric join: (3, (a, x))
symmetric join: (2, (a, y))
symmetric join: (2, (b, y))
stream-table join: (2, (b, x))
stream-table join: (3, (a, x))
stream-table join: (2, (b, y))
stream-stream join: (3, (a, x))
stream-stream join: (2, (b, y))
------------------- Tick 4 -------------------
symmetric join: (3, (b, x))
symmetric join: (4, (a, x))
symmetric join: (3, (a, y))
symmetric join: (3, (b, y))
stream-table join: (3, (b, x))
stream-table join: (4, (a, x))
stream-table join: (3, (b, y))
stream-stream join: (4, (a, x))
stream-stream join: (3, (b, y))
------------------- Tick 5 -------------------
symmetric join: (4, (b, x))
symmetric join: (5, (a, x))
symmetric join: (4, (a, y))
symmetric join: (4, (b, y))
stream-table join: (4, (b, x))
stream-table join: (5, (a, x))
stream-table join: (4, (b, y))
stream-stream join: (5, (a, x))
stream-stream join: (4, (b, y))
------------------- Tick 6 -------------------
symmetric join: (5, (b, x))
symmetric join: (6, (a, x))
symmetric join: (5, (a, y))
symmetric join: (5, (b, y))
stream-table join: (5, (b, x))
stream-table join: (6, (a, x))
stream-table join: (5, (b, y))
stream-stream join: (6, (a, x))
stream-stream join: (5, (b, y))
log output
------------------- Tick 0 -------------------
------------------- Tick 1 -------------------
------------------- Tick 2 -------------------
symmetric join: (0, (a, x))
symmetric join: (0, (a, y))
stream-table join: (0, (a, x))
stream-table join: (0, (a, y))
------------------- Tick 3 -------------------
symmetric join: (1, (a, x))
stream-table join: (1, (a, x))
------------------- Tick 4 -------------------
symmetric join: (0, (b, x))
symmetric join: (0, (b, y))
stream-table join: (0, (b, x))
stream-table join: (0, (b, y))
------------------- Tick 5 -------------------
symmetric join: (2, (a, x))
symmetric join: (1, (a, y))
stream-table join: (2, (a, x))
------------------- Tick 6 -------------------
symmetric join: (2, (a, y))
------------------- Tick 7 -------------------
symmetric join: (1, (b, x))
symmetric join: (1, (b, y))
stream-table join: (1, (b, x))
stream-table join: (1, (b, y))
------------------- Tick 8 -------------------
------------------- Tick 9 -------------------
symmetric join: (3, (a, x))
symmetric join: (2, (b, x))
symmetric join: (2, (b, y))
stream-table join: (3, (a, x))
stream-table join: (2, (b, x))
stream-table join: (2, (b, y))
------------------- Tick 10 -------------------
------------------- Tick 11 -------------------
symmetric join: (3, (a, y))
------------------- Tick 12 -------------------
symmetric join: (4, (a, x))
symmetric join: (4, (a, y))
stream-table join: (4, (a, x))
stream-table join: (4, (a, y))
------------------- Tick 13 -------------------
symmetric join: (3, (b, x))
symmetric join: (3, (b, y))
stream-table join: (3, (b, x))
stream-table join: (3, (b, y))
------------------- Tick 14 -------------------
symmetric join: (5, (a, x))
stream-table join: (5, (a, x))
------------------- Tick 15 -------------------
------------------- Tick 16 -------------------
------------------- Tick 17 -------------------
symmetric join: (4, (b, x))
symmetric join: (4, (b, y))
stream-table join: (4, (b, x))
stream-table join: (4, (b, y))
------------------- Tick 18 -------------------
-
Thinking about rescanning logic today. Playing around with introducing more rescanning. It was easy to hack
Results of the first example program above with a replaying
Playing with this, I’m trying to think both about correctness and about how complex our API is…
At the client API boundary:
Inside a dataflow:
Optimizations: In certain cases we may have
Whatever the right answers here, I'm increasingly convinced that there are 3 bits of information here:
log output
------------------- Tick 0 -------------------
stream-table join: (0, (a, x))
table-stream join: (0, (a, x))
symmetric join: (0, (a, x))
gb-streamjoin: (0, (a, x))
------------------- Tick 1 -------------------
symmetric join: (0, (b, x))
symmetric join: (1, (a, x))
symmetric join: (0, (a, y))
symmetric join: (0, (b, y))
stream-table join: (0, (b, x))
stream-table join: (1, (a, x))
stream-table join: (0, (b, y))
table-stream join: (1, (a, x))
table-stream join: (0, (a, y))
table-stream join: (0, (b, y))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (0, (a, y))
------------------- Tick 2 -------------------
symmetric join: (1, (b, x))
symmetric join: (2, (a, x))
symmetric join: (1, (a, y))
symmetric join: (1, (b, y))
stream-table join: (1, (b, x))
stream-table join: (2, (a, x))
stream-table join: (1, (b, y))
table-stream join: (2, (a, x))
table-stream join: (1, (a, y))
table-stream join: (1, (b, y))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (1, (a, y))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
------------------- Tick 3 -------------------
symmetric join: (2, (b, x))
symmetric join: (3, (a, x))
symmetric join: (2, (a, y))
symmetric join: (2, (b, y))
stream-table join: (2, (b, x))
stream-table join: (3, (a, x))
stream-table join: (2, (b, y))
table-stream join: (3, (a, x))
table-stream join: (2, (a, y))
table-stream join: (2, (b, y))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (1, (a, y))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
------------------- Tick 4 -------------------
symmetric join: (3, (b, x))
symmetric join: (4, (a, x))
symmetric join: (3, (a, y))
symmetric join: (3, (b, y))
stream-table join: (3, (b, x))
stream-table join: (4, (a, x))
stream-table join: (3, (b, y))
table-stream join: (4, (a, x))
table-stream join: (3, (a, y))
table-stream join: (3, (b, y))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (3, (b, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (4, (a, x))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (3, (a, y))
gb-streamjoin: (3, (b, y))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (1, (a, y))
------------------- Tick 5 -------------------
symmetric join: (4, (b, x))
symmetric join: (5, (a, x))
symmetric join: (4, (a, y))
symmetric join: (4, (b, y))
stream-table join: (4, (b, x))
stream-table join: (5, (a, x))
stream-table join: (4, (b, y))
table-stream join: (5, (a, x))
table-stream join: (4, (a, y))
table-stream join: (4, (b, y))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (3, (b, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (4, (a, y))
gb-streamjoin: (4, (b, y))
gb-streamjoin: (4, (a, x))
gb-streamjoin: (4, (b, x))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (3, (a, y))
gb-streamjoin: (3, (b, y))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (5, (a, x))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (1, (a, y))
------------------- Tick 6 -------------------
symmetric join: (5, (b, x))
symmetric join: (6, (a, x))
symmetric join: (5, (a, y))
symmetric join: (5, (b, y))
stream-table join: (5, (b, x))
stream-table join: (6, (a, x))
stream-table join: (5, (b, y))
table-stream join: (6, (a, x))
table-stream join: (5, (a, y))
table-stream join: (5, (b, y))
gb-streamjoin: (1, (b, x))
gb-streamjoin: (1, (a, x))
gb-streamjoin: (3, (a, x))
gb-streamjoin: (3, (b, x))
gb-streamjoin: (0, (a, y))
gb-streamjoin: (0, (b, y))
gb-streamjoin: (2, (b, y))
gb-streamjoin: (2, (a, y))
gb-streamjoin: (4, (a, y))
gb-streamjoin: (4, (b, y))
gb-streamjoin: (4, (a, x))
gb-streamjoin: (4, (b, x))
gb-streamjoin: (0, (a, x))
gb-streamjoin: (0, (b, x))
gb-streamjoin: (3, (a, y))
gb-streamjoin: (3, (b, y))
gb-streamjoin: (2, (b, x))
gb-streamjoin: (2, (a, x))
gb-streamjoin: (5, (a, x))
gb-streamjoin: (5, (b, x))
gb-streamjoin: (6, (a, x))
gb-streamjoin: (1, (b, y))
gb-streamjoin: (1, (a, y))
gb-streamjoin: (5, (a, y))
gb-streamjoin: (5, (b, y))
-
Recall the 3 bits of information in the previous post:
Let's try to get notation for all this:
Note that
Going back to our 3 bits, we can convert between their settings as follows:
It would seem that
-
I think the above is enough to capture lifetime semantics. It models our
The one thing we probably have wrong now is that our
-
Remaining issues:
-
Proposal for Consistency Analysis Under Dedalus Time
Background on the Dedalus Time Model
In this doc we're going to embrace the Dedalus/Bloom notion of time in which:
It's interesting to try and unpack this model and remix it in different ways, but that is outside the scope of this document.
Notation
Edge Properties
We focus on two semantic properties of edges: monotonicity and non-determinism.
Definition: An edge that is both non-monotonic and non-deterministic is inconsistent: it may produce different values across runs and replicas, not just different orders/associations.
Operators
We will assume all operators consume streams of lattice points (i.e. multiple per tick) as in current Hydroflow.
Definition: We say an operator is monotonic if its output edges are monotonic whenever all of its input edges are monotonic.
Observation (Composability): Monotonicity and determinism are both composable properties: we can assemble "pipelines" of monotonic edges and operators and encapsulate them as a single monotonic operator. The same holds for determinism.
Network input operators
These operators perform "demonic" permutation and association
Deterministic generators
These operators generate the same data across runs and replicas. They are deterministic.
Stateless (Standard) operators map from one lattice point to one or more other points (perhaps in another lattice). They may be monotone or non-monotone, deterministic or non-deterministic.
Aggregation operators are "stateful" macro-operators above. That is, formally we can model them as composite operators involving either a
map(|x| into_accumulator_lattice) -> ⊔ -> map(|a| out_of_accumulator)
or a
map(|x| into_hash_lattice) -> ⊔ -> ♭
If we represent hash-lattice map as
Optionally replace
These ops are monotone iff they use
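As a minimal sketch of the first composite form, here is a count-distinct aggregation in plain Rust (not Hydroflow's API). Each input is mapped into a set-union lattice, ⊔ is set union, and the output map reads off the set's size. The function names mirror the notation above but are hypothetical.

```rust
use std::collections::HashSet;

/// map(|x| into_accumulator_lattice): lift one input into the set-union lattice.
fn into_accumulator_lattice(x: &str) -> HashSet<String> {
    HashSet::from([x.to_string()])
}

/// ⊔ for the set-union lattice: associative, commutative and idempotent,
/// so the result is independent of order, batching and duplicates.
fn lattice_join(mut acc: HashSet<String>, other: HashSet<String>) -> HashSet<String> {
    acc.extend(other);
    acc
}

/// map(|a| out_of_accumulator): read the aggregate out of the lattice point.
fn out_of_accumulator(acc: &HashSet<String>) -> usize {
    acc.len()
}

/// count-distinct as map -> ⊔ -> map.
fn count_distinct(inputs: &[&str]) -> usize {
    let acc = inputs
        .iter()
        .map(|x| into_accumulator_lattice(x))
        .fold(HashSet::new(), lattice_join);
    out_of_accumulator(&acc)
}

fn main() {
    // Duplicates and order do not matter: prints 3.
    println!("{}", count_distinct(&["a", "b", "a", "c"]));
}
```

Because ⊔ here is idempotent, replaying inputs (as a rescan would) leaves the aggregate unchanged.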
Ordering operators are per-tick stateful operators. (They'd be non-terminating if they persisted across ticks). Effectively they operate on a single lattice point per tick, so require a
Joins generate elements of the cross-product space of their inputs. While they are well-defined as stateless operators (e.g. joining up singleton sets) across two streams, they are almost never intended to be used that way. We will assume accumulators
Join is a monotone binary function of its inputs, so if both inputs are monotone
Negation operators are non-monotonic, deterministic operators. Like join they are well-defined as stateless operators (e.g. the
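A tiny illustration of why negation is non-monotone. It uses a hypothetical `difference` helper over std `BTreeSet`s rather than a Hydroflow operator: growing the neg input shrinks the output, while the same inputs always give the same output.

```rust
use std::collections::BTreeSet;

/// Stateless set difference: the elements of `pos` that are not in `neg`.
fn difference(pos: &BTreeSet<u32>, neg: &BTreeSet<u32>) -> BTreeSet<u32> {
    pos.difference(neg).copied().collect()
}

fn main() {
    let pos = BTreeSet::from([1, 2]);
    let smaller_neg = BTreeSet::new();
    let larger_neg = BTreeSet::from([2]); // smaller_neg ⊆ larger_neg

    let before = difference(&pos, &smaller_neg);
    let after = difference(&pos, &larger_neg);

    // The neg input grew but the output shrank ({1, 2} -> {1}),
    // so difference is non-monotone in neg; it is still deterministic.
    println!("{:?} -> {:?}", before, after);
    assert!(!before.is_subset(&after));
}
```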
Stateful Ops and Persistence Annotations
For stateful ops (aggregations, ordering, and the typical uses of join and negation), we will "embed" the lattice-join operators into the operator's type signature, which contains a list of persistence annotations corresponding to the list of input edges; each annotation is either
As a corollary to the discussion above, if any input to a stateful operator is marked
Property Inference
Here are the basic rules for inferring monotonicity and determinism of operators from their inputs:
Given a Hydroflow graph, we can use the three rules above and the operator definitions to infer the monotonicity and determinism of each edge in the graph. The inference pseudocode is as follows:
Delta Optimization
In a fully monotonic subgraph that has multiple
We'll set aside this topic for another discussion, but it's worth mentioning here if only to motivate the need for the
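The property-inference pass can be sketched as one forward sweep over operators in topological order. This is an illustrative Rust model under stated assumptions, not the inference pseudocode from the post. It assumes each operator has a single output edge and uses a conservative rule: an output edge is monotone (resp. deterministic) only if the operator and all of its input edges are. Treating a network input as monotone but non-deterministic is also an assumption.

```rust
/// Inferred properties of an operator's output edge.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct EdgeProps {
    monotone: bool,
    deterministic: bool,
}

impl EdgeProps {
    /// Non-monotone and non-deterministic: values (not just order) may differ.
    fn inconsistent(&self) -> bool {
        !self.monotone && !self.deterministic
    }
}

/// Hypothetical operator node. `inputs` holds indices of upstream operators;
/// nodes must be listed in topological order.
struct Op {
    name: &'static str,
    monotone: bool,
    deterministic: bool,
    inputs: Vec<usize>,
}

/// Forward pass: an output edge is monotone only if the operator and all of
/// its input edges are; determinism is propagated the same conservative way.
fn infer(ops: &[Op]) -> Vec<EdgeProps> {
    let mut props: Vec<EdgeProps> = Vec::with_capacity(ops.len());
    for op in ops {
        let inputs_monotone = op.inputs.iter().all(|&i| props[i].monotone);
        let inputs_deterministic = op.inputs.iter().all(|&i| props[i].deterministic);
        props.push(EdgeProps {
            monotone: op.monotone && inputs_monotone,
            deterministic: op.deterministic && inputs_deterministic,
        });
    }
    props
}

fn main() {
    let ops = vec![
        // Assumed: a network input delivers a growing stream in a demonic order.
        Op { name: "network_in", monotone: true, deterministic: false, inputs: vec![] },
        Op { name: "generator", monotone: true, deterministic: true, inputs: vec![] },
        Op { name: "join", monotone: true, deterministic: true, inputs: vec![0, 1] },
        Op { name: "difference", monotone: false, deterministic: true, inputs: vec![2, 1] },
    ];
    for (op, p) in ops.iter().zip(infer(&ops)) {
        println!("{}: {:?}, inconsistent = {}", op.name, p, p.inconsistent());
    }
}
```

Under these rules the join's output is monotone but non-deterministic, which is still consistent. The difference downstream of it is flagged inconsistent, matching the definition above.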
-
Copying over from Slack for better, uh, persistence: throwing this out there to gauge if this is an interesting idea or it’s already obvious…
So right now it’s up to the developer to determine when to use
Of course there are scenarios where you want to accumulate data across ticks for, let’s say, a join. Consider a snippet of a chat example…
message_distributor = join() -> messages_out
messages_in -> [0] message_distributor
recipients_in -> [1] message_distributor
Now let’s assume we do not have access to persistence lifetimes. Then, to ensure that messages are actually sent to the entire group, we need to accumulate them. Let’s say we have an operator
message_distributor = join() -> messages_out
messages_in -> [0] message_distributor
recipients_in -> persist() -> [1] message_distributor
Of course, this will be very inefficient because the join state is reset every tick, so we have to rehash all the elements. But this program is behaviorally equivalent to one with lifetimes that is much faster!
message_distributor = join::<'tick, 'static>() -> messages_out
messages_in -> [0] message_distributor
recipients_in -> [1] message_distributor
If our application wants new recipients to be able to see old messages, we can persist both sides of the join:
message_distributor = join() -> messages_out
messages_in -> persist() -> [0] message_distributor
recipients_in -> persist() -> [1] message_distributor
But this will actually result in confusing behavior, as redundant packets will be sent to each recipient on every tick. What we want is to send out only the join results that are new compared to the previous tick. So we can introduce another operator that is the opposite of
message_distributor = join() -> unpersist() -> messages_out
messages_in -> persist() -> [0] message_distributor
recipients_in -> persist() -> [1] message_distributor
This program is horrendously slow if implemented naively, but once again it is equivalent to lifetimes!
message_distributor = join::<'static, 'static>() -> messages_out
messages_in -> [0] message_distributor
recipients_in -> [1] message_distributor
The cool thing here is that a developer can reason about what
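A sketch of the last equivalence above, in plain Rust rather than Hydroflow. Hypothetical `Persist` and `Unpersist` structs model persist() (replay all history each tick) and unpersist() (emit only what is new relative to the previous tick). The check compares join() -> unpersist() over two persisted inputs against a direct symmetric-hash-join model of join::<'static, 'static>(). Set semantics (no duplicate tuples) is assumed.

```rust
use std::collections::BTreeSet;

type Msg = (u32, char); // hypothetical (key, value) payload
type Out = BTreeSet<(u32, (char, char))>;
type Batches = (Vec<Msg>, Vec<Msg>); // one tick's (left, right) inputs

/// Hypothetical persist(): accumulates its input and replays all of it every tick.
struct Persist(BTreeSet<Msg>);

impl Persist {
    fn tick(&mut self, batch: &[Msg]) -> BTreeSet<Msg> {
        self.0.extend(batch.iter().copied());
        self.0.clone()
    }
}

/// Hypothetical unpersist(): emits only what is new relative to the previous tick.
struct Unpersist(Out);

impl Unpersist {
    fn tick(&mut self, batch: Out) -> Out {
        let new: Out = batch.difference(&self.0).copied().collect();
        self.0 = batch;
        new
    }
}

/// A join with no state of its own: it matches only the inputs it is given.
fn stateless_join(left: &BTreeSet<Msg>, right: &BTreeSet<Msg>) -> Out {
    let mut out = Out::new();
    for &(kl, vl) in left {
        for &(kr, vr) in right {
            if kl == kr {
                out.insert((kl, (vl, vr)));
            }
        }
    }
    out
}

/// persist() on both inputs -> join() -> unpersist().
fn persisted_pipeline(ticks: &[Batches]) -> Vec<Out> {
    let (mut pl, mut pr) = (Persist(BTreeSet::new()), Persist(BTreeSet::new()));
    let mut up = Unpersist(Out::new());
    ticks
        .iter()
        .map(|(l, r)| up.tick(stateless_join(&pl.tick(l), &pr.tick(r))))
        .collect()
}

/// Direct model of join::<'static, 'static>(): a symmetric hash join that keeps
/// both sides and emits each match once, in the tick it first appears.
fn static_static_join(ticks: &[Batches]) -> Vec<Out> {
    let mut left: BTreeSet<Msg> = BTreeSet::new();
    let mut right: BTreeSet<Msg> = BTreeSet::new();
    let mut outputs = Vec::new();
    for (l, r) in ticks {
        let dl: BTreeSet<Msg> = l.iter().copied().filter(|m| !left.contains(m)).collect();
        let dr: BTreeSet<Msg> = r.iter().copied().filter(|m| !right.contains(m)).collect();
        let mut out = stateless_join(&left, &dr); // old left x new right
        right.extend(dr);
        out.extend(stateless_join(&dl, &right)); // new left x all right
        left.extend(dl);
        outputs.push(out);
    }
    outputs
}

fn main() {
    let ticks: Vec<Batches> = vec![
        (vec![(1, 'a')], vec![]),
        (vec![], vec![(1, 'x')]),
        (vec![(1, 'b')], vec![(1, 'y')]),
    ];
    assert_eq!(persisted_pipeline(&ticks), static_static_join(&ticks));
    println!("{:?}", persisted_pipeline(&ticks));
}
```

The persisted pipeline recomputes the full cross product every tick, which is the naive slowness mentioned above. The symmetric join only touches the deltas.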
-
In Hydroflow we currently allow persistence to be specified per operator. This is causing some confusion on a few fronts.
To rescan or not to rescan
Consider the following code. It prints:
a ('static) join of two streams
a ('tick) join of two streams, each of which is persisted in a 'static group_by operator upstream.
Hypothesis: these should have the same results.
We might think these would behave the same, but they don't. The 'static group_by op is rescanned each tick but the join is not, so the group_by outputs duplicates across ticks and the join does not:
produces:
log output
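A simplified model of the rescanning effect follows. It assumes set semantics and ignores the grouping itself. A hypothetical `RescannedStatic` replays its whole accumulated state every tick, so a downstream 'tick join re-derives, and re-emits, the same pairs in every tick.

```rust
use std::collections::BTreeSet;

type Msg = (u32, char);

/// Hypothetical model of a 'static upstream operator that is rescanned:
/// it keeps everything it has seen and re-emits all of it on every tick.
#[derive(Default)]
struct RescannedStatic(BTreeSet<Msg>);

impl RescannedStatic {
    fn tick(&mut self, batch: &[Msg]) -> Vec<Msg> {
        self.0.extend(batch.iter().copied());
        self.0.iter().copied().collect()
    }
}

/// A 'tick join: it matches only what it receives during the current tick.
fn tick_join(left: &[Msg], right: &[Msg]) -> Vec<(u32, (char, char))> {
    let mut out = Vec::new();
    for &(kl, vl) in left {
        for &(kr, vr) in right {
            if kl == kr {
                out.push((kl, (vl, vr)));
            }
        }
    }
    out
}

fn main() {
    let (mut l, mut r) = (RescannedStatic::default(), RescannedStatic::default());
    // All input arrives in tick 0; nothing new arrives in tick 1.
    let t0 = tick_join(&l.tick(&[(0, 'a')]), &r.tick(&[(0, 'x')]));
    let t1 = tick_join(&l.tick(&[]), &r.tick(&[]));
    // The replayed state makes the join emit the same pair again in tick 1.
    println!("tick 0: {:?}\ntick 1: {:?}", t0, t1);
    assert_eq!(t0, t1);
}
```

By contrast, the symmetric ('static, 'static) join in the logs emits each pair only once.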
(As an aside, I'm not sure how the scheduler decides to dequeue channel inputs per tick. For debugging, it would be nice if it drained all channels each time we call run_tick.)
Monotonicity and Lifetimes
Consider a flow containing just a join of two streams, but with different lifetimes:
With the same inputs from main as above, we get:
log output
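One way to model the three lifetime combinations is sketched below in plain Rust. `LifetimeJoin` is hypothetical and only captures which pairs are emitted in each tick, not how Hydroflow implements join. The `inputs` function is an assumption reconstructed from the log lines, since the code in main is not shown: in tick t the left side receives (t, a) and, for t > 0, (t-1, b); the right side receives (t, x) and (t-1, y). With these inputs, the model's output sets for ticks 0 through 2 agree with the symmetric join, stream-table join and stream-stream join log lines (ordering aside).

```rust
use std::collections::BTreeSet;

type Msg = (u32, char);
type Out = BTreeSet<(u32, (char, char))>;

/// Persistence of one join input.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Lifetime {
    Tick,   // state is discarded at the end of every tick
    Static, // state is kept across ticks
}

/// Hypothetical model of a binary join with one lifetime per input.
struct LifetimeJoin {
    lifetimes: (Lifetime, Lifetime),
    left: BTreeSet<Msg>,
    right: BTreeSet<Msg>,
}

impl LifetimeJoin {
    fn new(left: Lifetime, right: Lifetime) -> Self {
        LifetimeJoin { lifetimes: (left, right), left: BTreeSet::new(), right: BTreeSet::new() }
    }

    /// Runs one tick and returns the pairs emitted during it.
    fn tick(&mut self, new_left: &[Msg], new_right: &[Msg]) -> Out {
        // State carried over from earlier ticks (always empty for a 'tick input).
        let (old_left, old_right) = (self.left.clone(), self.right.clone());
        self.left.extend(new_left.iter().copied());
        self.right.extend(new_right.iter().copied());

        let mut out = Out::new();
        for &(kl, vl) in &self.left {
            for &(kr, vr) in &self.right {
                // A pair whose halves were both already stored was emitted earlier.
                let seen = old_left.contains(&(kl, vl)) && old_right.contains(&(kr, vr));
                if kl == kr && !seen {
                    out.insert((kl, (vl, vr)));
                }
            }
        }
        // End of tick: 'tick inputs forget their state.
        if self.lifetimes.0 == Lifetime::Tick {
            self.left.clear();
        }
        if self.lifetimes.1 == Lifetime::Tick {
            self.right.clear();
        }
        out
    }
}

/// Assumed inputs, reconstructed from the logs.
fn inputs(t: u32) -> (Vec<Msg>, Vec<Msg>) {
    let (mut left, mut right) = (vec![(t, 'a')], vec![(t, 'x')]);
    if t > 0 {
        left.push((t - 1, 'b'));
        right.push((t - 1, 'y'));
    }
    (left, right)
}

fn main() {
    let variants = [
        ("symmetric join", Lifetime::Static, Lifetime::Static),
        ("stream-table join", Lifetime::Tick, Lifetime::Static),
        ("stream-stream join", Lifetime::Tick, Lifetime::Tick),
    ];
    for &(name, l, r) in variants.iter() {
        let mut join = LifetimeJoin::new(l, r);
        for t in 0..3 {
            let (left, right) = inputs(t);
            println!("tick {} {}: {:?}", t, name, join.tick(&left, &right));
        }
    }
}
```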
Now let's add random delays:
Result (all but symmetric join vary across runs due to randomness of delays):
log output
Clearly the examples here that are deterministic are the ones that persist both sides, but that persistence is a transitive property along the graph, as with the group_by above.
Getting the transitivity rules right is tricky. Here's an attempt. We have four properties:
monotone/non-monotone: is the operator's output a monotone function of its input? E.g. join is monotone in both inputs; difference is non-monotone in its neg input.
'static/'tick: does the operator accumulate its input over time? If so, that input is 'static; otherwise it's 'tick.
cumulative/differential (over time): does the edge represent the upstream computation over all ticks, or just the incremental change since the prior tick? If all, it's cumulative; if just the latest change, it's differential.
t-monotone/t-non-monotone: does the edge deliver a monotone stream (in either cumulative or differential delivery mode)?
Axioms:
If the operator is monotone and all input persistence over time is 'static, the output edge is t-monotone regardless of input edge properties.
If operator input[i] with persistence 'tick is cumulative and t-monotone, we can treat input[i] as having 'static semantics even though it does not persist (it gets repopulated each tick).
To be continued...
Reference Behavior: Bloom
For reference, in Bloom we have persistence on sources (table vs scratch/channel). In each Bloom tick we scan the tables/scratches/channels and rerun all the operators. At the end of the tick, the scratches and channels and operator state are all zeroed.