diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 2704bc2c0..159bf4f2c 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -102,6 +102,7 @@ "deflist", "smartquotes", "replacements", + "linkify", ] myst_heading_anchors = 3 diff --git a/python/docs/source/guide/aggregation.md b/python/docs/source/guide/aggregation.md index 25c274233..0c5bb7687 100644 --- a/python/docs/source/guide/aggregation.md +++ b/python/docs/source/guide/aggregation.md @@ -19,7 +19,13 @@ Aggregations may be configured to operate in a specific time window by providing If no window is specified, the aggregation is over all rows for the entity, up to and including the current time. If a window is provided, the result of an aggregation is the result for that entity in the current window up to and including the current time. Aggregations produce cumulative results up to each point in time, so the result at a given point in time may represent an incomplete window. -The current window is often incomplete. + +```{code-block} python +:caption: Cumulative aggregation since the start of the day. +Purchases.sum(window = kd.windows.Since.daily()) +``` + +The [windows reference](../reference/windows.md) has information on the supported kinds of windows. ## Repeated Aggregation @@ -29,6 +35,7 @@ The result of the first aggregation is the same — a sequence of timestamped da Applying an additional aggregation simply aggregates over those times. We can compute the maximum of the average purchase amounts. -``` +```{code-block} python +:caption: Repeated aggregation computing the maximum of the average purchases. 
Purchases.col("amount").mean().max() ``` \ No newline at end of file diff --git a/python/docs/source/guide/entities.md b/python/docs/source/guide/entities.md index 79f4a9cf9..83eb8cd46 100644 --- a/python/docs/source/guide/entities.md +++ b/python/docs/source/guide/entities.md @@ -1,3 +1,12 @@ +--- +file_format: mystnb +kernelspec: + name: python3 + display_name: Python 3 +mystnb: + execution_mode: cache +--- + # Entities and Grouping Entities organize data for use in feature engineering. @@ -50,10 +59,48 @@ Entities provide an implicit grouping for the aggregation. When we write `sum(Purchases.amount)` it is an aggregation that returns the sum of purchases made _by each entity_. This is helpful since the _feature vector_ for an entity will depend only on events related to that entity. -```{todo} -Example of grouped streams and aggregation +% The input for this needs to be hidden, not removed. It seems that plotly won't +% render the right height otherwise (possibly because it's not attached to the DOM?). +% We could see if this worked better using a different library such as `bokeh` or if +% there were better options to pass to plotly to avoid the problem. +```{code-cell} +--- +tags: [hide-input] +--- +import kaskada as kd +kd.init_session() +data = "\n".join( + [ + "time,key,m", + "1996-12-19T16:39:57,A,5", + "1996-12-19T17:42:57,B,8", + "1996-12-20T16:39:59,A,17", + "1996-12-23T12:18:57,B,6", + "1996-12-23T16:40:01,A,12", + ] +) +multi_entity = kd.sources.CsvString(data, time_column="time", key_column="key") + +kd.plot.render( + kd.plot.Plot(multi_entity.col("m"), name="m"), + kd.plot.Plot(multi_entity.col("m").sum(), name="sum(m)") +) +``` + + +## Changing Keys + +The key associated with each point may be changed using {py:meth}`kaskada.Timestream.with_key`.
+For example, given a stream of purchases associated with each user, we could create a Timestream associated with the purchased item: + +```{code-block} python +:caption: Using with-key to associate purchases with items +purchases_by_user.with_key(purchases_by_user.col("item_id")) ``` +This is particularly useful with the ability to [lookup](joins.md#explicit-lookups) values associated with other keys. +For instance, we could re-key purchases as shown above to count the total spent on each item (across all users) and then lookup that information for each user's purchases. + ## Joining Joining with the same entity happens automatically. diff --git a/python/docs/source/guide/execution.md b/python/docs/source/guide/execution.md new file mode 100644 index 000000000..8f09f1baa --- /dev/null +++ b/python/docs/source/guide/execution.md @@ -0,0 +1,90 @@ +# Execution + +A [Timestream](./timestreams.md) may be executed and written to a [destination](#destinations) in a variety of ways. +The things to consider when deciding how to execute the Timestream are: + +1. Whether you want the _history_ of points or the _snapshot_ of values for each entity at a given time. +2. Whether you want to run a query _once_ or start a _live_ process continually materializing. +3. Whether you want to limit the output to points in a specific time range (for history) or entities that have changed since a specific time (for snapshots). +4. Whether you want to stop at a given point in time. + +[State](#state) can be used to provide fault-tolerance and allow incremental processing of only new events. + +```{admonition} Preview during development +:class: tip + +While developing queries, it is often useful to view a few rows from the result. +Using {py:meth}`kaskada.Timestream.preview` you can retrieve a small set of rows from the result set as a Pandas DataFrame. +``` + +## History vs. Snapshot + +Executing a Timestream for the history outputs every point in the Timestream.
+This means that each entity likely appears in the output multiple times. +This is particularly useful for creating training examples from past data points, or visualizing how the Timestream has changed over time. + +Executing a Timestream for a snapshot produces a value for each entity at a specific point in time. +This means that each entity appears at-most once in the results. +This is useful for maintaining a feature store based on the latest values. + +```{todo} +Expose the configuration for snapshots. +See https://github.com/kaskada-ai/kaskada/issues/719 +``` + +## Query vs. Materialize +Every Timestream may be executed as a single query or used to start a materialization. +Single queries are useful when you want the results for some later batch process, such as fine-tuning a model or populating an in-memory feature store. +Materialization is useful when you want to stream new results out as quickly as possible, such as maintaining an in-memory feature store or reacting to specific conditions. + + +## Changed Since + +Configuring the _changed since time_ lets you control the points or entities included in the output. + +For a historic query, only points occurring after the changed since time are included in the output. +This allows incrementally outputting the entire history to some external store, by repeatedly performing a "changed since" query. + +For a snapshot query, only entities that have changed after this time are included in the output. +This reduces the amount of data written when the past snapshot is already present in the destination. + +```{todo} +Expose the configuration for changed since. +See https://github.com/kaskada-ai/kaskada/issues/719 +``` + +## Up To + +Configuring the _up to time_ lets you control the maximum points output (and in the case of snapshots, the time represented in the snapshot). + +For a historic query, only points occurring before or at the up to time are included in the output. 
+For a snapshot query, this corresponds to the time at which the snapshot will be taken. + +```{note} +Currently when not specified, the up to time is determined from the maximum event present in the data. +We have plans to change this to a parameter to `run` defaulting to the current time. +``` + +```{todo} +Expose the configuration for up-to. +See https://github.com/kaskada-ai/kaskada/issues/719 +``` + +## State + +Kaskada checkpoints state during and after execution. +This provides fault-tolerance, incremental querying and automatic handling of late-data. + +When a query is executed, Kaskada determines whether it can use any of the available states to reduce the amount of processing needed. +For instance, when producing a snapshot Kaskada can use any persisted state before the earliest new event and before the time to snapshot. +Similarly, when producing a history, Kaskada can use any persisted state before the earliest new event and before the "changed since" time. + +## Destinations + +The {py:class}`kaskada.Result` class provides a variety of ways of retrieving the results within Python. +These include Pandas, PyArrow and Python dictionaries, as well as iterators and async iterators over the same. +This allows you to run the entire retrieve-evaluate-respond loop within a single Python process. + +```{todo} +Expose and document additional destinations such as Parquet, Kafka, Pulsar, etc. 
``` \ No newline at end of file diff --git a/python/docs/source/guide/index.md b/python/docs/source/guide/index.md index bbccb4e5c..c268785f5 100644 --- a/python/docs/source/guide/index.md +++ b/python/docs/source/guide/index.md @@ -44,4 +44,5 @@ entities aggregation joins sources +execution ``` \ No newline at end of file diff --git a/python/docs/source/guide/installation.md b/python/docs/source/guide/installation.md index 5882f92f2..71a49043f 100644 --- a/python/docs/source/guide/installation.md +++ b/python/docs/source/guide/installation.md @@ -14,9 +14,7 @@ It will not be installed by default if you `pip install kaskada`. You need to either use `pip install --pre kaskada` or specify a specific version, as shown in the example. ``` -```{admonition} Pip and pip3 and permissions -:class: tip - +```{tip} Depending on you Python installation and configuration you may have `pip3` instead of `pip` available in your terminal. If you do have `pip3` replace pip with `pip3` in your command, i.e., `pip3 install kaskada`. diff --git a/python/docs/source/guide/joins.md b/python/docs/source/guide/joins.md index ed46ab1c9..73d935342 100644 --- a/python/docs/source/guide/joins.md +++ b/python/docs/source/guide/joins.md @@ -1,7 +1,21 @@ # Joins +It is often necessary to use multiple Timestreams to define a query. +Understanding user behavior requires considering their activity across a variety of event streams. +Normalizing user behavior may require comparing the per-user values to the average values for the region. +Both of these are accomplished by joining the Timestreams. -## Domains and Implicit Joins +## Implicit Joins +Timestreams associated with the same kind of entity -- for instance, a user -- are automatically joined. +This makes it easy to write queries that consider multiple event streams in a single query.
+ +```{code-block} python +:caption: Joining two event streams to compute page-views per purchase + +page_views.count() / purchases.count() +``` + +### Domains It is sometimes useful to consider the _domain_ of an expression. This corresponds to the points in time and entities associated with the points in the expression. For instance, the domain of `Purchases` is all of the purchase events and the associated users. The domain of `Purchases.sum()` is the same. @@ -12,7 +26,9 @@ Whenever expressions with two (or more) different domains are used in the same e The join is an outer join that contains an event if either (any) of the input domains contained an event. For any input table that is continuous, the join is `as of` the time of the output, taking the latest value from that input. +## Explicit Lookups -## Implicit Joins +Values associated with other entities may be retrieved using {py:meth}`kaskada.Timestream.lookup`. +`left.lookup(right)` does a left-join, looking up the value from `right` for each computed key in `left`. -## Explicit Lookups \ No newline at end of file +Lookups are _temporally correct_ -- the value retrieved corresponds to the `right` value at the time the key occurred in `left`. \ No newline at end of file diff --git a/python/docs/source/guide/timestreams.md b/python/docs/source/guide/timestreams.md index 77381dcdb..d935e6562 100644 --- a/python/docs/source/guide/timestreams.md +++ b/python/docs/source/guide/timestreams.md @@ -13,9 +13,13 @@ Kaskada is built on the concept of a _Timestream_. Each Timestream is ordered by _time_ and partitioned by _entity_. This makes it easy to focus on events happening over time and how aggregations change. +% The input for this needs to be hidden, not removed. It seems that plotly won't +% render the right height otherwise (possibly because it's not attached to the DOM?). +% We could see if this worked better using a different library such as `bokeh` or if +% there were better options to pass to plotly to avoid the problem.
```{code-cell} --- -tags: [remove-input] +tags: [hide-input] --- import kaskada as kd kd.init_session() data = "\n".join( @@ -32,7 +36,7 @@ data = "\n".join( multi_entity = kd.sources.CsvString(data, time_column="time", key_column="key") kd.plot.render( - kd.plot.Plot(multi_entity.col("m")) + kd.plot.Plot(multi_entity.col("m"), name="m") ) ``` @@ -50,5 +54,5 @@ For example, the result of aggregating a timestream produces a continuous stream tags: [remove-input] --- kd.plot.render( - kd.plot.Plot(multi_entity.col("m").sum()) + kd.plot.Plot(multi_entity.col("m").sum(), name="sum(m)") ) \ No newline at end of file diff --git a/python/docs/source/quickstart.md b/python/docs/source/quickstart.md index ab89c6120..d3eed43d5 100644 --- a/python/docs/source/quickstart.md +++ b/python/docs/source/quickstart.md @@ -9,10 +9,23 @@ mystnb: # Quick Start -```{todo} +This shows the bare minimum needed to get started with Kaskada. + +## Install + +Install the latest version. +This uses `kaskada>=0.6.0-a.0` to ensure the pre-release version is installed. -Write the quick start. ``` +pip install "kaskada>=0.6.0-a.0" +``` + +See the [User Guide](./guide/installation.md) to learn more about installing Kaskada. + +## Write a query + +The following Python code imports the Kaskada library, creates a session, and loads some CSV data. +It then runs a query to produce a Pandas DataFrame. ```{code-cell} import kaskada as kd kd.init_session() content = "\n".join( [ ] ) source = kd.sources.CsvString(content, time_column="time", key_column="key") -source.run().to_pandas() -``` \ No newline at end of file +source.select("m", "n").extend({"sum_m": source.col("m").sum() }).run().to_pandas() +``` + +## Next Steps +See the [User Guide](./guide/index.md) to learn more about Kaskada.
+Among other things, it includes information on other kinds of [sources](./guide/sources.md), writing queries over [Timestreams](./guide/timestreams.md), [aggregations and windowing](./guide/aggregation.md) and [execution](./guide/execution.md). \ No newline at end of file diff --git a/python/poetry.lock b/python/poetry.lock index 40122c39e..200ea2092 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -1058,6 +1058,26 @@ traitlets = ">=5.3" docs = ["myst-parser", "sphinx-autodoc-typehints", "sphinxcontrib-github-alt", "sphinxcontrib-spelling", "traitlets"] test = ["ipykernel", "pre-commit", "pytest", "pytest-cov", "pytest-timeout"] +[[package]] +name = "linkify-it-py" +version = "2.0.2" +description = "Links recognition library with FULL unicode support." +optional = false +python-versions = ">=3.7" +files = [ + {file = "linkify-it-py-2.0.2.tar.gz", hash = "sha256:19f3060727842c254c808e99d465c80c49d2c7306788140987a1a7a29b0d6ad2"}, + {file = "linkify_it_py-2.0.2-py3-none-any.whl", hash = "sha256:a3a24428f6c96f27370d7fe61d2ac0be09017be5190d68d8658233171f1b6541"}, +] + +[package.dependencies] +uc-micro-py = "*" + +[package.extras] +benchmark = ["pytest", "pytest-benchmark"] +dev = ["black", "flake8", "isort", "pre-commit", "pyproject-flake8"] +doc = ["myst-parser", "sphinx", "sphinx-book-theme"] +test = ["coverage", "pytest", "pytest-cov"] + [[package]] name = "livereload" version = "2.6.3" @@ -2887,6 +2907,20 @@ files = [ {file = "tzdata-2023.3.tar.gz", hash = "sha256:11ef1e08e54acb0d4f95bdb1be05da659673de4acbd21bf9c69e94cc5e907a3a"}, ] +[[package]] +name = "uc-micro-py" +version = "1.0.2" +description = "Micro subset of unicode data files for linkify-it-py projects." 
+optional = false +python-versions = ">=3.7" +files = [ + {file = "uc-micro-py-1.0.2.tar.gz", hash = "sha256:30ae2ac9c49f39ac6dce743bd187fcd2b574b16ca095fa74cd9396795c954c54"}, + {file = "uc_micro_py-1.0.2-py3-none-any.whl", hash = "sha256:8c9110c309db9d9e87302e2f4ad2c3152770930d88ab385cd544e7a7e75f3de0"}, +] + +[package.extras] +test = ["coverage", "pytest", "pytest-cov"] + [[package]] name = "urllib3" version = "2.0.4" @@ -2985,4 +3019,4 @@ plot = ["plotly"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "207cb0246f166fabdd6f69502dc050c632ffa07497da5220b04bbf54b581818e" +content-hash = "ad7b8f7618fbd17c6e8b3d2946fc2abffd99c06fe45ca5bdab4662bd71c1fdb6" diff --git a/python/pyproject.toml b/python/pyproject.toml index 440369cb2..7d9a432e0 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -90,6 +90,7 @@ myst-parser = {version = ">=0.16.1"} # https://github.com/executablebooks/MyST-NB/issues/530 myst-nb = { git = "https://github.com/executablebooks/MyST-NB.git", rev = "3d6a5d1"} plotly = {version = "^5.16.1"} +linkify-it-py = "^2.0.2" [tool.poetry.group.test] # Dependencies for testing diff --git a/python/pysrc/kaskada/plot.py b/python/pysrc/kaskada/plot.py index 9551f49b9..50006b14f 100644 --- a/python/pysrc/kaskada/plot.py +++ b/python/pysrc/kaskada/plot.py @@ -90,7 +90,7 @@ def render( fig["layout"][f"xaxis{row}"]["title"] = "Time" fig["layout"][f"yaxis{row}"]["title"] = name - fig.update_layout(height=400 * len(args), width=600, title_text=title_text) + fig.update_layout(height=300 * len(args), width=600, title_text=title_text) fig.show()