Breaking down Symmetric (Ripple) Join #396
jhellerstein started this conversation in Ideas
Replies: 1 comment
-
A note on the above: this may be our first instance of sharing a state module across operators. We need a better functional notation for that.
-
Symmetric join goes back to the work of Wilschut and Apers in the PRISMA/DB project, where it was called pipelined hash join.
Haas and Hellerstein generalized it to the Ripple Join, which allows non-equijoins and rate control for sampling/estimation. Perhaps more importantly for our purposes, it gives a "cartesian space" interpretation that makes the monotonicity and correctness clear: each "probe" adds an "edge" to a "ripple". Figure 6 of the ripple join paper shows how the cross-join space is mapped out when alternating inputs (a "square" ripple join).
Raman, Deshpande and Hellerstein show how to decouple the symmetric hash join into state modules and a routing policy. This is the approach we will follow.
Assume we have two relations to join, $R$ and $S$. We will associate a collection-typed "state module" with each of $R$ and $S$. Following database tradition, we refer to inserting tuples as "building", and doing lookups as "probing". We will have an operator to perform each in Hydroflow.
The unary $\Delta$ operator takes an incoming tuple, "builds" it into the state module, and passes the tuple along as output -- effectively generating a stream of deltas (insertions) to the state module. The binary $\triangleleft$ operator (we can call it "half-join") is asymmetric: $R \triangleleft S$ takes a tuple $t_R$ of $R$ and probes the state module of $S$, returning concatenated join outputs $t_R \cdot t_S$ for all matching tuples $t_S$ that are found.
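As a concrete sketch, a state module can be modeled as a multimap from join key to tuples, with $\Delta$ and $\triangleleft$ as plain functions over it. This is purely illustrative; the names `StateModule`, `delta`, and `half_join` are ours, not Hydroflow API:

```rust
use std::collections::HashMap;

// A state module: a multimap from join key to the tuples built so far.
type StateModule<K, T> = HashMap<K, Vec<T>>;

// Δ: "build" the incoming tuple into the state module and pass it along,
// generating a stream of deltas (insertions) to the state.
fn delta<K: std::hash::Hash + Eq, T: Clone>(
    state: &mut StateModule<K, T>,
    key: K,
    tuple: T,
) -> T {
    state.entry(key).or_default().push(tuple.clone());
    tuple // forwarded downstream
}

// ◁ ("half-join"): probe the *other* side's state module with t_r,
// emitting the concatenation t_r · t_s for every matching t_s found.
fn half_join<K: std::hash::Hash + Eq, R: Clone, S: Clone>(
    other_state: &StateModule<K, S>,
    key: &K,
    t_r: &R,
) -> Vec<(R, S)> {
    other_state
        .get(key)
        .map(|matches| matches.iter().map(|t_s| (t_r.clone(), t_s.clone())).collect())
        .unwrap_or_default()
}
```

Note the asymmetry: `half_join` only reads the other side's state; all writes go through `delta`.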
Here is a decoupled dataflow for the symmetric join: each input stream passes through its own $\Delta$ and then half-joins against the other side's state module, with the two half-join outputs unioned.
Consider any output tuple $t_R \cdot t_S$. We want to ensure it appears in the output exactly once. Assume we have *no* control over the scheduling of the operators other than dataflow dependencies. Note that the dataflow ensures that each tuple goes through $\Delta$ before $\triangleleft$. This guarantees that each output tuple is generated at least once, but it is not enough to guarantee that each tuple is generated exactly once: there are six possible orders of the four operations, four of which generate redundant copies.
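To make the counting concrete, here is a small (illustrative, non-Hydroflow) simulation that enumerates all six interleavings of $(\Delta_R, \triangleleft_R)$ and $(\Delta_S, \triangleleft_S)$ for one tuple per side, and counts how many copies of $t_R \cdot t_S$ each schedule emits:

```rust
#[derive(Clone, Copy)]
enum Op { BuildR, ProbeR, BuildS, ProbeS }

// Run one schedule; return how many copies of the output tuple are emitted.
// A probe emits the joined tuple iff the other side has already been built.
fn run(schedule: &[Op; 4]) -> usize {
    let (mut r_built, mut s_built, mut outputs) = (false, false, 0);
    for op in schedule {
        match op {
            Op::BuildR => r_built = true,
            Op::BuildS => s_built = true,
            Op::ProbeR => if s_built { outputs += 1 },
            Op::ProbeS => if r_built { outputs += 1 },
        }
    }
    outputs
}

// All six interleavings that respect "Δ before ◁" on each side.
fn schedules() -> Vec<[Op; 4]> {
    use Op::*;
    vec![
        [BuildR, ProbeR, BuildS, ProbeS], // fully coupled, R first: 1 copy
        [BuildR, BuildS, ProbeR, ProbeS], // 2 copies
        [BuildR, BuildS, ProbeS, ProbeR], // 2 copies
        [BuildS, BuildR, ProbeR, ProbeS], // 2 copies
        [BuildS, BuildR, ProbeS, ProbeR], // 2 copies
        [BuildS, ProbeS, BuildR, ProbeR], // fully coupled, S first: 1 copy
    ]
}
```

Only the two schedules that keep each tuple's build and probe adjacent produce exactly one copy; the other four interleave both builds before both probes on at least one side, so both probes find a match.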
We can filter tuples locally to avoid redundancy using a precedence ordering. One canonical ordering is to require "build happens-before probe" on the tuple pairs: whichever tuple is "built" with $\Delta$ earlier should not find a match when it probes through $\triangleleft$. If the two sides arrive concurrently, we can "tiebreak" via the convention that the probe $\triangleleft(t_R)$ should not produce output on concurrent tuples.
We can implement happens-before in two ways, depending on whether the join is distributed or not: locally, a per-process logical clock (a sequence number stamped on each tuple at build time) suffices; in the distributed case, vector-clock timestamps capture the happens-before partial order. Then we filter tuples based on the happens-before constraint mentioned above.
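A minimal sketch of the local case, assuming a single shared sequence counter (the names `SymJoin`, `build_r`, `probe_r`, etc. are hypothetical, and keys are omitted, so this is a cross product in the ripple-join spirit): each tuple is stamped at build time, and a probe emits a match only against partners built *strictly earlier*. Then any schedule respecting "Δ before ◁" yields each pair exactly once, produced by the later-built tuple's probe:

```rust
// Shared logical clock stamps tuples at build time; probes filter on it.
struct SymJoin<R, S> {
    clock: u64,
    r_state: Vec<(u64, R)>,
    s_state: Vec<(u64, S)>,
}

impl<R: Clone, S: Clone> SymJoin<R, S> {
    fn new() -> Self {
        SymJoin { clock: 0, r_state: Vec::new(), s_state: Vec::new() }
    }

    // Δ for the R side: stamp and store the tuple, returning its stamp
    // so the (separately scheduled) probe can carry it.
    fn build_r(&mut self, t: R) -> u64 {
        self.clock += 1;
        self.r_state.push((self.clock, t));
        self.clock
    }

    fn build_s(&mut self, t: S) -> u64 {
        self.clock += 1;
        self.s_state.push((self.clock, t));
        self.clock
    }

    // ◁ for the R side: emit t_r · t_s only for partners built strictly
    // before t_r was built -- the happens-before filter.
    fn probe_r(&self, stamp: u64, t_r: &R) -> Vec<(R, S)> {
        self.s_state.iter()
            .filter(|(s_stamp, _)| *s_stamp < stamp)
            .map(|(_, t_s)| (t_r.clone(), t_s.clone()))
            .collect()
    }

    fn probe_s(&self, stamp: u64, t_s: &S) -> Vec<(R, S)> {
        self.r_state.iter()
            .filter(|(r_stamp, _)| *r_stamp < stamp)
            .map(|(_, t_r)| (t_r.clone(), t_s.clone()))
            .collect()
    }
}
```

With a total order there are no concurrent stamps; under vector clocks the same strict-precedence test applies, plus the side-based tiebreak for incomparable stamps.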
Alternatively, we can prevent concurrency entirely in the scheduler, in effect achieving the "coupled" symmetric hash join: we only allow schedules that couple $\Delta(t_R)$; $\triangleleft(t_R)$ (resp. $t_S$) as an atomic pair of ops (as in the two exactly-once schedules). This allows us to avoid the overhead of maintaining timestamps.
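A sketch of the coupled variant, again with hypothetical names (`CoupledJoin`, `push_r`, `push_s`) rather than Hydroflow API: each arriving tuple is built into its own side's state and immediately probes the other side as one atomic step, so no timestamps are needed.

```rust
use std::collections::HashMap;

// Keyed state modules; exactly-once follows from atomic build-then-probe,
// not from any per-tuple timing metadata.
struct CoupledJoin<K, R, S> {
    r_state: HashMap<K, Vec<R>>,
    s_state: HashMap<K, Vec<S>>,
}

impl<K: std::hash::Hash + Eq + Clone, R: Clone, S: Clone> CoupledJoin<K, R, S> {
    fn new() -> Self {
        CoupledJoin { r_state: HashMap::new(), s_state: HashMap::new() }
    }

    // Atomic Δ(t_R); ◁(t_R): build into R's state, then probe S's state.
    fn push_r(&mut self, key: K, t_r: R) -> Vec<(R, S)> {
        self.r_state.entry(key.clone()).or_default().push(t_r.clone());
        self.s_state.get(&key).into_iter().flatten()
            .map(|t_s| (t_r.clone(), t_s.clone()))
            .collect()
    }

    // Atomic Δ(t_S); ◁(t_S), symmetrically.
    fn push_s(&mut self, key: K, t_s: S) -> Vec<(R, S)> {
        self.s_state.entry(key.clone()).or_default().push(t_s.clone());
        self.r_state.get(&key).into_iter().flatten()
            .map(|t_r| (t_r.clone(), t_s.clone()))
            .collect()
    }
}
```

Because a tuple is built before it probes, and the pair runs atomically, each matching pair is emitted exactly once: by whichever tuple arrives second.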
Ideally we could have a compiler optimization called "timestamp elision" that can choose to take a freely-scheduled flow (e.g. with vc-timestamps) and reduce timing metadata by constraining the scheduler. This would be something we want to generalize.