diff --git a/python/docs/examples/_metadata.yml b/python/docs/examples/_metadata.yml index dc16c1f14..eefef2046 100644 --- a/python/docs/examples/_metadata.yml +++ b/python/docs/examples/_metadata.yml @@ -1,3 +1,7 @@ +execute: + daemon: false + warning: false + filters: - include-code-files @@ -5,3 +9,5 @@ format: html: link-external-icon: true link-external-newwindow: true + +jupyter: python3 diff --git a/python/docs/examples/bluesky.py b/python/docs/examples/bluesky.py index 906a1c0aa..c260fc960 100644 --- a/python/docs/examples/bluesky.py +++ b/python/docs/examples/bluesky.py @@ -26,6 +26,7 @@ async def main(): # The firehose doesn't (currently) require authentication. at_client = AsyncFirehoseSubscribeReposClient() + # [start_setup] # Setup the data source. # This defintes (most of) the schema of the events we'll receive, # and tells Kaskada which fields to use for time and initial entity. @@ -61,7 +62,9 @@ async def main(): key_column="author", time_unit="s", ) + # [end_setup] + # [start_incoming] # Handler for newly-arrived messages from BlueSky. async def receive_at(message) -> None: # Extract the contents of the message and bail if it's not a "commit" @@ -89,6 +92,9 @@ async def receive_at(message) -> None: } ) + # [end_incoming] + + # [start_result] # Handler for values emitted by Kaskada. async def receive_outputs(): # We'll perform a very simple aggregation - key by language and count. @@ -98,8 +104,12 @@ async def receive_outputs(): async for row in posts_by_first_lang.count().run_iter(kind="row", mode="live"): print(f"{row['_key']} has posted {row['result']} times since startup") + # [end_result] + + # [start_run] # Kickoff the two async processes concurrently. 
await asyncio.gather(at_client.start(receive_at), receive_outputs()) + # [end_run] # Copied from https://raw.githubusercontent.com/MarshalX/atproto/main/examples/firehose/process_commits.py diff --git a/python/docs/examples/bluesky.md b/python/docs/examples/bluesky.qmd similarity index 95% rename from python/docs/examples/bluesky.md rename to python/docs/examples/bluesky.qmd index 4b6b97081..79468ce76 100644 --- a/python/docs/examples/bluesky.md +++ b/python/docs/examples/bluesky.qmd @@ -17,7 +17,7 @@ You can see the full example in the file [bluesky.py](https://github.com/kaskada Before we can receive events from Bluesky, we need to create a data source to tell Kaskada how to handle the events. We'll provide a schema and configure the time and entity fields. -```{.python include="bluesky.py" code-line-numbers="true" start-line=26 end-line=53 dedent=4} +```{.python include="bluesky.py" code-line-numbers="true" start-line=30 end-line=64 dedent=4} ``` ## Define the incoming event handler @@ -27,7 +27,7 @@ This handler parses the message to find [Commit](https://atproto.com/specs/repos For each Commit, we'll parse out any [Post](https://atproto.com/blog/create-post#post-record-structure) messages. Finally we do some schema munging to get the Post into the event format we described when creating the data source. -```{.python include="bluesky.py" code-line-numbers="true" start-line=55 end-line=79 dedent=4} +```{.python include="bluesky.py" code-line-numbers="true" start-line=68 end-line=93 dedent=4} ``` ## Construct a real-time query and result handler @@ -37,14 +37,14 @@ First we'll use `with_key` to regroup events by language, then we'll apply a sim Finally, we create a handler for the transformed results - here just printing them out. 
-```{.python include="bluesky.py" code-line-numbers="true" start-line=81 end-line=89 dedent=4} +```{.python include="bluesky.py" code-line-numbers="true" start-line=98 end-line=105 dedent=4} ``` ## Final touches Now we just need to kick it all off by calling `asyncio.gather` on the two handler coroutines. This kicks off all the async processing. -```{.python include="bluesky.py" code-line-numbers="true" start-line=91 end-line=92 dedent=4} +```{.python include="bluesky.py" code-line-numbers="true" start-line=110 end-line=111 dedent=4} ``` Try running it yourself and playing different transformations! diff --git a/python/docs/examples/time_centric.ipynb b/python/docs/examples/time_centric.ipynb deleted file mode 100644 index 5935cba1a..000000000 --- a/python/docs/examples/time_centric.ipynb +++ /dev/null @@ -1,357 +0,0 @@ -{ - "cells": [ - { - "cell_type": "raw", - "id": "cbf72170", - "metadata": {}, - "source": [ - "---\n", - "title: \"Time-centric Calculations\"\n", - "subtitle: \"Work with time and produce past training examples and recent results for applying models.\"\n", - "order: 1\n", - "---" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "5a20a51f", - "metadata": { - "id": "5a20a51f" - }, - "source": [ - "Kaskada was built to process and perform temporal calculations on event streams,\n", - "with real-time analytics and machine learning in mind. It is not exclusively for\n", - "real-time applications, but Kaskada excels at time-centric computations and\n", - "aggregations on event-based data.\n", - "\n", - "For example, let's say you're building a user analytics dashboard at an\n", - "ecommerce retailer. 
You have event streams showing all actions the user has\n", - "taken, and you'd like to include in the dashboard:\n", - "\n", - "* the total number of events the user has ever generated\n", - "* the total number of purchases the user has made\n", - "* the total revenue from the user\n", - "* the number of purchases made by the user today\n", - "* the total revenue from the user today\n", - "* the number of events the user has generated in the past hour\n", - "\n", - "Because the calculations needed here are a mix of hourly, daily, and over all of\n", - "history, more than one type of event aggregation needs to happen. Table-centric\n", - "tools like those based on SQL would require multiple JOINs and window functions,\n", - "which would be spread over multiple queries or CTEs. \n", - "\n", - "Kaskada was designed for these types of time-centric calculations, so we can do\n", - "each of the calculations in the list in one line:\n", - "\n", - "```python\n", - "record({\n", - " \"event_count_total\": DemoEvents.count(),\n", - " \"purchases_total_count\": DemoEvents.filter(DemoEvents.col(\"event_name\").eq(\"purchase\")).count(),\n", - " \"revenue_total\": DemoEvents.col(\"revenue\").sum(),\n", - " \"purchases_daily\": DemoEvents.filter(DemoEvents.col(\"event_name\").eq(\"purchase\")).count(window=Daily()),\n", - " \"revenue_daily\": DemoEvents.col(\"revenue\").sum(window=Daily()),\n", - " \"event_count_hourly\": DemoEvents.count(window=Hourly()),\n", - "})\n", - "```\n", - "\n", - "::: {.callout-warning}\n", - "The previous example demonstrates the use of `Daily()` and `Hourly()` windowing which aren't yet part of the new Python library.\n", - ":::\n", - "\n", - "Of course, a few more lines of code are needed to put these calculations to work,\n", - "but these six lines are all that is needed to specify the calculations\n", - "themselves. Each line may specify:\n", - "\n", - "* the name of a calculation (e.g. 
`event_count_total`)\n", - "* the input data to start with (e.g. `DemoEvents`)\n", - "* selecting event fields (e.g. `DemoEvents.col(\"revenue\")`)\n", - "* function calls (e.g. `count()`)\n", - "* event filtering (e.g. `filter(DemoEvents.col(\"event_name\").eq(\"purchase\"))`)\n", - "* time windows to calculate over (e.g. `window=Daily()`)\n", - "\n", - "...with consecutive steps chained together in a familiar way.\n", - "\n", - "Because Kaskada was built for time-centric calculations on event-based data, a\n", - "calculation we might describe as \"total number of purchase events for the user\"\n", - "can be defined in Kaskada in roughly the same number of terms as the verbal\n", - "description itself.\n", - "\n", - "Continue through the demo below to find out how it works.\n", - "\n", - "See [the Kaskada documentation](../guide/index) for lots more information." - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "BJ2EE9mSGtGB", - "metadata": { - "id": "BJ2EE9mSGtGB" - }, - "source": [ - "## Kaskada Client Setup\n", - "\n", - "```\n", - "%pip install kaskada>=0.6.0-a.0\n", - "```" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "37db47ba", - "metadata": { - "tags": [ - "hide-output" - ] - }, - "outputs": [], - "source": [ - "import kaskada as kd\n", - "\n", - "kd.init_session()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "5b838eef", - "metadata": {}, - "source": [ - "## Example dataset\n", - "\n", - "For this demo, we'll use a very small example data set, which, for simplicity and portability of this demo notebook, we'll read from a string.\n", - "\n", - "```{note}\n", - "For simplicity, instead of a CSV file or other file format we read and then parse data from a CSV string.\n", - "You can load your own event data from many common sources, including Pandas DataFrames and Parquet files.\n", - "See {py:mod}`kaskada.sources` for more information on the available sources.\n", - "```" - ] - }, - { - 
"cell_type": "code", - "execution_count": null, - "id": "ba4bb6b6", - "metadata": {}, - "outputs": [], - "source": [ - "import asyncio\n", - "\n", - "# For demo simplicity, instead of a CSV file, we read and then parse data from a\n", - "# CSV string. Kaskada\n", - "event_data_string = \"\"\"\n", - " event_id,event_at,entity_id,event_name,revenue\n", - " ev_00001,2022-01-01 22:01:00,user_001,login,0\n", - " ev_00002,2022-01-01 22:05:00,user_001,view_item,0\n", - " ev_00003,2022-01-01 22:20:00,user_001,view_item,0\n", - " ev_00004,2022-01-01 23:10:00,user_001,view_item,0\n", - " ev_00005,2022-01-01 23:20:00,user_001,view_item,0\n", - " ev_00006,2022-01-01 23:40:00,user_001,purchase,12.50\n", - " ev_00007,2022-01-01 23:45:00,user_001,view_item,0\n", - " ev_00008,2022-01-01 23:59:00,user_001,view_item,0\n", - " ev_00009,2022-01-02 05:30:00,user_001,login,0\n", - " ev_00010,2022-01-02 05:35:00,user_001,view_item,0\n", - " ev_00011,2022-01-02 05:45:00,user_001,view_item,0\n", - " ev_00012,2022-01-02 06:10:00,user_001,view_item,0\n", - " ev_00013,2022-01-02 06:15:00,user_001,view_item,0\n", - " ev_00014,2022-01-02 06:25:00,user_001,purchase,25\n", - " ev_00015,2022-01-02 06:30:00,user_001,view_item,0\n", - " ev_00016,2022-01-02 06:31:00,user_001,purchase,5.75\n", - " ev_00017,2022-01-02 07:01:00,user_001,view_item,0\n", - " ev_00018,2022-01-01 22:17:00,user_002,view_item,0\n", - " ev_00019,2022-01-01 22:18:00,user_002,view_item,0\n", - " ev_00020,2022-01-01 22:20:00,user_002,view_item,0\n", - "\"\"\"\n", - "\n", - "events = await kd.sources.CsvString.create(\n", - " event_data_string, time_column=\"event_at\", key_column=\"entity_id\"\n", - ")\n", - "\n", - "# Inspect the event data\n", - "events.preview()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "568d1272", - "metadata": {}, - "source": [ - "## Define queries and calculations" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "c2c5a298", - "metadata": {}, - "source": [ - 
"Kaskada queries are defined in Python, using the `{py}Timestream` class.\n", - "Sources are Timestreams generally containing [records](../guide/data_types.qmd#record-types).\n", - "\n", - "Let's do a simple query for events for a specific entity ID.\n" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "bce22e47", - "metadata": {}, - "outputs": [], - "source": [ - "events.filter(events.col(\"entity_id\").eq(\"user_002\")).preview()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "6b5f2725", - "metadata": {}, - "source": [ - "\n", - "Beyond querying for events, Kaskada has a powerful syntax for defining\n", - "calculations on events, temporally across history.\n", - "\n", - "The six calculations discussed at the top of this demo notebook are below.\n", - "\n", - "(Note that some functions return `NaN` if no events for that user have occurred\n", - "within the time window.)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3ad6d596", - "metadata": {}, - "outputs": [], - "source": [ - "purchases = events.filter(events.col(\"event_name\").eq(\"purchase\"))\n", - "\n", - "features = kd.record(\n", - " {\n", - " \"event_count_total\": events.count(),\n", - " # \"event_count_hourly\": events.count(window=Hourly()),\n", - " \"purchases_total_count\": purchases.count(),\n", - " # \"purchases_today\": purchases.count(window=Since(Daily()),\n", - " # \"revenue_today\": events.col(\"revenue\").sum(window=Since(Daily())),\n", - " \"revenue_total\": events.col(\"revenue\").sum(),\n", - " }\n", - ")\n", - "features.preview()" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "1c315938", - "metadata": {}, - "source": [ - "## At Any Time\n", - "\n", - "A key feature of Kaskada's time-centric design is the ability to query for\n", - "calculation values at any point in time. Traditional query languages (e.g. 
SQL)\n", - "can only return data that already exists---if we want to return a row of\n", - "computed/aggregated data, we have to compute the row first, then return it. As a\n", - "specific example, suppose we have SQL queries that produce daily aggregations\n", - "over event data, and now we want to have the same aggregations on an hourly\n", - "basis. In SQL, we would need to write new queries for hourly aggregations; the\n", - "queries would look very similar to the daily ones, but they would still be\n", - "different queries.\n", - "\n", - "With Kaskada, we can define the calculations once, and then specify the points\n", - "in time at which we want to know the calculation values when we query them.\n", - "\n", - "In the examples so far, we have used `preview()` to get a DataFrame containing\n", - "some of the rows from the Timestreams we've defined. By default, this produces\n", - "a _history_ containing all the times the result changed. This is useful for\n", - "using past values to create training examples.\n", - "\n", - "We can also execute the query for the values at a specific point in time." - ] - }, - { - "cell_type": "markdown", - "id": "082e174d", - "metadata": { - "tags": [ - "hide-output" - ] - }, - "source": [ - "```\n", - "features.preview(at=\"2022-01-01 22:00\")\n", - "``````" - ] - }, - { - "cell_type": "markdown", - "id": "5a44c5f7", - "metadata": {}, - "source": [ - "You can also compose a query that produces values at specific points in time.\n", - "\n", - "```\n", - "features.when(hourly())\n", - "```\n", - "\n", - "Regardless of the time cadence of the calculations themselves, the query output\n", - "can contain rows for whatever time points you specify. You can define a set of\n", - "daily calculations and then get hourly updates during the day. 
Or, you can\n", - "publish the definitions of some features in a Python module and different users\n", - "can query those same calculations for hourly, daily, and monthly\n", - "values---without editing the calculation definitions themselves.\n", - "\n", - "## Adding more calculations to the query\n", - "\n", - "We can add two new calculations, also in one line each, representing:\n", - "\n", - "* the time of the user's first event\n", - "* the time of the user's last event\n" - ] - }, - { - "attachments": {}, - "cell_type": "markdown", - "id": "2ba09e77-0fdf-43f4-960b-50a126262ec7", - "metadata": { - "id": "2ba09e77-0fdf-43f4-960b-50a126262ec7" - }, - "source": [ - "This is only a small sample of possible Kaskada queries and capabilities. See\n", - "everything that's possible with [Timestreams](../reference/Timestream/index.qmd)." - ] - } - ], - "metadata": { - "colab": { - "collapsed_sections": [ - "6924ca3e-28b3-4f93-b0cf-5f8afddc11d8", - "936700a9-e042-401c-9156-7bb18652e109", - "08f5921d-36dc-41d1-a2a6-ae800b7a11de" - ], - "private_outputs": true, - "provenance": [] - }, - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.11.4" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/python/docs/examples/time_centric.qmd b/python/docs/examples/time_centric.qmd new file mode 100644 index 000000000..73bc56f5a --- /dev/null +++ b/python/docs/examples/time_centric.qmd @@ -0,0 +1,209 @@ +--- +title: "Time-centric Calculations" +subtitle: "Work with time and produce past training examples and recent results for applying models." 
+order: 1 +--- + +Kaskada was built to process and perform temporal calculations on event streams, +with real-time analytics and machine learning in mind. It is not exclusively for +real-time applications, but Kaskada excels at time-centric computations and +aggregations on event-based data. + +For example, let's say you're building a user analytics dashboard at an +ecommerce retailer. You have event streams showing all actions the user has +taken, and you'd like to include in the dashboard: + +* the total number of events the user has ever generated +* the total number of purchases the user has made +* the total revenue from the user +* the number of purchases made by the user today +* the total revenue from the user today +* the number of events the user has generated in the past hour + +Because the calculations needed here are a mix of hourly, daily, and over all of +history, more than one type of event aggregation needs to happen. Table-centric +tools like those based on SQL would require multiple JOINs and window functions, +which would be spread over multiple queries or CTEs. + +Kaskada was designed for these types of time-centric calculations, so we can do +each of the calculations in the list in one line: + +```{.python} +record({ + "event_count_total": DemoEvents.count(), + "purchases_total_count": DemoEvents.filter(DemoEvents.col("event_name").eq("purchase")).count(), + "revenue_total": DemoEvents.col("revenue").sum(), + "purchases_daily": DemoEvents.filter(DemoEvents.col("event_name").eq("purchase")).count(window=Daily()), + "revenue_daily": DemoEvents.col("revenue").sum(window=Daily()), + "event_count_hourly": DemoEvents.count(window=Hourly()), +}) +``` + +::: {.callout-warning} +The previous example demonstrates the use of `Daily()` and `Hourly()` windowing which aren't yet part of the new Python library. 
+::: + +Of course, a few more lines of code are needed to put these calculations to work, +but these six lines are all that is needed to specify the calculations +themselves. Each line may specify: + +* the name of a calculation (e.g. `event_count_total`) +* the input data to start with (e.g. `DemoEvents`) +* selecting event fields (e.g. `DemoEvents.col("revenue")`) +* function calls (e.g. `count()`) +* event filtering (e.g. `filter(DemoEvents.col("event_name").eq("purchase"))`) +* time windows to calculate over (e.g. `window=Daily()`) + +...with consecutive steps chained together in a familiar way. + +Because Kaskada was built for time-centric calculations on event-based data, a +calculation we might describe as "total number of purchase events for the user" +can be defined in Kaskada in roughly the same number of terms as the verbal +description itself. + +Continue through the demo below to find out how it works. + +See [the Kaskada documentation](../guide) for lots more information. + +## Kaskada Client Setup + +``` +%pip install kaskada +``` + +```{python} +import kaskada as kd + +kd.init_session() +``` + +## Example dataset + +For this demo, we'll use a very small example data set, which, for simplicity and portability of this demo notebook, we'll read from a string. + +:::{.callout-note} +For simplicity, instead of a CSV file or other file format we read and then parse data from a CSV string. +You can load your own event data from many common sources, including Pandas DataFrames and Parquet files. +See [sources](../reference/Sources/index.qmd) for more information on the available sources. +::: + +```{python} +import asyncio + +# For demo simplicity, instead of a CSV file, we read and then parse data from a +# CSV string. 
Kaskada can also load events from Pandas DataFrames and Parquet files. +event_data_string = """ + event_id,event_at,entity_id,event_name,revenue + ev_00001,2022-01-01 22:01:00,user_001,login,0 + ev_00002,2022-01-01 22:05:00,user_001,view_item,0 + ev_00003,2022-01-01 22:20:00,user_001,view_item,0 + ev_00004,2022-01-01 23:10:00,user_001,view_item,0 + ev_00005,2022-01-01 23:20:00,user_001,view_item,0 + ev_00006,2022-01-01 23:40:00,user_001,purchase,12.50 + ev_00007,2022-01-01 23:45:00,user_001,view_item,0 + ev_00008,2022-01-01 23:59:00,user_001,view_item,0 + ev_00009,2022-01-02 05:30:00,user_001,login,0 + ev_00010,2022-01-02 05:35:00,user_001,view_item,0 + ev_00011,2022-01-02 05:45:00,user_001,view_item,0 + ev_00012,2022-01-02 06:10:00,user_001,view_item,0 + ev_00013,2022-01-02 06:15:00,user_001,view_item,0 + ev_00014,2022-01-02 06:25:00,user_001,purchase,25 + ev_00015,2022-01-02 06:30:00,user_001,view_item,0 + ev_00016,2022-01-02 06:31:00,user_001,purchase,5.75 + ev_00017,2022-01-02 07:01:00,user_001,view_item,0 + ev_00018,2022-01-01 22:17:00,user_002,view_item,0 + ev_00019,2022-01-01 22:18:00,user_002,view_item,0 + ev_00020,2022-01-01 22:20:00,user_002,view_item,0 +""" + +events = await kd.sources.CsvString.create( + event_data_string, time_column="event_at", key_column="entity_id" +) + +# Inspect the event data +events.preview() +``` + +## Define queries and calculations + +Kaskada queries are defined in Python, using the [](`kaskada.Timestream`) class. +Sources are Timestreams generally containing [records](../guide/data_types.qmd#record-types). + +Let's do a simple query for events for a specific entity ID. + +```{python} +events.filter(events.col("entity_id").eq("user_002")).preview() +``` + +Beyond querying for events, Kaskada has a powerful syntax for defining +calculations on events, temporally across history. + +The six calculations discussed at the top of this demo notebook are below; the three windowed ones are commented out because windowing isn't yet part of the Python library. + +(Note that some functions return `NaN` if no events for that user have occurred +within the time window.) 
+ +```{python} +purchases = events.filter(events.col("event_name").eq("purchase")) + +features = kd.record( + { + "event_count_total": events.count(), + # "event_count_hourly": events.count(window=Hourly()), + "purchases_total_count": purchases.count(), + # "purchases_today": purchases.count(window=Since(Daily())), + # "revenue_today": events.col("revenue").sum(window=Since(Daily())), + "revenue_total": events.col("revenue").sum(), + } +) +features.preview() +``` + +## At Any Time + +A key feature of Kaskada's time-centric design is the ability to query for +calculation values at any point in time. Traditional query languages (e.g. SQL) +can only return data that already exists---if we want to return a row of +computed/aggregated data, we have to compute the row first, then return it. As a +specific example, suppose we have SQL queries that produce daily aggregations +over event data, and now we want to have the same aggregations on an hourly +basis. In SQL, we would need to write new queries for hourly aggregations; the +queries would look very similar to the daily ones, but they would still be +different queries. + +With Kaskada, we can define the calculations once, and then specify the points +in time at which we want to know the calculation values when we query them. + +In the examples so far, we have used [](`kaskada.Timestream.preview`) to get a DataFrame containing +some of the rows from the Timestreams we've defined. By default, this produces +a _history_ containing all the times the result changed. This is useful for +using past values to create training examples. + +We can also execute the query for the values at a specific point in time. + +```{.python} +features.preview(at="2022-01-01 22:00") +``` + +You can also compose a query that produces values at specific points in time. 
+ +```{.python} +features.when(hourly()) +``` + +Regardless of the time cadence of the calculations themselves, the query output +can contain rows for whatever time points you specify. You can define a set of +daily calculations and then get hourly updates during the day. Or, you can +publish the definitions of some features in a Python module and different users +can query those same calculations for hourly, daily, and monthly +values---without editing the calculation definitions themselves. + +## Adding more calculations to the query + +We can add two new calculations, also in one line each, representing: + +* the time of the user's first event (e.g. `events.col("event_at").first()`) +* the time of the user's last event (e.g. `events.col("event_at").last()`) + +This is only a small sample of possible Kaskada queries and capabilities. See +everything that's possible with [Timestreams](../reference/Timestream/index.qmd). \ No newline at end of file