Skip to content

Commit

Permalink
Book/doc edits (hydro-project#823)
Browse files Browse the repository at this point in the history
* docs: Doc edits WIP

* fix: infinite time testing

* fix: use batch instead of cross_join

* fix: correct usage of batch

* docs: address feedback
  • Loading branch information
jhellerstein authored Jul 8, 2023
1 parent f60053f commit 4bdd556
Show file tree
Hide file tree
Showing 29 changed files with 242 additions and 136 deletions.
49 changes: 49 additions & 0 deletions docs/docs/hydroflow/concepts/cyclic_flows.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
---
sidebar_position: 2
---

import CodeBlock from '@theme/CodeBlock';
import exampleCode from '!!raw-loader!../../../../hydroflow/examples/example_5_reachability.rs';
import exampleCode2 from '!!raw-loader!../../../../hydroflow/examples/example_naturals.rs';
import { getLines, extractOutput, extractMermaid } from '../../../src/util';

# Dataflow Cycles and Fixpoints
Many dataflow libraries only support acyclic flow graphs (DAGs); Hydroflow goes further and supports cycles. Hydroflow's semantics for cyclic flows are based on the formal foundations of recursive queries in the [Datalog](https://en.wikipedia.org/wiki/Datalog) language, which also influenced the design of recursive query features in SQL.

The basic pattern for cycles in Hydroflow looks something like this:
```
base = source_<XXX>() -> ... -> [base]cycle;
cycle = union()
-> ...
-> [next]cycle;
```
That is, we can trace a cycle of operators in the graph, where one operator is a `union()` that accepts two inputs, one of which is the "back edge" that closes the cycle.

For a concrete example, we can revisit the code in the [Graph Reachability](../quickstart/example_5_reachability.mdx) quickstart program:

<CodeBlock language="rust">{getLines(exampleCode, 7, 22)}</CodeBlock>

The cycle in that program matches our rough pattern as follows:
```
origin = source_iter(vec![0]) -> [base]reached_vertices;
reached_vertices = union() -> map(...)
-> [0]my_join_tee
-> ...
-> [next]reached_vertices;
```

How should we think about a cycle like this? Intuitively, we can think of the cycle beginning to compute on the data from `base` that comes in via `[0]cycle`. In the Graph Reachability example, this base data corresponds to the origin vertex, `0`. By joining [0] with the `stream_of_edges`, we generate neighbors (1 hop away) and pass them back into the cycle. When one of these is joined again with `stream_of_edges` we get a neighbor of a neighbor (2 hops away). When one of *these* is joined with `stream_of_edges` we get a vertex 3 hops away, and so on.

If you prefer to think about this program as logic, it represents [Mathematical Induction](https://en.wikipedia.org/wiki/Mathematical_induction) via dataflow: the data from `base` going into `[0]cycle` (i.e. the origin vertex, 0 hops away) is like a "base case", and the data going into `[1]cycle` represents the "induction step" (a vertex *k+1* hops away). (A graph with multiple cycles represents multiple induction, which is a relatively uncommon design pattern in both mathematics and Hydroflow!)

When does this process end? As with most Hydroflow questions, the answer is not in terms of control flow, but rather in terms of dataflow: *the cycle terminates when it produces no new data*, a notion called a [fixpoint](https://en.wikipedia.org/wiki/Fixed_point_(mathematics)). Our graph reachability example, it terminates when there are no new vertices to visit. Note that the `[join()](../syntax/surface_ops_gen#join)` operator is defined over the *sets* of inputs on each side, and sets
by definition do not contain duplicate values. This prevents the Reachability dataflow from regenerating the same value multiple times.

Like many looping constructs, it is possible to write a cyclic Hydroflow program that never ``terminates``, in the sense that it produces an unbounded stream of data. If we use `[join_multiset()](../syntax/surface_ops_gen#join_multiset)` instead of `[join()](../syntax/surface_ops_gen#join)` in our Reachability dataflow, the call to `flow.run_available()` never terminates, because each time the same vertex is visited, new data is generated!

A simpler example of a non-terminating cycle is the following, which specifies the natural numbers:

<CodeBlock language="rust" showLineNumbers>{exampleCode2}</CodeBlock>

Like any sufficiently powerful language, Hydroflow cannot guarantee that your programs terminate. If you're debugging a non-terminating Hydroflow program, it's a good idea to identify the dataflow cycles and insert an
`[inspect()](../syntax/surface_ops_gen#inspect)` operator along an edge of the cycle to see if it's generating unbounded amounts of duplicated data. You can use the `[unique()](../syntax/surface_ops_gen#unique)` operator to ameliorate the problem.
2 changes: 1 addition & 1 deletion docs/docs/hydroflow/concepts/debugging.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
sidebar_position: 4
sidebar_position: 5
---

# Debugging
2 changes: 1 addition & 1 deletion docs/docs/hydroflow/concepts/distributed_time.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
sidebar_position: 3
sidebar_position: 4
---

# Distributed Time
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/hydroflow/concepts/error_handling.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
sidebar_position: 5
sidebar_position: 6
---

# Error Handling
2 changes: 1 addition & 1 deletion docs/docs/hydroflow/concepts/stratification.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
---
sidebar_position: 2
sidebar_position: 3
---

# Streaming, Blocking and Stratification
Expand Down
3 changes: 1 addition & 2 deletions docs/docs/hydroflow/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import HydroflowDocs from '../../../hydroflow/README.md'
## This Book
This book will teach you how to set up your environment to get started with Hydroflow, and how to program in the Hydroflow surface syntax.

Keep in mind that Hydroflow is under active development and is constantly
changing. However the code in this book is tested with the Hydroflow library so should always be up-to-date.
Keep in mind that Hydroflow is under active development. However the code in this book is tested with the Hydroflow library so should always be up-to-date.

If you have any questions, feel free to [create an issue on Github](https://github.com/hydro-project/hydroflow/issues/new).
9 changes: 7 additions & 2 deletions docs/docs/hydroflow/quickstart/example_1_simplest.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,17 @@ item passed in.

The Hydroflow surface syntax is merely a *specification*; it does not actually do anything
until we run it.
We run the flow from within Rust via the [`run_available()` method](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_available).
We can run this flow from within Rust via the [`run_available()` method](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_available).

<CodeBlock language="rust">{getLines(exampleCode, 8)}</CodeBlock>

Note that `run_available()` runs the Hydroflow graph until no more work is immediately
available. In this example flow, running the graph drains the iterator completely, so no
more work will *ever* be available. In future examples we will use external inputs such as
network ingress, in which case more work might appear at any time.
network ingress, in which case more work might appear at any time. In those examples we may need a different method than `run_available()`,
e.g. the [`run_async()`](https://hydro-project.github.io/hydroflow/doc/hydroflow/scheduled/graph/struct.Hydroflow.html#method.run_async) method,
which we'll see
in [the EchoServer example](./example_7_echo_server).

### A Note on Project Structure
The template project is intended to be a starting point for your own Hydroflow project, and you can add files and directories as you see fit. The only requirement is that the `src/main.rs` file exists and contains a `main()` function.
Expand Down
6 changes: 6 additions & 0 deletions docs/docs/hydroflow/quickstart/example_2_simple.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ Replace the contents of `src/main.rs` with the following:

<CodeBlock language="rust" showLineNumbers>{exampleCode2}</CodeBlock>

Here the `filter_map` operator takes a map closure that returns a Rust [`Option`](https://doc.rust-lang.org/std/option/enum.Option.html).
If the value is `Some(...)`, it is passed to the output; if it is `None` it is filtered.

The `flat_map` operator takes a map closure that generates a collection type (in this case a `Vec`)
which is flattened.

Results:

<CodeBlock language="console">{extractOutput(exampleOutput2)}</CodeBlock>
2 changes: 1 addition & 1 deletion docs/docs/hydroflow/quickstart/example_3_stream.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import { getLines, extractOutput } from '../../../src/util';
> - the [`source_stream`](../syntax/surface_ops_gen.md#source_stream) operator that brings channel input into Hydroflow
> - Rust syntax to programmatically send data to a (local) channel
In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the [`source_iter`](../syntax/surface_ops_gen.md#source_iter) operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same machine and sent into the channel programmatically via Rust.
In our previous examples, data came from within the Hydroflow spec, via Rust iterators and the [`source_iter`](../syntax/surface_ops_gen.md#source_iter) operator. In most cases, however, data comes from outside the Hydroflow spec. In this example, we'll see a simple version of this idea, with data being generated on the same thread and sent into the channel programmatically via Rust.

For discussion, we start with a skeleton much like before:

Expand Down
4 changes: 2 additions & 2 deletions docs/docs/hydroflow/quickstart/example_4_neighbors.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util';
> * The [`unique`](../syntax/surface_ops_gen.md#unique) operator for removing duplicates from a stream
> * Visualizing hydroflow code via `flow.meta_graph().expect(...).to_mermaid()`
So far all the operators we've used have one input and one output and therefore
So far, all the operators we've used have one input and one output and therefore
create a linear flow of operators. Let's now take a look at a Hydroflow program containing
an operator which has multiple inputs; in the following examples we'll extend this to
multiple outputs.
Expand Down Expand Up @@ -67,7 +67,7 @@ Run the program and focus on the last three lines of output, which come from `fl
That looks right: the edges we "sent" into the flow that start at `0` are
`(0, 1)` and `(0, 3)`, so the nodes reachable from `0` in 0 or 1 hops are `0, 1, 3`.

> Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here is producing a `set` of nodes, so the order in which they are printed out is not specified. The [`sort_by_key`](../syntax/surface_ops_gen.md#sort_by_key) operator can be used to sort the output of a flow.
> Note: When you run the program you may see the lines printed out in a different order. That's OK; the flow we're defining here uses the [`join()`](../syntax/surface_ops_gen.md#join) operator, which deals in `sets` of data items, so the order in which a `join()`'s output items are generated is not specified or guaranteed. The [`sort_by_key`](../syntax/surface_ops_gen.md#sort_by_key) operator can always be used to sort the output of a flow if needed.
## Examining the Hydroflow Code
In the code, we want to start out with the origin vertex, `0`,
Expand Down
28 changes: 12 additions & 16 deletions docs/docs/hydroflow/quickstart/example_5_reachability.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util';
> * Implementing a recursive algorithm (graph reachability) via cyclic dataflow
> * Operators to union data from multiple inputs ([`union`](../syntax/surface_ops_gen.md#union)), and send data to multiple outputs ([`tee`](../syntax/surface_ops_gen.md#tee))
> * Indexing multi-output operators by appending a bracket expression
> * An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum.
> * A first example of a cyclic (recursive) flow and the concept of fixpoint.
To expand from graph neighbors to graph reachability, we want to find vertices that are connected not just to `origin`,
but also to vertices reachable *transitively* from `origin`. Said differently, a vertex is reachable from `origin` if it is
Expand Down Expand Up @@ -67,34 +66,31 @@ We route the `origin` vertex into it as one input right away:

<CodeBlock language="rust">{getLines(exampleCode, 8, 12)}</CodeBlock>

Note the square-bracket syntax for differentiating the multiple inputs to `union()`
is the same as that of `join()` (except that union can have an unbounded number of inputs,
whereas `join()` is defined to only have two.)
Note the square-bracket syntax for assigning index names to the multiple inputs to `union()`; this is similar
to the indexes for `join()`, except that (a) union can have an arbitrary number of inputs, (b) the index names can be arbitrary strings, and (c) the indexes are optional can be omitted entirely. (By contrast, recall that
`join()` is defined to take 2 required input indexes, `[0]` and `[1]`). The only reason to assign index names to the inputs of `union()` is for labeling edges in the generated (e.g. Mermaid) graphs.

Now, `join()` is defined to only have one output. In our program, we want to copy the joined
output to two places: to the original `for_each` from above to print output, and *also*
back to the `union` operator we called `reached_vertices`. We feed the `join()` output
The next group of statements lays out the join of `reached_vertices` and the `stream_of_edges`. The `join()` operator is defined to only have one output, but in our program, we need its output twice: once to feed the original `for_each` from above to print output, and also to feed
back to the `union` operator that we called `reached_vertices`. We pass the `join()` output
through a `flat_map()` as before, and then we feed the result into a [`tee()`](../syntax/surface_ops_gen.md#tee) operator,
which is the mirror image of `union()`: instead of merging many inputs to one output,
it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and
given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of union: we *append*
an output index in square brackets to the `tee` or variable. In this example we have
`my_join_tee[0] ->` and `my_join_tee[1] ->`.
it copies one input to many different outputs. Each input element is _cloned_, in Rust terms, and separate copy is given to each of the outputs. The syntax for the outputs of `tee()` mirrors that of the inputs to union: we can (optionally) *append*
an arbitrary output index name in square brackets to the `tee` or variable. In this example we have `my_join_tee[cycle] ->` and `my_join_tee[print] ->`.

Finally, we process the output of the `join` as passed through the `tee`.
One branch pushes reached vertices back up into the `reached_vertices` variable (which begins with a `union`), while the other
prints out all the reached vertices as in the simple program.

<CodeBlock language="rust">{getLines(exampleCode, 14, 17)}</CodeBlock>

Note the syntax for differentiating the *outputs* of a `tee()` is symmetric to that of `union()`,
showing up to the right of the variable rather than the left.

Below is the diagram rendered by [mermaid](https://mermaid-js.github.io/) showing
the structure of the full flow:

<mermaid value={extractMermaid(exampleOutput)}></mermaid>

This is similar to the flow for graph neighbors, but has a few more operators that make it look
more complex. In particular, it includes the `union` and `tee` operators, and a cycle-forming back-edge.
There is also an auto-generated `handoff` operator that enforces the rule that a push producer and a pull consumer must be separated by a `handoff`.
There is also an auto-generated `handoff` operator that enforces the rule that a push producer and a pull consumer must be separated by a `handoff` (see the [Architecture section](../architecture/handoffs)).

# Cyclic Dataflow
Many dataflow and workflow systems are restricted to acyclic graphs (DAGs), but Hydroflow supports cycles, as we see in this example.
12 changes: 7 additions & 5 deletions docs/docs/hydroflow/quickstart/example_6_unreachability.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { getLines, extractOutput, extractMermaid } from '../../../src/util';
> * Extending a program with additional downstream logic.
> * Hydroflow's ([`difference`](../syntax/surface_ops_gen.md#difference)) operator
> * A first exposure to the concepts of _strata_ and _ticks_
> * An example of how a cyclic dataflow in one stratum executes to completion before starting the next stratum.
Our next example builds on the previous by finding vertices that are _not_ reachable. To do this, we need to capture the set `all_vertices`, and use a [difference](../syntax/surface_ops_gen.md#difference) operator to form the difference between that set of vertices and `reachable_vertices`.

Expand Down Expand Up @@ -74,16 +75,17 @@ in order, one at a time, ensuring all values are computed
before moving on to the next stratum. Between strata we see a _handoff_, which logically buffers the
output of the first stratum, and delineates the separation of execution between the 2 strata.

If you look carefully, you'll see two subgraphs labeled with `stratum 0`. The reason that stratum 0 was broken into subgraphs has nothing to do with
correctness, but rather the way that Hydroflow graphs are compiled and scheduled (as
discussed in the section on [In-Out Trees](../architecture/in-out_trees). We need not concern ourselves with this detail other than to look carefully at the `stratum` labels on the grey boxes in our Mermaid diagrams.

All the subgraphs labeled `stratum 0` are run first to completion,
and then all the subgraphs labeled `stratum 1` are run. This captures the requirements of the `difference` operator: it has to wait for its full negative input before it can start producing output. Note
how the `difference` operator has two inputs (labeled `pos` and `neg`), and only the `neg` input shows up as blocking (with the bold edge ending in a ball).

Meanwhile, note stratum 0 has a recursive loop, and stratum 1 that computes `difference`, with the blocking input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are found, before moving on to compute the unreached vertices.
In this Mermaid graph, note that stratum 0 has a recursive loop back through `my_join`, and `tee`s off output to the `difference` operator in stratum 1 via the handoff and the blocking `neg` input. This means that Hydroflow will first run the loop of stratum 0 repeatedly until all the transitive reached vertices are passed to the handoff (a [fixpoint](../concepts/cyclic_flows)), before moving on to compute the unreached vertices via stratum 1.

After all strata are run, Hydroflow returns to the stratum 0; this begins the next _tick_. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on [time](../concepts/life_and_times.md).
After all strata are run, Hydroflow returns to stratum 0; this begins the next _tick_. This doesn't really matter for this example, but it is important for long-running Hydroflow services that accept input from the outside world. More on this topic in the chapter on [time](../concepts/life_and_times.md).

If you look carefully, you'll see two subgraphs labeled with `stratum 0`. The reason that stratum 0 was broken into subgraphs has nothing to do with
correctness, but rather the way that Hydroflow graphs are compiled and scheduled, as
discussed in the chapter on [Architecture](../architecture/index.mdx).


Loading

0 comments on commit 4bdd556

Please sign in to comment.