Skip to content

Commit

Permalink
docs(how-to): add a guide on streaming operations (#9642)
Browse files Browse the repository at this point in the history
  • Loading branch information
chloeh13q authored Jul 22, 2024
1 parent 49ad408 commit f3ed10c
Showing 1 changed file with 97 additions and 0 deletions.
97 changes: 97 additions & 0 deletions docs/how-to/extending/streaming.qmd
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
---
title: Ibis for streaming
---

Ibis has support for streaming operations, which can be executed on Flink,
Spark Structured Streaming, and RisingWave.

## Setup

We demonstrate the streaming operations with a real-time fraud detection example.
If you have Kafka set up in your infrastructure, you can connect to your existing Kafka
topics as well.

You can find our code setup [here](https://github.com/ibis-project/realtime-fraud-detection).
Feel free to clone the repository if you want to follow along.

## Window aggregation
Computes aggregations over windows.

The output schema consists of `window_start`, `window_end`, the group
by column if applicable (optional), and the aggregation results.

Tumble and hop windows are supported. Tumbling windows have a fixed size and do not overlap.
Hopping windows (aka sliding windows) are configured by both window size and window slide. The
additional window slide parameter controls how frequently a sliding window is started.

For more, see [Flink's documentation on Windowing TVFs](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sql/queries/window-tvf/)
and [Spark's documentation on time windows](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#types-of-time-windows).

```python
t = con.table("payment") # table corresponding to the `payment` topic

# tumble window
expr = (
t.window_by(time_col=t.createTime)
.tumble(size=ibis.interval(seconds=30))
.agg(by=["provinceId"], avgPayAmount=_.payAmount.mean())
)

# hop window
expr = (
t.window_by(time_col=t.createTime)
.hop(size=ibis.interval(seconds=30), slide=ibis.interval(seconds=15))
.agg(by=["provinceId"], avgPayAmount=_.payAmount.mean())
)
```

## Over aggregation
Computes aggregate values for every input row, over either a row range or a time range.

::: {.callout-note}
Spark Structured Streaming does not support aggregation using the `OVER` syntax. You need to use
window aggregation to aggregate over time windows.
:::

```python
expr = (
t.select(
province_id=t.provinceId,
pay_amount=t.payAmount.sum().over(
range=(-ibis.interval(seconds=10), 0),
group_by=t.provinceId,
order_by=t.createTime,
),
)
)
```


## Stream-table join
Joining a stream with a static table.

```python
provinces = (
"Beijing",
"Shanghai",
"Hangzhou",
"Shenzhen",
"Jiangxi",
"Chongqing",
"Xizang",
)
province_id_to_name_df = pd.DataFrame(
enumerate(provinces), columns=["provinceId", "province"]
)
expr = t.join(province_id_to_name_df, ["provinceId"])
```

## Stream-stream join
Joining two streams.

```python
order = con.table("order") # table corresponding to the `order` topic
expr = t.join(
order, [t.orderId == order.orderId, t.createTime == order.createTime]
)
```

0 comments on commit f3ed10c

Please sign in to comment.