
Commit 818f90f: docs: write some

cpcloud committed Mar 27, 2024
1 parent 69c2540 commit 818f90f
Showing 4 changed files with 169 additions and 19 deletions.


155 changes: 149 additions & 6 deletions docs/posts/unix-backend/index.qmd
@@ -27,16 +27,28 @@ All production ready backends ship with amazing demos.

The Unix backend is no different. Let's see it in action.

First we'll download some data.

```{python}
!curl -LsS 'https://storage.googleapis.com/ibis-examples/penguins/20240322T125036Z-9aae2/penguins.csv.gz' | zcat > penguins.csv
```

```{python}
import ibis
from ibis.backends.unix import Backend
ibis.options.interactive = True
unix = Backend() # <1>
con = unix.connect({"p": "penguins.csv"})
t = con.table("p")
t
```

1. Create a Unix backend instance. This is not rolled into the usual
`ibis.${backend}` pattern because it won't see the light of day.

Sweet, huh?

Let's filter and look at only the year 2009.
@@ -56,7 +68,31 @@ expr = (
expr
```

There's even support for joins and aggregations!

Let's count the number of `(island, species)` pairs and sort descending by the count.

```{python}
expr = (
t.group_by("island", "species")
.agg(n=lambda t: t.count())
.order_by(ibis.desc("n"))
)
expr
```

For kicks, let's compare that to the DuckDB backend to make sure we're able to count stuff.

To be extra awesome, we'll *reuse the same expression to do the computation*.

```{python}
con = ibis.duckdb.connect()
con.create_table("p", con.read_csv("penguins.csv")) # <1>
con.execute(expr.unbind())
```

1. The `create_table` call is necessary so that the expression's table
   name (`p`) matches one inside the DuckDB database.

## How does it work?

@@ -65,11 +101,118 @@
Glad you asked!
The Unix backend for Ibis was built over the course of a few hours, which is
about the time it takes to make a production ready Ibis backend.

Broadly speaking, the Unix backend (sketched just after this list):

1. Produces a shell command for each Ibis _table_ operation.
1. Produces a nominal output location for the output of that command, in the form of a [named pipe](https://en.wikipedia.org/wiki/Named_pipe) opened in write mode.
1. Reads output from the named pipe associated with the root of the expression tree.
1. Calls `pandas.read_csv` on that output.
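
Here's that flow in miniature for a single-node plan. This is an illustrative
sketch rather than the backend's actual code (it assumes a POSIX system and a
local `penguins.csv`), and it cheats by letting a child shell open the pipe's
write end; more on that below.

```python
import os
import shlex
import subprocess
import tempfile

import pandas as pd

pipe_dir = tempfile.mkdtemp()
out = os.path.join(pipe_dir, "t0")
os.mkfifo(out)  # the nominal output location: a named pipe

# the child shell blocks opening the pipe for writing, not our process
subprocess.Popen(f"cat penguins.csv > {shlex.quote(out)}", shell=True)

# reading from the root node's pipe unblocks the writer and drains the data
df = pd.read_csv(out)
print(df.head())
```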

::: {.callout-note collapse="true"}
# Why named pipes?

Shell commands only allow a single input from `stdin`.

However, joins accept more than one input, so we need a way to stream multiple inputs into a join operation.

Named pipes support the semantics of "unnamed" pipes (FIFO queue behavior) but
can be used in pipelines with nodes that have more than a single input, since
they exist as paths on the file system.
:::

### Expressions

Ibis expressions are an abstract representation of an analytics computation
over tabular data.

Ibis ships with a public API whose instances we call *expressions*.

Expressions have an associated type--accessible via their
[`type()`](../../reference/expression-generic.qmd#ibis.expr.types.generic.Value.type)
method--that determines what methods are available on them.

Expressions are ignorant of their underlying implementation: their
composability is determined solely by their type.
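
A tiny illustration, using a made-up schema:

```python
import ibis

t = ibis.table({"island": "string", "year": "int64"}, name="p")

t.island.type()  # string, so string methods like lower() are available
t.year.type()    # int64, so arithmetic works but lower() does not
```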

This type is determined by the expression's underlying *operation*.

The two-layer model makes it easy to describe operations in terms of the data
types produced by an expression, rather than as instances of a specific class
in a hierarchy.

This allows Ibis maintainers to alter expression API implementations without
changing those APIs, making the library easier to maintain and easier to keep
stable than if we had a complex (but not necessarily deep!) class hierarchy.

Operations, though, are really where the nitty-gritty implementation details
start.

### Operations

Ibis _operations_ are lightweight classes that model the tree structure of a computation.

They have zero or more inputs, whose types and values are constrained by Ibis's _type system_.

Notably, operations are *not* part of Ibis's public API.
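
You can still peek at them for illustration purposes (the exact node and
argument names here may vary across Ibis versions):

```python
import ibis

t = ibis.table({"island": "string", "year": "int64"}, name="p")
expr = t.filter(t.year == 2009)

node = expr.op()  # the operation backing the expression
print(type(node).__name__)  # e.g. Filter
print(node.args)  # the node's inputs, constrained by the type system
```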

When we talk about "compilation" in Ibis, we're talking about the process of
converting an _operation_ into something that the backend knows how to execute.

In the case of this wild Unix backend, each operation is compiled into a list
of strings representing the shell command that executes the operation.

In other backends, like DuckDB, these compilation rules produce a sqlglot object.

When a user calls `Backend.compile`, the return type of that call will be
*specific to the backend*. For the Unix backend that's a list, and for DuckDB
and other SQL backends that'll be a sqlglot object.

Once you obtain the output of compile, it's entirely up to the backend what to do next.
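
A sketch of what poking at both might look like, assuming `unix_con` is a
connection to the Unix backend and `expr` was built against it (the exact
shapes of the return values are backend internals):

```python
shell_plan = unix_con.compile(expr)  # Unix backend: a list of command/pipe pairs
for cmd, pipe in shell_plan:
    print(cmd, "->", pipe)

duck_con = ibis.duckdb.connect()
print(duck_con.compile(expr.unbind()))  # SQL backends: sqlglot-derived output
```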

### Backend implementation

At this point we've got our shell commands, and their output locations have been created as named pipes.

What next?

Well, we need to execute the commands and write their output to the corresponding named pipe.

You might think "I'll just loop over the operations, open the pipe in write mode and call `subprocess.Popen(cmd, stdout=named_pipe)`".

Not a bad thought, but the semantics of named pipes do not abide such thoughts :)

Named pipes, when opened in write mode, will block until a corresponding handle
is opened in *read* mode.

Futures using a scoped thread pool are a decent way to handle this.

The idea is to launch every node concurrently and then read from the last
node's output. This initial read of the root node's output pipe kicks off the
cascade of other reads necessary to move data through the pipeline.

The Unix backend thus constructs a scoped `ThreadPoolExecutor()` using
a context manager and submits a task for each operation to the executor.
Importantly, opening the named pipe in write mode happens **inside** the task,
to avoid blocking the main thread while waiting for a reader.

The final output task's path is then passed directly to `read_csv`, and we've
now got the result of our computation.
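
Putting the pieces together, a simplified version of the execution step might
look like this. Again a sketch under illustrative assumptions (POSIX, a local
`penguins.csv`, shell commands standing in for real compiled nodes):

```python
import os
import subprocess
import tempfile
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

pipe_dir = tempfile.mkdtemp()
t0, t1 = (os.path.join(pipe_dir, name) for name in ("t0", "t1"))
for path in (t0, t1):
    os.mkfifo(path)

# a hypothetical two-node plan: scan the CSV, then take the first rows
plan = [
    (["cat", "penguins.csv"], t0),
    (["head", "-n", "6", t0], t1),
]

def run(cmd, out_path):
    # opening the write end happens inside the task, so the blocking
    # open() never stalls the main thread
    with open(out_path, "w") as out:
        subprocess.run(cmd, stdout=out)

with ThreadPoolExecutor() as pool:
    for cmd, out_path in plan:
        pool.submit(run, cmd, out_path)
    # reading the root node's pipe kicks off the cascade of reads
    df = pd.read_csv(plan[-1][1])

print(df)
```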

## Conclusion

If you've gotten this far, hopefully you've had a good laugh. We aren't going
to ship a Unix backend, but there's a commit you can find if you want to
tinker with it.

This is left as a very serious exercise for the reader.

Let's wrap up with some final thoughts.

### Things to do

- Join our Zulip!
- Open a GitHub issue or discussion!

### Things to avoid doing

Ibis is very flexible and powerful at both the user-facing API level and, as
seen in this wacky post, at the level of the backend implementation. Even so:

- Putting this into production
23 changes: 12 additions & 11 deletions ibis/backends/unix/__init__.py
@@ -95,9 +95,6 @@ def compile(
         pipe_dir: str | None = None,
         **_: Any,
     ):
-        if pipe_dir is None:
-            pipe_dir = tempfile.mkdtemp()
-
         if params is None:
             params = dict()
         else:
@@ -123,7 +120,7 @@ def fn(node, _, **kwargs):
            # value expressions are strings that are passed to the command
            # (likely awk for any non-trivial computation)
            if isinstance(node, ops.Relation):
-               path = Path(pipe_dir, f"t{next(counter)}")
+               path = Path(*filter(None, (pipe_dir, f"t{next(counter)}")))
                commands.append((source, path))
                return path

@@ -176,8 +173,7 @@ def explain(
         params: Mapping[ir.Expr, object] | None = None,
         **kwargs: Any,
     ) -> str:
-        with tempfile.TemporaryDirectory() as pipe_dir:
-            plan = self.compile(expr, params=params, pipe_dir=pipe_dir, **kwargs)
+        plan = self.compile(expr, params=params, **kwargs)

         return "\n".join(
             f"{shlex.join(list(map(str, cmd)))} > {output.name}" for cmd, output in plan
@@ -205,15 +201,17 @@ def drop_view(self, *_, **__) -> ir.Table:
 if __name__ == "__main__":
     # Create a backend with some data
     unix = Backend()
-    backend = unix.connect({"p": "/data/penguins.csv", "q": "/data/penguins.csv"})
+    backend = unix.connect({"p": "penguins.csv", "q": "penguins.csv"})
     # Create an expression
     t = backend.table("p")
     q = backend.table("q")
     expr = (
         t.filter([t.year == 2009])
-        .select("year", "flipper_length_mm", island=lambda t: t.island.lower())
-        .group_by("island")
-        .agg(n=lambda t: t.count())
+        .select(
+            "year", "species", "flipper_length_mm", island=lambda t: t.island.lower()
+        )
+        .group_by("island", "species")
+        .agg(n=lambda t: t.count(), avg=lambda t: t.island.upper().length().mean())
         .order_by("n")
         .mutate(ilength=lambda t: t.island.length())
         .limit(5)
@@ -224,5 +222,8 @@ def drop_view(self, *_, **__) -> ir.Table:
     result = expr.execute()
     print(result)  # noqa: T201

-    result = t.join(t, ["year"]).select("year").execute()
+    join = t.join(t, ["year"]).select("year")
+    print(backend.explain(join))  # noqa: T201
+
+    result = join.execute()
     print(result)  # noqa: T201
6 changes: 6 additions & 0 deletions ibis/backends/unix/compiler.py
@@ -59,6 +59,7 @@ def uppercase(op, *, arg, **_) -> str:
     ops.Subtract: "-",
     ops.Multiply: "*",
     ops.Divide: "/",
+    ops.Power: "^",
 }


@@ -158,6 +159,11 @@ def total(op, *, arg, where):
     return f"+={arg}"


+@translate.register(ops.Mean)
+def mean(op, *, arg, where):
+    return f"+={arg}/NR"
+
+
 @translate.register(ops.SortKey)
 def sort_key(op, *, expr, ascending):
     return expr
