From 659dcd2a62f4ae53027d7a69470a226938042898 Mon Sep 17 00:00:00 2001
From: Ben Chambers <35960+bjchambers@users.noreply.github.com>
Date: Mon, 28 Aug 2023 13:36:17 -0700
Subject: [PATCH 1/4] docs: Change to Google Doc Style
This produces better rendering in the eventual docs, including using
the type hints to determine the parameter and return types.
---
python/docs/source/conf.py | 7 +
python/docs/source/index.md | 6 +-
.../reference/timestream/aggregation.md | 16 +-
.../source/reference/timestream/arithmetic.md | 2 +
.../source/reference/timestream/collection.md | 4 +-
.../source/reference/timestream/comparison.md | 2 +
.../docs/source/reference/timestream/index.md | 1 +
.../docs/source/reference/timestream/misc.md | 1 +
.../source/reference/timestream/records.md | 3 +-
python/docs/source/reference/windows.md | 1 +
python/poetry.lock | 21 +-
python/pyproject.toml | 1 +
python/pysrc/kaskada/__init__.py | 2 +
python/pysrc/kaskada/_execution.py | 17 +-
python/pysrc/kaskada/_result.py | 82 +-
python/pysrc/kaskada/_session.py | 20 +-
python/pysrc/kaskada/_timestream.py | 1117 +++++------------
python/pysrc/kaskada/plot.py | 15 +-
python/pysrc/kaskada/sources/arrow.py | 191 +--
python/pysrc/kaskada/windows.py | 164 +--
20 files changed, 485 insertions(+), 1188 deletions(-)
diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py
index 6e9f1ad56..b46c5201e 100644
--- a/python/docs/source/conf.py
+++ b/python/docs/source/conf.py
@@ -20,6 +20,7 @@
# "myst_parser",
"myst_nb",
"sphinx_copybutton",
+ "sphinx_autodoc_typehints", # must be after napoleon
"_extensions.gallery_directive",
]
autodoc_typehints = "description"
@@ -110,6 +111,7 @@
# Automatically extract typehints when specified and place them in
# descriptions of the relevant function/method.
autodoc_typehints = "description"
+autodoc_type_aliases = {"kaskada.Arg": "kaskada.Arg"}
# Don't show class signature with the class' name.
autodoc_class_signature = "separated"
@@ -117,5 +119,10 @@
autosummary_generate = True
napoleon_preprocess_types = True
+napoleon_attr_annotations = True
+napoleon_use_rtype = False
+typehints_use_rtype = False
+typehints_document_rtype = False
+typehints_defaults = 'comma'
suppress_warnings = ["mystnb.unknown_mime_type"]
diff --git a/python/docs/source/index.md b/python/docs/source/index.md
index c04b6fb9b..7b089adaa 100644
--- a/python/docs/source/index.md
+++ b/python/docs/source/index.md
@@ -1,7 +1,11 @@
---
hide-toc: true
html_theme.sidebar_secondary.remove: true
+sd_hide_title: true
---
+
+# Real-Time AI without the fuss.
+
@@ -12,7 +16,7 @@ html_theme.sidebar_secondary.remove: true
-# Kaskada completes the Real-Time AI stack, providing...
+## Kaskada completes the Real-Time AI stack, providing...
```{gallery-grid}
:grid-columns: 1 2 2 3
diff --git a/python/docs/source/reference/timestream/aggregation.md b/python/docs/source/reference/timestream/aggregation.md
index 3f5740fce..7d2ff14ee 100644
--- a/python/docs/source/reference/timestream/aggregation.md
+++ b/python/docs/source/reference/timestream/aggregation.md
@@ -2,12 +2,16 @@
Timestream aggregations are:
-Cumulative:
- They reflect all values up to and including the current time.
-Grouped:
- They reflect the values for each entity separately.
-Windowed:
- They reflect the values within a specific [window](../windows.md).
+Cumulative
+: They reflect all values up to and including the current time.
+
+Grouped
+: They reflect the values for each entity separately.
+
+Windowed
+: They reflect the values within a specific [window](../windows.md).
+
+## Aggregation Methods
```{eval-rst}
.. currentmodule:: kaskada
diff --git a/python/docs/source/reference/timestream/arithmetic.md b/python/docs/source/reference/timestream/arithmetic.md
index 669b7c810..8bc527b7e 100644
--- a/python/docs/source/reference/timestream/arithmetic.md
+++ b/python/docs/source/reference/timestream/arithmetic.md
@@ -8,6 +8,8 @@ For instance, `a.add(b)` may be written as `a + b`.
See the notes on the specific functions for more information.
```
+## Arithmetic Methods
+
```{eval-rst}
.. currentmodule:: kaskada
diff --git a/python/docs/source/reference/timestream/collection.md b/python/docs/source/reference/timestream/collection.md
index 99e579692..e9f9f177c 100644
--- a/python/docs/source/reference/timestream/collection.md
+++ b/python/docs/source/reference/timestream/collection.md
@@ -1,7 +1,9 @@
-# Arithmetic
+# Collection
Timestreams allow each point to contain a collection -- a `list` or `map` -- of elements.
+## Collection Methods
+
```{eval-rst}
.. currentmodule:: kaskada
diff --git a/python/docs/source/reference/timestream/comparison.md b/python/docs/source/reference/timestream/comparison.md
index d25e3d28f..bdf014ba9 100644
--- a/python/docs/source/reference/timestream/comparison.md
+++ b/python/docs/source/reference/timestream/comparison.md
@@ -10,6 +10,8 @@ See the notes on the specific functions for more information.
To respect the semantics of `__eq__` and `__ne__`, `a == b` and `a != b` are *not* overloaded.
```
+## Comparison Methods
+
```{eval-rst}
.. currentmodule:: kaskada
diff --git a/python/docs/source/reference/timestream/index.md b/python/docs/source/reference/timestream/index.md
index 93dbf9ce9..5e861cacb 100644
--- a/python/docs/source/reference/timestream/index.md
+++ b/python/docs/source/reference/timestream/index.md
@@ -12,6 +12,7 @@ html_theme.sidebar_secondary.remove:
:exclude-members: __init__
.. autoproperty:: data_type
+.. autoclass:: kaskada.Arg
```
```{toctree}
diff --git a/python/docs/source/reference/timestream/misc.md b/python/docs/source/reference/timestream/misc.md
index c3d6b6434..482f89826 100644
--- a/python/docs/source/reference/timestream/misc.md
+++ b/python/docs/source/reference/timestream/misc.md
@@ -13,4 +13,5 @@
Timestream.if_
Timestream.lag
Timestream.null_if
+ Timestream.pipe
```
\ No newline at end of file
diff --git a/python/docs/source/reference/timestream/records.md b/python/docs/source/reference/timestream/records.md
index 7fe5b10c1..24a2b4743 100644
--- a/python/docs/source/reference/timestream/records.md
+++ b/python/docs/source/reference/timestream/records.md
@@ -1,7 +1,8 @@
# Records
Record operations create, extract or manipulate Timestreams of records.
-Comparison operations produce boolean Timestreams.
+
+## Record Methods
```{eval-rst}
.. currentmodule:: kaskada
diff --git a/python/docs/source/reference/windows.md b/python/docs/source/reference/windows.md
index 007f26a12..d14513a98 100644
--- a/python/docs/source/reference/windows.md
+++ b/python/docs/source/reference/windows.md
@@ -6,6 +6,7 @@
.. autosummary::
:toctree: apidocs/windows/
+ Window
Since
Sliding
Trailing
diff --git a/python/poetry.lock b/python/poetry.lock
index 8d6ac9ebd..e2e69d81c 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -2484,6 +2484,25 @@ sphinx = "*"
[package.extras]
test = ["pytest", "pytest-cov"]
+[[package]]
+name = "sphinx-autodoc-typehints"
+version = "1.23.0"
+description = "Type hints (PEP 484) support for the Sphinx autodoc extension"
+optional = false
+python-versions = ">=3.7"
+files = [
+ {file = "sphinx_autodoc_typehints-1.23.0-py3-none-any.whl", hash = "sha256:ac099057e66b09e51b698058ba7dd76e57e1fe696cd91b54e121d3dad188f91d"},
+ {file = "sphinx_autodoc_typehints-1.23.0.tar.gz", hash = "sha256:5d44e2996633cdada499b6d27a496ddf9dbc95dd1f0f09f7b37940249e61f6e9"},
+]
+
+[package.dependencies]
+sphinx = ">=5.3"
+
+[package.extras]
+docs = ["furo (>=2022.12.7)", "sphinx (>=6.1.3)", "sphinx-autodoc-typehints (>=1.23.4)"]
+testing = ["covdefaults (>=2.2.2)", "coverage (>=7.2.2)", "diff-cover (>=7.5)", "nptyping (>=2.5)", "pytest (>=7.2.2)", "pytest-cov (>=4)", "sphobjinv (>=2.3.1)", "typing-extensions (>=4.5)"]
+type-comment = ["typed-ast (>=1.5.4)"]
+
[[package]]
name = "sphinx-book-theme"
version = "1.0.1"
@@ -2977,4 +2996,4 @@ plot = ["plotly"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.8.1,<4.0"
-content-hash = "855debf781d9b061faaf6481cce7e377bcf9416bfeecd9411c104a119d2e66b7"
+content-hash = "e4cda7b9883ae598e5e16e2426dac30292ed867520937da9e1c867b76eadda52"
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 7e2625579..4263f924a 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -81,6 +81,7 @@ optional = true
[tool.poetry.group.docs.dependencies]
sphinx = ">=6.0.0"
sphinx-autobuild = ">=2021.3.14"
+sphinx-autodoc-typehints = ">=1.23.0"
sphinx-book-theme = "^1.0.1"
sphinx-copybutton = "^0.5.2"
sphinx-design = "^0.5.0"
diff --git a/python/pysrc/kaskada/__init__.py b/python/pysrc/kaskada/__init__.py
index bc5121cc2..2fc4eda1b 100644
--- a/python/pysrc/kaskada/__init__.py
+++ b/python/pysrc/kaskada/__init__.py
@@ -7,6 +7,7 @@
from ._execution import ExecutionOptions
from ._result import Result
from ._session import init_session
+from ._timestream import Arg
from ._timestream import Literal
from ._timestream import Timestream
from ._timestream import record
@@ -14,6 +15,7 @@
__all__ = [
+ "Arg",
"ExecutionOptions",
"init_session",
"Literal",
diff --git a/python/pysrc/kaskada/_execution.py b/python/pysrc/kaskada/_execution.py
index 4829264dd..6fde445d4 100644
--- a/python/pysrc/kaskada/_execution.py
+++ b/python/pysrc/kaskada/_execution.py
@@ -6,18 +6,11 @@
class ExecutionOptions:
"""Execution options for a query.
- Attributes
- ----------
- row_limit : Optional[int]
- The maximum number of rows to return.
- If not specified, all rows are returned.
-
- max_batch_size : Optional[int]
- The maximum batch size to use when returning results.
- If not specified, the default batch size will be used.
-
- materialize : bool
- If true, the query will be a continuous materialization.
+ Attributes:
+ row_limit: The maximum number of rows to return. If not specified, all rows are returned.
+ max_batch_size: The maximum batch size to use when returning results.
+ If not specified, the default batch size will be used.
+ materialize: If true, the query will be a continuous materialization.
"""
row_limit: Optional[int] = None
diff --git a/python/pysrc/kaskada/_result.py b/python/pysrc/kaskada/_result.py
index eea4e04ac..c5a138141 100644
--- a/python/pysrc/kaskada/_result.py
+++ b/python/pysrc/kaskada/_result.py
@@ -17,34 +17,26 @@ def __init__(self, ffi_execution: _ffi.Execution) -> None:
self._ffi_execution = ffi_execution
def to_pandas(self) -> pd.DataFrame:
- """
- Convert the result to a Pandas DataFrame.
+ """Convert the result to a Pandas DataFrame.
- Returns
- -------
- pd.DataFrame
+ Returns:
The result as a Pandas DataFrame.
- Warnings
- --------
- This method will block on the complete results of the query and collect
- all results into memory. If this is not desired, use `iter_pandas` instead.
+ Warnings:
+ This method will block on the complete results of the query and collect
+ all results into memory. If this is not desired, use `iter_pandas` instead.
"""
return self.to_pyarrow().to_pandas()
def to_pyarrow(self) -> pa.Table:
- """
- Convert the result to a PyArrow Table.
+ """Convert the result to a PyArrow Table.
- Returns
- -------
- pa.Table
+ Returns:
The result as a PyArrow Table.
- Warnings
- --------
- This method will block on the complete results of the query and collect
- all results into memory. If this is not desired, use `iter_pyarrow` instead.
+ Warnings:
+ This method will block on the complete results of the query and collect
+ all results into memory. If this is not desired, use `iter_pyarrow` instead.
"""
batches = self._ffi_execution.collect_pyarrow()
if len(batches) == 0:
@@ -55,14 +47,7 @@ def to_pyarrow(self) -> pa.Table:
return table
def iter_pyarrow(self) -> Iterator[pa.RecordBatch]:
- """
- Iterate over the results as PyArrow RecordBatches.
-
- Yields
- ------
- pa.RecordBatch
- The next RecordBatch.
- """
+ """Yield the results as PyArrow RecordBatches."""
next_batch = self._ffi_execution.next_pyarrow()
while next_batch is not None:
# Annoyingly, PyArrow doesn't suport `drop_columns` on batches.
@@ -75,39 +60,18 @@ def iter_pyarrow(self) -> Iterator[pa.RecordBatch]:
next_batch = self._ffi_execution.next_pyarrow()
def iter_pandas(self) -> Iterator[pd.DataFrame]:
- """
- Iterate over the results as Pandas DataFrames.
-
- Yields
- ------
- pd.DataFrame
- The next Pandas DataFrame.
- """
+ """Yield the resulting Pandas DataFrames."""
for batch in self.iter_pyarrow():
yield batch.to_pandas()
def iter_rows(self) -> Iterator[dict]:
- """
- Iterate over the results as row dictionaries.
-
- Yields
- ------
- dict
- The next row as a dictionary.
- """
+ """Yield the resulting rows as dictionaries."""
for batch in self.iter_pyarrow():
for row in batch.to_pylist():
yield row
async def iter_pyarrow_async(self) -> AsyncIterator[pa.RecordBatch]:
- """
- Asynchronously iterate over the results as PyArrow RecordBatches.
-
- Yields
- ------
- pa.RecordBatch
- The next RecordBatch.
- """
+ """Yield the resulting PyArrow RecordBatches asynchronously."""
next_batch = await self._ffi_execution.next_pyarrow_async()
while next_batch is not None:
# Annoyingly, PyArrow doesn't suport `drop_columns` on batches.
@@ -120,26 +84,12 @@ async def iter_pyarrow_async(self) -> AsyncIterator[pa.RecordBatch]:
next_batch = await self._ffi_execution.next_pyarrow_async()
async def iter_pandas_async(self) -> AsyncIterator[pd.DataFrame]:
- """
- Asynchronously iterate over the results as Pandas DataFrames.
-
- Yields
- ------
- pd.DataFrame
- The next Pandas DataFrame.
- """
+ """Yield the resulting Pandas DataFrames asynchronously."""
async for batch in self.iter_pyarrow_async():
yield batch.to_pandas()
async def iter_rows_async(self) -> AsyncIterator[dict]:
- """
- Asycnchronously iterate over the results as row dictionaries.
-
- Yields
- ------
- dict
- The next row as a dictionary.
- """
+ """Yield the resulting row dictionaries asynchronously."""
async for batch in self.iter_pyarrow_async():
for row in batch.to_pylist():
yield row
diff --git a/python/pysrc/kaskada/_session.py b/python/pysrc/kaskada/_session.py
index 8cfa0b970..2018af9ce 100644
--- a/python/pysrc/kaskada/_session.py
+++ b/python/pysrc/kaskada/_session.py
@@ -9,16 +9,13 @@
def init_session() -> None:
- """
- Initialize the Kaskada session for this Python process.
+ """Initialize the Kaskada session for this Python process.
This must only be called once per session. It must be called before
any other Kaskada functions are called.
- Raises
- ------
- RuntimeError
- If the session has already been initialized.
+ Raises:
+ RuntimeError: If the session has already been initialized.
"""
global _SESSION
if _SESSION is not None:
@@ -30,15 +27,10 @@ def _get_session() -> _ffi.Session:
"""
Assert that the session has been initialized.
- Returns
- -------
- _ffi.Session
- The FFI session handle.
+ Returns: The FFI session handle.
- Raises
- ------
- AssertionError
- If the session has not been initialized.
+ Raises:
+ AssertionError: If the session has not been initialized.
"""
global _SESSION
assert _SESSION is not None, "Session has not been initialized"
diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py
index fc9c92bce..ba5c8496d 100644
--- a/python/pysrc/kaskada/_timestream.py
+++ b/python/pysrc/kaskada/_timestream.py
@@ -71,32 +71,22 @@ def _call(
*args: Union[Timestream, Literal],
session: Optional[_ffi.Session] = None,
) -> Timestream:
- """
- Construct a new Timestream by calling the given function.
-
- Parameters
- ----------
- func : str
- Name of the function to apply.
- *args : Timestream | int | str | float | bool | None
- List of arguments to the expression.
- session : FFI Session
- FFI Session to create the expression in.
- If unspecified, will infer from the arguments.
- Will fail if all arguments are literals and the session is not provided.
-
- Returns
- -------
- Timestream
+ """Construct a new Timestream by calling the given function.
+
+ Args:
+ func: Name of the function to apply.
+ *args: List of arguments to the expression.
+ session: FFI Session to create the expression in.
+ If unspecified, will infer from the arguments.
+ Will fail if all arguments are literals and the session is not provided.
+
+ Returns:
Timestream representing the result of the function applied to the arguments.
- Raises
- ------
- # noqa: DAR401 _augment_error
- TypeError
- If the argument types are invalid for the given function.
- ValueError
- If the argument values are invalid for the given function.
+ Raises:
+ # noqa: DAR401 _augment_error
+ TypeError: If the argument types are invalid for the given function.
+ ValueError: If the argument values are invalid for the given function.
"""
if session is None:
session = next(
@@ -146,56 +136,45 @@ def pipe(
*args: Union[Timestream, Literal],
**kwargs: Union[Timestream, Literal],
) -> Timestream:
- """
- Apply chainable functions that produce Timestreams.
-
- Parameters
- ----------
- func : Callable[..., Timestream] | Tuple[Callable[..., Timestream], str]
- Function to apply to this Timestream. Alternatively a `(func,
- keyword)` tuple where `keyword` is a string indicating the keyword
- of `func` that expects the Timestream.
- args : iterable, optional
- Positional arguments passed into ``func``.
- kwargs : mapping, optional
- A dictionary of keyword arguments passed into ``func``.
-
- Returns
- -------
- Timestream
+ """Apply chainable functions that produce Timestreams.
+
+ Args:
+ func: Function to apply to this Timestream.
+ Alternatively a `(func, keyword)` tuple where `keyword` is a string
+ indicating the keyword of `func` that expects the Timestream.
+ *args: Positional arguments passed into ``func``.
+ **kwargs: A dictionary of keyword arguments passed into ``func``.
+
+ Returns:
The result of applying `func` to the arguments.
- Raises
- ------
- ValueError
- When using `self` with a specific `keyword` if the `keyword` also
- appears on in the `kwargs`.
+ Raises:
+ ValueError: When using `self` with a specific `keyword` if the `keyword` also
+ appears in the `kwargs`.
- Notes
- -----
- Use ``.pipe`` when chaining together functions that expect Timestreams.
+ Notes:
+ Use ``.pipe`` when chaining together functions that expect Timestreams.
- Examples
- --------
- Instead of writing
+ Examples:
+ Instead of writing
- >>> func(g(h(df), arg1=a), arg2=b, arg3=c) # doctest: +SKIP
+ >>> func(g(h(df), arg1=a), arg2=b, arg3=c) # doctest: +SKIP
- You can write
+ You can write
- >>> (df.pipe(h)
- >>> .pipe(g, arg1=a)
- >>> .pipe(func, arg2=b, arg3=c)
- >>> ) # doctest: +SKIP
+ >>> (df.pipe(h)
+ >>> .pipe(g, arg1=a)
+ >>> .pipe(func, arg2=b, arg3=c)
+ >>> ) # doctest: +SKIP
- If you have a function that takes the data as (say) the second
- argument, pass a tuple indicating which keyword expects the
- data. For example, suppose ``func`` takes its data as ``arg2``:
+ If you have a function that takes the data as (say) the second
+ argument, pass a tuple indicating which keyword expects the
+ data. For example, suppose ``func`` takes its data as ``arg2``:
- >>> (df.pipe(h)
- >>> .pipe(g, arg1=a)
- >>> .pipe((func, 'arg2'), arg1=a, arg3=c)
- >>> ) # doctest: +SKIP
+ >>> (df.pipe(h)
+ >>> .pipe(g, arg1=a)
+ >>> .pipe((func, 'arg2'), arg1=a, arg3=c)
+ >>> ) # doctest: +SKIP
"""
if isinstance(func, tuple):
func, target = func
@@ -207,23 +186,14 @@ def pipe(
else:
return func(self, *args, **kwargs)
- def add(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream adding this and `rhs`.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to add to this.
+ def add(self, rhs: Arg) -> Timestream:
+ """Return a Timestream adding this and `rhs`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self + rhs`.
+ Args:
+ rhs: The Timestream or literal value to add to this.
- Notes
- -----
- You can also write `a.add(b)` as `a + b`.
+ Notes:
+ You can also write `a.add(b)` as `a + b`.
"""
if isinstance(rhs, timedelta):
# Right now, we can't convert a time delta directly to a scalar value (literal).
@@ -249,14 +219,7 @@ def __radd__(self, lhs: Union[Timestream, Literal]) -> Timestream:
return lhs.add(self)
def ceil(self) -> Timestream:
- """
- Create a Timestream for the number rounded up to the next largest integer.
-
- Returns
- -------
- Timestream
- The Timestream resulting from the numeric ceiling of this.
- """
+ """Return a Timestream rounding self up to the next largest integer."""
return Timestream._call("ceil", self)
def clamp(
@@ -264,40 +227,22 @@ def clamp(
min: Union[Timestream, Literal, None] = None,
max: Union[Timestream, Literal, None] = None,
) -> Timestream:
- """
- Create a Timestream clamped between the bounds of min and max.
-
- Parameters
- ----------
- min : Union[Timestream, Literal, None]
- The literal value to set as the lower bound
- max : Union[Timestream, Literal, None]
- The literal value to set as the upper bound
-
- Returns
- -------
- Timestream
- The Timestream resulting from the clamped bounds between min and max.
+ """Return a Timestream from `self` clamped between `min` and `max`.
+
+ Args:
+ min: The literal value to set as the lower bound.
+ max: The literal value to set as the upper bound.
"""
return Timestream._call("clamp", self, min, max)
def sub(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream substracting `rhs` from this.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to subtract from this.
+ """Return a Timestream subtracting `rhs` from this.
- Returns
- -------
- Timestream
- The Timestream resulting from `self - rhs`.
+ Args:
+ rhs: The Timestream or literal value to subtract from this.
- Notes
- -----
- You can also write `a.sub(b)` as `a - b`.
+ Notes:
+ You can also write `a.sub(b)` as `a - b`.
"""
return Timestream._call("sub", self, rhs)
@@ -312,44 +257,21 @@ def __rsub__(self, lhs: Union[Timestream, Literal]) -> Timestream:
return lhs.sub(self)
def exp(self) -> Timestream:
- """
- Create a Timestream raising `e` to the power of this.
-
- Returns
- -------
- Timestream
- The Timestream resulting from `e^this`.
- """
+ """Return a Timestream raising `e` to the power of `self`."""
return Timestream._call("exp", self)
def floor(self) -> Timestream:
- """
- Create a Timestream of the values rounded down to the nearest integer.
-
- Returns
- -------
- Timestream
- The Timestream resulting from the numeric floor of this.
- """
+ """Return a Timestream of self rounded down to the nearest integer."""
return Timestream._call("floor", self)
def mul(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream multiplying this and `rhs`.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to multiply with this.
+ """Return a Timestream multiplying this and `rhs`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self * rhs`.
+ Args:
+ rhs: The Timestream or literal value to multiply with this.
- Notes
- -----
- You can also write `a.mul(b)` as `a * b`.
+ Notes:
+ You can also write `a.mul(b)` as `a * b`.
"""
return Timestream._call("mul", self, rhs)
@@ -364,37 +286,21 @@ def __rmul__(self, lhs: Union[Timestream, Literal]) -> Timestream:
return lhs.mul(self)
def powf(self, power: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream raising `this` to the power of `power`.
-
- Parameters
- ----------
- power : Union[Timestream, Literal]
- The Timestream or literal value to raise this by.
+ """Return a Timestream raising `self` to the power of `power`.
- Returns
- -------
- Timestream: The Timestream resulting from `self ^ power`.
+ Args:
+ power: The Timestream or literal value to raise `self` to.
"""
return Timestream._call("powf", self, power)
def div(self, divisor: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream by dividing this and `divisor`.
-
- Parameters
- ----------
- divisor : Union[Timestream, Literal]
- The Timestream or literal value to divide this by.
+ """Return a Timestream by dividing this and `divisor`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self / divisor`.
+ Args:
+ divisor: The Timestream or literal value to divide this by.
- Notes
- -----
- You can also write `a.div(b)` as `a / b`.
+ Notes:
+ You can also write `a.div(b)` as `a / b`.
"""
return Timestream._call("div", self, divisor)
@@ -409,22 +315,13 @@ def __rtruediv__(self, dividend: Union[Timestream, Literal]) -> Timestream:
return dividend.div(self)
def lt(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream that is true if this is less than `rhs`.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to compare to.
+ """Return a Timestream that is true if this is less than `rhs`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self < rhs`.
+ Args:
+ rhs: The Timestream or literal value to compare to.
- Notes
- -----
- You can also write `a.lt(b)` as `a < b`.
+ Notes:
+ You can also write `a.lt(b)` as `a < b`.
"""
return Timestream._call("lt", self, rhs)
@@ -433,22 +330,13 @@ def __lt__(self, rhs: Union[Timestream, Literal]) -> Timestream:
return self.lt(rhs)
def le(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream that is true if this is less than or equal to `rhs`.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to compare to.
+ """Return a Timestream that is true if this is less than or equal to `rhs`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self <= rhs`.
+ Args:
+ rhs: The Timestream or literal value to compare to.
- Notes
- -----
- You can also write `a.le(b)` as `a <= b`.
+ Notes:
+ You can also write `a.le(b)` as `a <= b`.
"""
return Timestream._call("lte", self, rhs)
@@ -457,22 +345,13 @@ def __le__(self, rhs: Union[Timestream, Literal]) -> Timestream:
return self.le(rhs)
def gt(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream that is true if this is greater than `rhs`.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to compare to.
+ """Return a Timestream that is true if this is greater than `rhs`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self > rhs`.
+ Args:
+ rhs: The Timestream or literal value to compare to.
- Notes
- -----
- You can also write `a.gt(b)` as `a > b`.
+ Notes:
+ You can also write `a.gt(b)` as `a > b`.
"""
return Timestream._call("gt", self, rhs)
@@ -481,22 +360,13 @@ def __gt__(self, rhs: Union[Timestream, Literal]) -> Timestream:
return self.gt(rhs)
def ge(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream that is true if this is greater than or equal to `rhs`.
-
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to compare to.
+ """Return a Timestream that is true if this is greater than or equal to `rhs`.
- Returns
- -------
- Timestream
- The Timestream resulting from `self >= rhs`.
+ Args:
+ rhs: The Timestream or literal value to compare to.
- Notes
- -----
- You can also write `a.ge(b)` as `a >= b`.
+ Notes:
+ You can also write `a.ge(b)` as `a >= b`.
"""
return Timestream._call("gte", self, rhs)
@@ -505,85 +375,44 @@ def __ge__(self, rhs: Union[Timestream, Literal]) -> Timestream:
return self.ge(rhs)
def and_(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create the logical conjunction of this Timestream and `rhs`.
+ """Return the logical conjunction of this Timestream and `rhs`.
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to conjoin with.
-
- Returns
- -------
- Timestream
- The Timestream resulting from `self and rhs`.
+ Args:
+ rhs: The Timestream or literal value to conjoin with.
"""
return Timestream._call("logical_and", self, rhs)
def or_(self, rhs: Union[Timestream, Literal]) -> Timestream:
- """
- Create the logical disjunction of this Timestream and `rhs`.
+ """Return the logical disjunction of this Timestream and `rhs`.
- Parameters
- ----------
- rhs : Union[Timestream, Literal]
- The Timestream or literal value to disjoin with.
-
- Returns
- -------
- Timestream
- The Timestream resulting from `self or rhs`.
+ Args:
+ rhs: The Timestream or literal value to disjoin with.
"""
return Timestream._call("logical_or", self, rhs)
def not_(self) -> Timestream:
- """
- Create the logical negation of this Timestream.
-
- Returns
- -------
- Timestream
- The Timestream resulting from `not self`.
- """
+ """Return the logical negation of this Timestream."""
return Timestream._call("not", self)
def eq(self, other: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream that is true if this is equal to `other`.
-
- Parameters
- ----------
- other : Union[Timestream, Literal]
- The Timestream or literal value to compare to.
+ """Return a Timestream that is true if this is equal to `other`.
- Returns
- -------
- Timestream
- The Timestream indicating whether the `self` and `other` are equal.
+ Args:
+ other: The Timestream or literal value to compare to.
- Note
- ----
- Equality is *not* available as `a == b`.
+ Note:
+ Equality is *not* available as `a == b`.
"""
return Timestream._call("eq", self, other)
def ne(self, other: Union[Timestream, Literal]) -> Timestream:
- """
- Create a Timestream that is true if this is not equal to `other`.
-
- Parameters
- ----------
- other : Union[Timestream, Literal]
- The Timestream or literal value to compare to.
+ """Return a Timestream that is true if this is not equal to `other`.
- Returns
- -------
- Timestream
- The Timestream indicating whether `self` and `other` are not equal.
+ Args:
+ other: The Timestream or literal value to compare to.
- Note
- ----
- Inequality is *not* available as `a != b`.
+ Note:
+ Inequality is *not* available as `a != b`.
"""
return Timestream._call("neq", self, other)
@@ -604,31 +433,23 @@ def __ne__(self, other: object) -> bool:
return super().__ne__(other)
def index(self, key: Union[Timestream, Literal]) -> Timestream:
- """
- Index into the elements of a Timestream.
+ """Return a Timestream indexing into the elements of `self`.
If the Timestream contains lists, the key should be an integer index.
If the Timestream contains maps, the key should be the same type as the map keys.
- Parameters
- ----------
- key : Union[Timestream, Literal]
- The key to index into the expression.
+ Args:
+ key: The key to index into the expression.
- Raises
- ------
- TypeError
- When the Timestream is not a record, list, or map.
+ Raises:
+ TypeError: When the Timestream is not a record, list, or map.
- Returns
- -------
- Timestream
+ Returns:
Timestream with the resulting value (or `null` if absent) at each point.
- Note
- ----
- Indexing may be written using the operator `self[key]` instead of `self.index(key)`.
+ Note:
+ Indexing may be written using the operator `self[key]` instead of `self.index(key)`.
"""
data_type = self.data_type
if isinstance(data_type, pa.MapType):
@@ -639,43 +460,21 @@ def index(self, key: Union[Timestream, Literal]) -> Timestream:
raise TypeError(f"Cannot index into {data_type}")
def __getitem__(self, key: Union[Timestream, Literal]) -> Timestream:
- """
- Index into a list or map Timestrem.
+ """Implement `self[key]` using `index`.
- Parameters
- ----------
- key : Union[Timestream, Literal]
- The key to index into the expression.
-
- Returns
- -------
- Timestream
- Timestream with the resulting value (or `null` if absent) at each point.
-
- See Also
- --------
- index
+ See Also:
+ index
"""
return self.index(key)
def col(self, name: str) -> Timestream:
- """
- Access a named column or field of a Timestream.
+ """Return a Timestream accessing the named column or field of `self`.
- Parameters
- ----------
- name : str
- The name of the column or field to access.
-
- Returns
- -------
- Timestream
- Timestream with the resulting value (or `null` if absent) at each point.
+ Args:
+ name: The name of the column or field to access.
- Raises
- ------
- TypeError
- When the Timestream is not a record.
+ Raises:
+ TypeError: When the Timestream is not a record.
"""
data_type = self.data_type
if isinstance(data_type, pa.StructType) or isinstance(data_type, pa.ListType):
@@ -686,55 +485,31 @@ def col(self, name: str) -> Timestream:
)
def select(self, *args: str) -> Timestream:
- """
- Select the given fields from a Timestream of records.
-
- Parameters
- ----------
- args : list[str]
- List of field names to select.
+ """Return a Timestream selecting the given fields from `self`.
- Returns
- -------
- Timestream
- Timestream with the same records limited to the specified fields.
+ Args:
+ *args: The field names to select.
"""
return Timestream._call("select_fields", self, *args)
def remove(self, *args: str) -> Timestream:
- """
- Remove the given fileds from a Timestream of records.
-
- Parameters
- ----------
- args : list[str]
- List of field names to exclude.
+ """Return a Timestream removing the given fields from `self`.
- Returns
- -------
- Timestream
- Timestream with the same records and the given fields excluded.
+ Args:
+ *args: The field names to remove.
"""
return Timestream._call("remove_fields", self, *args)
def extend(
self, fields: Mapping[str, Arg] | Callable[[Timestream], Mapping[str, Arg]]
) -> Timestream:
- """
- Extend this Timestream of records with additional fields.
+ """Return a Timestream containing fields from `self` and `fields`.
If a field exists in the base Timestream and the `fields`, the value
from the `fields` will be taken.
- Parameters
- ----------
- fields : Mapping[str, Arg] | Callable[[Timestream], Mapping[str, Arg]]
- Fields to add to each record in the Timestream.
-
- Returns
- -------
- Timestream
- Timestream with the given fields added.
+ Args:
+ fields: Fields to add to each record in the Timestream.
"""
# This argument order is weird, and we shouldn't need to make a record
# in order to do the extension.
@@ -744,51 +519,22 @@ def extend(
return Timestream._call("extend_record", extension, self)
def neg(self) -> Timestream:
- """
- Create a Timestream from the numeric negation of self.
-
- Returns
- -------
- Timestream
- Timestream of the numeric negation of self.
- """
+ """Return a Timestream from the numeric negation of self."""
return Timestream._call("neg", self)
def is_null(self) -> Timestream:
- """
- Create a boolean Timestream containing `true` when self is `null`.
-
- Returns
- -------
- Timestream
- Timestream with `true` when self is `null` and `false` when it isn't.
- """
+ """Return a boolean Timestream containing `true` when `self` is `null`."""
return self.is_not_null().not_()
def is_not_null(self) -> Timestream:
- """
- Create a boolean Timestream containing `true` when self is not `null`.
-
- Returns
- -------
- Timestream
- Timestream with `true` when self is not `null` and `false` when it is.
- """
+ """Return a boolean Timestream containing `true` when `self` is not `null`."""
return Timestream._call("is_valid", self)
def filter(self, condition: Timestream) -> Timestream:
- """
- Create a Timestream containing only the points where `condition` is `true`.
-
- Parameters
- ----------
- condition : Timestream
- The condition to filter on.
+ """Return a Timestream containing only the points where `condition` is `true`.
- Returns
- -------
- Timestream
- Timestream containing `self` where `condition` is `true`.
+ Args:
+ condition: The condition to filter on.
"""
return Timestream._call("when", condition, self)
@@ -799,27 +545,20 @@ def collect(
min: Optional[int] = 0,
window: Optional[kd.windows.Window] = None,
) -> Timestream:
- """
- Create a Timestream collecting up to the last `max` values in the `window`.
+ """Return a Timestream collecting up to the last `max` values in the `window`.
Collects the values for each key separately.
- Parameters
- ----------
- max : Optional[int]
- The maximum number of values to collect.
- If `None` all values are collected.
- min: Optional[int]
- The minimum number of values to collect before
- producing a value. Defaults to 0.
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the collected list at each point.
+ Args:
+ max: The maximum number of values to collect.
+ If `None` all values are collected.
+ min: The minimum number of values to collect before producing a value.
+ Defaults to 0.
+ window: The window to use for the aggregation. If not specified,
+ the entire Timestream is used.
+
+ Returns:
+ A Timestream containing the list of collected elements at each point.
"""
if pa.types.is_list(self.data_type):
return (
@@ -831,80 +570,39 @@ def collect(
return _aggregation("collect", self, window, max, min)
def time(self) -> Timestream:
- """
- Create a Timestream containing the time of each point.
-
- Returns
- -------
- Timestream
- Timestream containing the time of each point.
- """
+ """Return a Timestream containing the time of each point."""
return Timestream._call("time_of", self)
def lag(self, n: int) -> Timestream:
- """
- Create a Timestream containing the value `n` points before each point.
+ """Return a Timestream containing the value `n` points before each point.
- Parameters
- ----------
- n : int
- The number of points to lag by.
-
- Returns
- -------
- Timestream
- Timestream containing the value `n` points before each point.
+ Args:
+ n: The number of points to lag by.
"""
# hack to support structs/lists (as collect supports lists)
return self.collect(max=n + 1, min=n + 1)[0]
def if_(self, condition: Union[Timestream, Literal]) -> Timestream:
- """
- Return `self` where `condition` is `true`, or `null` otherwise.
-
- Parameters
- ----------
- condition : Union[Timestream, Literal]
- The condition to check.
-
- Returns
- -------
- Timestream
- Timestream containing the value of `self` where `condition` is `true`, or
- `null` otherwise.
+ """Return a `Timestream` from `self` at points where `condition` is `true`, and `null` otherwise.
+
+ Args:
+ condition: The condition to check.
"""
return Timestream._call("if", condition, self)
def null_if(self, condition: Union[Timestream, Literal]) -> Timestream:
- """
- Return `self` where `condition` is `false`, or `null` otherwise.
-
- Parameters
- ----------
- condition : Union[Timestream, Literal]
- The condition to check.
-
- Returns
- -------
- Timestream
- Timestream containing the value of `self` where `condition` is `false`, or
- `null` otherwise.
+        """Return a `Timestream` from `self` at points where `condition` is `false`, and `null` otherwise.
+
+ Args:
+ condition: The condition to check.
"""
return Timestream._call("null_if", condition, self)
def length(self) -> Timestream:
- """
- Create a Timestream containing the length of `self`.
+ """Return a Timestream containing the length of `self`.
- Returns
- -------
- Timestream
- Timestream containing the length of `self`.
-
- Raises
- ------
- TypeError
- When the Timestream is not a string or list.
+ Raises:
+ TypeError: When the Timestream is not a string or list.
"""
if self.data_type.equals(pa.string()):
return Timestream._call("len", self)
@@ -914,89 +612,54 @@ def length(self) -> Timestream:
raise TypeError(f"length not supported for {self.data_type}")
def with_key(self, key: Timestream, grouping: Optional[str] = None) -> Timestream:
- """
- Create a Timestream with a new grouping by `key`.
-
- Parameters
- ----------
- key : Timestream
- The new key to use for the grouping.
- grouping : Optional[str]
- A string literal naming the new grouping. If no `grouping` is specified,
- one will be computed from the type of the `key`.
-
- Returns
- -------
- Timestream
- Timestream with a new grouping by `key`.
+ """Return a Timestream with a new grouping by `key`.
+
+ Args:
+ key: The new key to use for the grouping.
+ grouping: A string literal naming the new grouping. If no `grouping` is specified,
+ one will be computed from the type of the `key`.
"""
return Timestream._call("with_key", key, self, grouping)
def lookup(self, key: Union[Timestream, Literal]) -> Timestream:
- """
- Lookup the value of `self` for each `key` at the times in `key`.
+ """Return a Timestream looking up the value of `self` for each `key`.
For each non-`null` point in the `key` timestream, returns the value
from `self` at that time and associated with that `key`. Returns `null`
if the `key` is `null` or if there is no `value` computed for that key
at the corresponding time.
- Parameters
- ----------
- key : Union[Timestream, Literal]
- The foreign key to lookup.
- This must match the type of the keys in `self`.
-
- Returns
- -------
- Timestream
- Timestream containing the lookup join between the `key` and `self`.
+ Args:
+ key: The foreign key to lookup. This must match the type of the keys in `self`.
"""
return Timestream._call("lookup", key, self)
def coalesce(
self, arg: Union[Timestream, Literal], *args: Union[Timestream, Literal]
) -> Timestream:
- """
- Create a Timestream for returning the first non-null value or null if all values are null.
-
- Parameters
- ----------
- arg : Union[Timestream, Literal]
- The next value to be coalesced (required).
- args : Union[Timestream, Literal]
- Additional values to be coalesced (optional).
-
- Returns
- -------
- Timestream
- Timestream containing the first non-null value from that row.
+ """Return a Timestream containing the first non-null point from self and the arguments.
+
+ Args:
+ arg: The next value to be coalesced (required).
+ args: Additional values to be coalesced (optional).
+
+ Returns:
+ Timestream containing the first non-null value from each point.
If all values are null, then returns null.
"""
return Timestream._call("coalesce", self, arg, *args)
def shift_to(self, time: Union[Timestream, datetime]) -> Timestream:
- """
- Create a Timestream shifting each point forward to `time`.
+ """Return a Timestream shifting each point forward to `time`.
If multiple values are shifted to the same time, they will be emitted in
the order in which they originally occurred.
- Parameters
- ----------
- time : Union[Timestream, datetime]
- The time to shift to.
- This must be a datetime or a Timestream of timestamp_ns.
-
- Returns
- -------
- Timestream
- Timestream containing the shifted points.
-
- Raises
- ------
- NotImplementedError
- When `time` is a datetime (shift_to literal not yet implemented).
+ Args:
+ time: The time to shift to. This must be a datetime or a Timestream of timestamp_ns.
+
+ Raises:
+ NotImplementedError: When `time` is a datetime (shift_to literal not yet implemented).
"""
if isinstance(time, datetime):
# session = self._ffi_expr.session()
@@ -1009,21 +672,13 @@ def shift_to(self, time: Union[Timestream, datetime]) -> Timestream:
return Timestream._call("shift_to", time, self)
def shift_by(self, delta: Union[Timestream, timedelta]) -> Timestream:
- """
- Create a Timestream shifting each point forward by the `delta`.
+ """Return a Timestream shifting each point forward by the `delta`.
If multiple values are shifted to the same time, they will be emitted in
the order in which they originally occurred.
- Parameters
- ----------
- delta : Union[Timestream, timedelta]
- The delta to shift the point forward by.
-
- Returns
- -------
- Timestream
- Timestream containing the shifted points.
+ Args:
+ delta: The delta to shift the point forward by.
"""
if isinstance(delta, timedelta):
session = self._ffi_expr.session()
@@ -1035,8 +690,7 @@ def shift_by(self, delta: Union[Timestream, timedelta]) -> Timestream:
return Timestream._call("shift_by", delta, self)
def shift_until(self, predicate: Timestream) -> Timestream:
- """
- Shift points from `self` forward to the next time `predicate` is true.
+ """Return a Timestream shifting each point forward to the next time `predicate` is true.
Note that if the `predicate` evaluates to true at the same time as `self`,
the point will be emitted at that time.
@@ -1044,268 +698,138 @@ def shift_until(self, predicate: Timestream) -> Timestream:
If multiple values are shifted to the same time, they will be emitted in
the order in which they originally occurred.
- Parameters
- ----------
- predicate : Timestream
- The predicate to determine whether to emit shifted rows.
-
- Returns
- -------
- Timestream
- Timestream containing the shifted points.
+ Args:
+ predicate: The predicate to determine whether to emit shifted rows.
"""
return Timestream._call("shift_until", predicate, self)
def sum(self, *, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream summing the values in the `window`.
+ """Return a Timestream summing the values in the `window`.
Computes the sum for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the sum up to and including each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("sum", self, window)
def first(self, *, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the first value in the `window`.
+ """Return a Timestream containing the first value in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the first value for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("first", self, window)
def last(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the last value in the `window`.
+ """Return a Timestream containing the last value in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the last value for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("last", self, window)
def count(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the count value in the `window`.
+ """Return a Timestream containing the count value in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the count value for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("count", self, window)
def count_if(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the count of `true` values in `window`.
+ """Return a Timestream containing the count of `true` values in `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the count value if true for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("count_if", self, window)
def max(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the max value in the `window`.
+ """Return a Timestream containing the max value in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the max value for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("max", self, window)
def min(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the min value in the `window`.
+ """Return a Timestream containing the min value in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the min value for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("min", self, window)
def mean(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the mean value in the `window`.
+ """Return a Timestream containing the mean value in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the mean value for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("mean", self, window)
def stddev(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the standard deviation in the `window`.
+ """Return a Timestream containing the standard deviation in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the standard deviation for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("stddev", self, window)
def variance(self, window: Optional[kd.windows.Window] = None) -> Timestream:
- """
- Create a Timestream containing the variance in the `window`.
+ """Return a Timestream containing the variance in the `window`.
Computed for each key separately.
- Parameters
- ----------
- window : Optional[Window]
- The window to use for the aggregation.
- If not specified, the entire Timestream is used.
-
- Returns
- -------
- Timestream
- Timestream containing the variance for the key in the window for
- each point.
+ Args:
+ window: The window to use for the aggregation. Defaults to the entire Timestream.
"""
return _aggregation("variance", self, window)
def cast(self, data_type: pa.DataType) -> Timestream:
- """
- Cast the type of this Timestream to the given data type.
-
- Parameters
- ----------
- data_type : pa.DataType
- The data type to cast to.
+ """Return this Timestream cast to the given data type.
- Returns
- -------
- Timestream
- Timestream with the given data type.
+ Args:
+ data_type: The DataType to cast to.
"""
return Timestream(self._ffi_expr.cast(data_type))
def else_(self, other: Timestream) -> Timestream:
- """
- Return `self` if not `null` otherwise `other`.
-
- Parameters
- ----------
- other : Timestream
- The Timestream to use if self is `null`.
+ """Return a Timestream containing `self` when not `null`, and `other` otherwise.
- Returns
- -------
- Timestream
- Timestream containing the value of `self` not `null` otherwise `other`.
+ Args:
+ other: The Timestream to use if self is `null`.
"""
return Timestream._call("else", other, self)
def seconds_since(self, time: Union[Timestream, Literal]) -> Timestream:
- """
- Return a Timestream containing seconds between `time` and `self`.
-
- Parameters
- ----------
- time : Union[Timestream, Literal]
- The time to compute the seconds since.
+ """Return a Timestream containing seconds between `time` and `self`.
- This can be either a stream of timestamps or a datetime literal.
- If `time` is a Timestream, the result will contain the seconds
- from `self.time()` to `time.time()` for each point.
+ If `self.time()` is greater than `time`, the result will be positive.
- Returns
- -------
- Timestream
- Timestream containing the number of seconds since `time`.
+ Args:
+ time: The time to compute the seconds since.
- If `self.time()` is greater than `time`, the result will be positive.
+ This can be either a stream of timestamps or a datetime literal.
+ If `time` is a Timestream, the result will contain the seconds
+ from `self.time()` to `time.time()` for each point.
"""
if isinstance(time, datetime):
session = self._ffi_expr.session()
@@ -1316,22 +840,13 @@ def seconds_since(self, time: Union[Timestream, Literal]) -> Timestream:
return Timestream._call("seconds_between", time, self)
def seconds_since_previous(self, n: int = 1) -> Timestream:
- """
- Return a Timestream containing seconds between `self` and the time `n` points ago.
-
- Parameters
- ----------
- n : int
- The number of points to look back. For example, `n=1` refers to
- the previous point.
+ """Return a Timestream containing seconds between `self` and the time `n` points ago.
- Defaults to 1 (the previous point).
+ Args:
+ n: The number of points to look back. For example, `n=1` refers to
+ the previous point.
- Returns
- -------
- Timestream
- Timestream containing the number of seconds since the time `n`
- points ago.
+ Defaults to 1 (the previous point).
"""
time_of_current = Timestream._call("time_of", self).cast(pa.int64())
time_of_previous = Timestream._call("time_of", self).lag(n).cast(pa.int64())
@@ -1344,87 +859,57 @@ def flatten(self) -> Timestream:
return Timestream._call("flatten", self)
def union(self, other: Timestream) -> Timestream:
- """
- Union the lists in this timestream with the lists in the other Timestream.
-
- This correspons to a pair-wise union within each row of the timestreams.
+ """Union the lists in this timestream with the lists in the other Timestream.
- Parameters
- ----------
- other : Timestream
- The Timestream of lists to union with.
+ This corresponds to a pair-wise union within each row of the timestreams.
- Returns
- -------
- Timestream
- Timestream containing the union of the lists.
+ Args:
+ other: The Timestream of lists to union with.
"""
return Timestream._call("union", self, other)
def record(self, fields: Callable[[Timestream], Mapping[str, Arg]]) -> Timestream:
- """
- Create a record Timestream from fields computed from this timestream.
-
- Parameters
- ----------
- fields : Callable[[Timestream], Mapping[str, Arg]]
- The fields to include in the record.
-
- Returns
- -------
- Timestream
- Timestream containing records with the given fields.
-
- See Also
- --------
- kaskada.record: Function for creating a record from one or more
- timestreams.
+ """Return a record Timestream from fields computed from this timestream.
+
+ Args:
+ fields: The fields to include in the record.
+
+ See Also:
+ kaskada.record: Function for creating a record from one or more
+ timestreams.
"""
return record(fields(self))
def preview(self, limit: int = 100) -> pd.DataFrame:
- """
- Return the first N rows of the result as a Pandas DataFrame.
+ """Return the first N rows of the result as a Pandas DataFrame.
This makes it easy to preview the content of the Timestream.
- Parameters
- ----------
- limit : int
- Maximum number of rows to print.
+ Args:
+ limit: Maximum number of rows to print.
- Returns
- -------
- pd.DataFrame
+ Returns:
The Pandas DataFrame containing the first `limit` points.
"""
return self.run(row_limit=limit).to_pandas()
def run(
self,
+ *,
row_limit: Optional[int] = None,
max_batch_size: Optional[int] = None,
materialize: bool = False,
) -> Result:
- """
- Run the Timestream once.
+ """Run the Timestream.
- Parameters
- ----------
- row_limit : Optional[int]
- The maximum number of rows to return.
- If not specified all rows are returned.
+ Args:
+ row_limit: The maximum number of rows to return.
+ If not specified all rows are returned.
+ max_batch_size: The maximum number of rows to return in each batch.
+ If not specified the default is used.
+ materialize: If true, the execution will be a continuous materialization.
- max_batch_size : Optional[int]
- The maximum number of rows to return in each batch.
- If not specified the default is used.
-
- materialize : bool
- If true, the execution will be a continuous materialization.
-
- Returns
- -------
- Result
+ Returns:
The `Result` object to use for accessing the results.
"""
expr = self
@@ -1444,29 +929,16 @@ def _aggregation(
window: Optional[kd.windows.Window],
*args: Union[Timestream, Literal],
) -> Timestream:
- """
- Create the aggregation `op` with the given `input`, `window` and `args`.
-
- Parameters
- ----------
- op : str
- The operation to create.
- input : Timestream
- The input to the aggregation.
- window : Optional[Window]
- The window to use for the aggregation.
- *args : Union[Timestream, Literal]
- Additional arguments to provide after `input` and before the flattened window.
-
- Returns
- -------
- Timestream
- The resulting Timestream.
-
- Raises
- ------
- NotImplementedError
- If the window is not a known type.
+ """Return the aggregation `op` with the given `input`, `window` and `args`.
+
+ Args:
+ op: The operation to create.
+ input: The input to the aggregation.
+ window: The window to use for the aggregation.
+ *args: Additional arguments to provide after `input` and before the flattened window.
+
+ Raises:
+ NotImplementedError: If the window is not a known type.
"""
if window is None:
return Timestream._call(op, input, *args, None, None)
@@ -1507,23 +979,14 @@ def _aggregation(
def record(fields: Mapping[str, Arg]) -> Timestream:
- """
- Create a record Timestream from the given fields.
-
- Parameters
- ----------
- fields : dict[str, Timestream]
- The fields to include in the record.
-
- Returns
- -------
- Timestream
- Timestream containing records with the given fields.
-
- See Also
- --------
- Timestream.record: Method for creating a record from fields computed from
- a timestream.
+ """Return a record Timestream from the given fields.
+
+ Args:
+ fields: The fields to include in the record.
+
+ See Also:
+ Timestream.record: Method for creating a record from fields computed from
+ a timestream.
"""
import itertools
diff --git a/python/pysrc/kaskada/plot.py b/python/pysrc/kaskada/plot.py
index 68208db09..e4446fa8c 100644
--- a/python/pysrc/kaskada/plot.py
+++ b/python/pysrc/kaskada/plot.py
@@ -25,16 +25,11 @@ def _require_plotly() -> None:
@dataclass
class Plot(object):
- """
- Configuration for a single plot to render.
-
- Parameters
- ----------
- stream : Timestream
- The Timestream to render.
- name: str
- The name of the plot to render.
- Defaults to `Result` if not set.
+ """Configuration for a single plot to render.
+
+ Attributes:
+ stream: The Timestream to render.
+ name: The name of the plot to render. Defaults to `Result` if not set.
"""
stream: Timestream
diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py
index 48ddb11dd..c5835546e 100644
--- a/python/pysrc/kaskada/sources/arrow.py
+++ b/python/pysrc/kaskada/sources/arrow.py
@@ -28,31 +28,20 @@ def __init__(
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
) -> None:
- """
- Create a source reading Pandas DataFrames.
-
- Parameters
- ----------
- dataframe : pd.DataFrame
- The DataFrame to start from.
- time_column : str
- The name of the column containing the time.
- key_column : str
- The name of the column containing the key.
- subsort_column : str, optional
- The name of the column containing the subsort.
- If not provided, the subsort will be assigned by the system.
- schema : pa.Schema, optional
- The schema to use.
- If not specified, it will be inferred from the `dataframe`.
- grouping_name : str, optional
- The name of the groups associated with each key.
- This is used to ensure implicit joins are only performed between
- sources with compatible groupings.
- time_unit : str, optional
- The unit of the time column.
- One of `ns`, `us`, `ms`, or `s`.
- If not specified (and not specified in the `dataframe`), nanosecond will be assumed.
+ """Create a source reading Pandas DataFrames.
+
+ Args:
+ dataframe: The DataFrame to start from.
+ time_column: The name of the column containing the time.
+ key_column: The name of the column containing the key.
+ subsort_column: The name of the column containing the subsort.
+ If not provided, the subsort will be assigned by the system.
+ schema: The schema to use. If not provided, it will be inferred from the input.
+ grouping_name: The name of the groups associated with each key.
+ This is used to ensure implicit joins are only performed between data grouped
+ by the same entity.
+ time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+ If not specified (and not specified in the data), nanosecond will be assumed.
"""
if schema is None:
schema = pa.Schema.from_pandas(dataframe)
@@ -87,31 +76,20 @@ def __init__(
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
) -> None:
- """
- Create a source reading from rows represented as dicts.
-
- Parameters
- ----------
- rows : dict | list[dict]
- One or more represented as dicts.
- time_column : str
- The name of the column containing the time.
- key_column : str
- The name of the column containing the key.
- subsort_column : str, optional
- The name of the column containing the subsort.
- If not provided, the subsort will be assigned by the system.
- schema : pa.Schema, optional
- The schema to use.
- If not specified, it will be inferred from the `rows`.
- grouping_name : str, optional
- The name of the groups associated with each key.
- This is used to ensure implicit joins are only performed between
- sources with compatible groupings.
- time_unit : str, optional
- The unit of the time column.
- One of `ns`, `us`, `ms`, or `s`.
- If not specified nanosecond will be assumed.
+ """Create a source reading from rows represented as dicts.
+
+ Args:
+ rows: One or more rows represented as dicts.
+ time_column: The name of the column containing the time.
+ key_column: The name of the column containing the key.
+ subsort_column: The name of the column containing the subsort.
+ If not provided, the subsort will be assigned by the system.
+ schema: The schema to use. If not provided, it will be inferred from the input.
+ grouping_name: The name of the groups associated with each key.
+ This is used to ensure implicit joins are only performed between data grouped
+ by the same entity.
+ time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+ If not specified (and not specified in the data), nanosecond will be assumed.
"""
if schema is None:
schema = pa.Table.from_pylist(rows).schema
@@ -152,31 +130,20 @@ def __init__(
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
) -> None:
- """
- Create a CSV String Source.
-
- Parameters
- ----------
- csv_string : str
- The CSV string to start from.
- time_column : str
- The name of the column containing the time.
- key_column : str
- The name of the column containing the key.
- subsort_column : str, optional
- The name of the column containing the subsort.
- If not provided, the subsort will be assigned by the system.
- schema : pa.Schema, optional
- The schema to use.
- If not specified, it will be inferred from the `csv_string`.
- grouping_name : str, optional
- The name of the groups associated with each key.
- This is used to ensure implicit joins are only performed between
- sources with compatible groupings.
- time_unit : str, optional
- The unit of the time column.
- One of `ns`, `us`, `ms`, or `s`.
- If not specified nanosecond will be assumed.
+ """Create a CSV String Source.
+
+ Args:
+ csv_string: The CSV string to start from.
+ time_column: The name of the column containing the time.
+ key_column: The name of the column containing the key.
+ subsort_column: The name of the column containing the subsort.
+ If not provided, the subsort will be assigned by the system.
+ schema: The schema to use. If not provided, it will be inferred from the input.
+ grouping_name: The name of the groups associated with each key.
+ This is used to ensure implicit joins are only performed between data grouped
+ by the same entity.
+ time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+ If not specified (and not specified in the data), nanosecond will be assumed.
"""
if isinstance(csv_string, str):
csv_string = BytesIO(csv_string.encode("utf-8"))
@@ -221,31 +188,20 @@ def __init__(
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
) -> None:
- """
- Create a JSON String Source.
-
- Parameters
- ----------
- json_string : str
- The line-delimited JSON string to start from.
- time_column : str
- The name of the column containing the time.
- key_column : str
- The name of the column containing the key.
- subsort_column : str, optional
- The name of the column containing the subsort.
- If not provided, the subsort will be assigned by the system.
- schema : pa.Schema, optional
- The schema to use.
- If not specified, it will be inferred from the JSON records.
- grouping_name : str, optional
- The name of the groups associated with each key.
- This is used to ensure implicit joins are only performed between
- sources with compatible groupings.
- time_unit : str, optional
- The unit of the time column.
- One of `ns`, `us`, `ms`, or `s`.
- If not specified nanosecond will be assumed.
+ """Create a JSON String Source.
+
+ Args:
+ json_string: The line-delimited JSON string to start from.
+ time_column: The name of the column containing the time.
+ key_column: The name of the column containing the key.
+ subsort_column: The name of the column containing the subsort.
+ If not provided, the subsort will be assigned by the system.
+ schema: The schema to use. If not provided, it will be inferred from the input.
+ grouping_name: The name of the groups associated with each key.
+ This is used to ensure implicit joins are only performed between data grouped
+ by the same entity.
+ time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+ If not specified (and not specified in the data), nanosecond will be assumed.
"""
if isinstance(json_string, str):
json_string = BytesIO(json_string.encode("utf-8"))
@@ -290,28 +246,18 @@ def __init__(
"""
Create a Parquet source.
- Parameters
- ----------
- path : str
- The path to the Parquet file to add.
- time_column : str
- The name of the column containing the time.
- key_column : str
- The name of the column containing the key.
- subsort_column : str, optional
- The name of the column containing the subsort.
- If not provided, the subsort will be assigned by the system.
- schema : pa.Schema, optional
- The schema to use.
- If not specified, it will be inferred from the Parquet file.
- grouping_name : str, optional
- The name of the groups associated with each key.
- This is used to ensure implicit joins are only performed between
- sources with compatible groupings.
- time_unit : str, optional
- The unit of the time column.
- One of `ns`, `us`, `ms`, or `s`.
- If not specified nanosecond will be assumed.
+ Args:
+ path: The path to the Parquet file to add.
+ time_column: The name of the column containing the time.
+ key_column: The name of the column containing the key.
+ subsort_column: The name of the column containing the subsort.
+ If not provided, the subsort will be assigned by the system.
+ schema: The schema to use. If not provided, it will be inferred from the input.
+ grouping_name: The name of the groups associated with each key.
+ This is used to ensure implicit joins are only performed between data grouped
+ by the same entity.
+ time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+ If not specified (and not specified in the data), nanosecond will be assumed.
"""
if schema is None:
schema = pa.parquet.read_schema(path)
if schema is None:
schema = pa.parquet.read_schema(path)
diff --git a/python/pysrc/kaskada/windows.py b/python/pysrc/kaskada/windows.py
index 7bbfc9f25..905ca4fe6 100644
--- a/python/pysrc/kaskada/windows.py
+++ b/python/pysrc/kaskada/windows.py
@@ -16,104 +16,56 @@ class Window(object):
@dataclass(frozen=True)
class Since(Window):
- """
- Window since the last time a predicate was true.
+ """Window since the last time a predicate was true.
Aggregations will contain all values starting from the last time the predicate
evaluated to true (inclusive).
-
- Parameters
- ----------
- predicate : Timestream | Callable[..., Timestream] | bool
- The boolean Timestream to use as predicate for the window.
- Each time the predicate evaluates to true the window will be cleared.
-
- The predicate may be a callable which returns the boolean Timestream, in
- which case it is applied to the Timestream being aggregated.
"""
+ #: The boolean Timestream to use as predicate for the window.
+ #: Each time the predicate evaluates to true the window will be cleared.
+#:
+ #: The predicate may be a callable which returns the boolean Timestream, in
+ #: which case it is applied to the Timestream being aggregated.
predicate: Timestream | Callable[..., Timestream] | bool
@staticmethod
def minutely() -> Since:
- """
- Return a window since the start of each minute.
-
- Returns
- -------
- Since
- Window since the start of each minute.
- """
+ """Return a window since the start of each minute."""
return Since(predicate=lambda domain: Timestream._call("minutely", domain))
@staticmethod
def hourly() -> Since:
- """
- Return a window since the start of each hour.
-
- Returns
- -------
- Since
- Window since the start of each hour.
- """
+ """Return a window since the start of each hour."""
return Since(predicate=lambda domain: Timestream._call("hourly", domain))
@staticmethod
def daily() -> Since:
- """
- Return a window since the start of each day.
-
- Returns
- -------
- Since
- Window since the start of each day.
- """
+ """Return a window since the start of each day."""
return Since(predicate=lambda domain: Timestream._call("daily", domain))
@staticmethod
def monthly() -> Since:
- """
- Return a window since the start of each month.
-
- Returns
- -------
- Since
- Window since the start of each month.
- """
+ """Return a window since the start of each month."""
return Since(predicate=lambda domain: Timestream._call("monthly", domain))
@staticmethod
def yearly() -> Since:
- """
- Return a window since the start of each year.
-
- Returns
- -------
- Since
- Window since the start of each year.
- """
+ """Return a window since the start of each year."""
return Since(predicate=lambda domain: Timestream._call("yearly", domain))
@dataclass(frozen=True)
class Sliding(Window):
- """
- Window for the last `duration` intervals of some `predicate`.
-
- Parameters
- ----------
- duration : int
- The number of sliding intervals to use in the window.
-
- predicate : Timestream | Callable[..., Timestream] | bool
- The boolean Timestream to use as predicate for the window
- Each time the predicate evaluates to true the window starts a new interval.
-
- The predicate may be a callable which returns the boolean Timestream, in
- which case it is applied to the Timestream being aggregated.
- """
+ """Window for the last `duration` intervals of some `predicate`."""
+ #: The number of sliding intervals to use in the window.
duration: int
+ #: The boolean Timestream to use as a predicate for the window.
+ #: Each time the predicate evaluates to true the window starts a new interval.
+ #:
+ #: The predicate may be a callable which returns the boolean Timestream, in
+ #: which case it is applied to the Timestream being aggregated.
predicate: Timestream | Callable[..., Timestream] | bool
def __post_init__(self):
@@ -123,18 +75,10 @@ def __post_init__(self):
@staticmethod
def minutely(duration: int) -> Sliding:
- """
- Return a sliding window containing `duration` minutes.
+ """Return a sliding window containing `duration` minutes.
- Parameters
- ----------
- duration : int
- The number of minutes to use in the window.
-
- Returns
- -------
- Sliding
- Sliding window with `duration` minutes, advancing every minute.
+ Args:
+ duration: The number of minutes to use in the window.
"""
return Sliding(
duration=duration,
@@ -143,18 +87,10 @@ def minutely(duration: int) -> Sliding:
@staticmethod
def hourly(duration: int) -> Sliding:
- """
- Return a sliding window containing `duration` hours.
+ """Return a sliding window containing `duration` hours.
- Parameters
- ----------
- duration : int
- The number of hours to use in the window.
-
- Returns
- -------
- Sliding
- Sliding window with `duration` hours, advancing every hour.
+ Args:
+ duration: The number of hours to use in the window.
"""
return Sliding(
duration=duration,
@@ -163,18 +99,10 @@ def hourly(duration: int) -> Sliding:
@staticmethod
def daily(duration: int) -> Sliding:
- """
- Return a sliding window containing `duration` daily.
+ """Return a sliding window containing `duration` daily.
- Parameters
- ----------
- duration : int
- The number of days to use in the window.
-
- Returns
- -------
- Sliding
- Sliding window with `duration` days, advancing every day.
+ Args:
+ duration: The number of days to use in the window.
"""
return Sliding(
duration=duration,
@@ -183,18 +111,10 @@ def daily(duration: int) -> Sliding:
@staticmethod
def monthly(duration: int) -> Sliding:
- """
- Return a sliding window containing `duration` months.
+ """Return a sliding window containing `duration` months.
- Parameters
- ----------
- duration : int
- The number of months to use in the window.
-
- Returns
- -------
- Sliding
- Sliding window with `duration` months, advancing every month.
+ Args:
+ duration: The number of months to use in the window.
"""
return Sliding(
duration=duration,
@@ -203,18 +123,10 @@ def monthly(duration: int) -> Sliding:
@staticmethod
def yearly(duration: int) -> Sliding:
- """
- Return a sliding window containing `duration` years.
+ """Return a sliding window containing `duration` years.
- Parameters
- ----------
- duration : int
- The number of years to use in the window.
-
- Returns
- -------
- Sliding
- Sliding window with `duration` years, advancing every year.
+ Args:
+ duration: The number of years to use in the window.
"""
return Sliding(
duration=duration,
@@ -224,15 +136,13 @@ def yearly(duration: int) -> Sliding:
@dataclass(frozen=True)
class Trailing(Window):
- """
- Window the last `duration` time period.
+ """Window the last `duration` time period.
- Parameters
- ----------
- duration : timedelta
- The duration of the window.
+ Args:
+ duration: The duration of the window.
"""
+ #: The duration of the window.
duration: timedelta
def __post_init__(self):
From b151c39ffcf006e4530824d3903b1cad537fa277 Mon Sep 17 00:00:00 2001
From: Ben Chambers <35960+bjchambers@users.noreply.github.com>
Date: Mon, 28 Aug 2023 16:49:49 -0700
Subject: [PATCH 2/4] fix lint
---
.github/workflows/ci_python.yml | 3 ---
python/.darglint | 5 -----
python/docs/source/conf.py | 2 +-
python/noxfile.py | 4 +---
python/poetry.lock | 13 +------------
python/pyproject.toml | 1 -
python/pysrc/kaskada/_session.py | 3 +--
python/pysrc/kaskada/sources/arrow.py | 3 +--
python/pysrc/kaskada/windows.py | 2 +-
9 files changed, 6 insertions(+), 30 deletions(-)
delete mode 100644 python/.darglint
diff --git a/.github/workflows/ci_python.yml b/.github/workflows/ci_python.yml
index 04c8ffb1e..11a0262a0 100644
--- a/.github/workflows/ci_python.yml
+++ b/.github/workflows/ci_python.yml
@@ -68,9 +68,6 @@ jobs:
- name: isort
run: |
poetry run isort --filter-files --diff pysrc pytests docs/source
- - name: darglint
- run: |
- poetry run darglint pysrc
- name: pydocstyle
run: |
poetry run pydocstyle pysrc
diff --git a/python/.darglint b/python/.darglint
deleted file mode 100644
index 89956bb7a..000000000
--- a/python/.darglint
+++ /dev/null
@@ -1,5 +0,0 @@
-[darglint]
-strictness = short
-docstring_style=numpy
-ignore_regex=^(test)?_(.*),
-message_template={path}:{line} in {obj}: {msg_id} {msg}
\ No newline at end of file
diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py
index b46c5201e..2704bc2c0 100644
--- a/python/docs/source/conf.py
+++ b/python/docs/source/conf.py
@@ -123,6 +123,6 @@
napoleon_use_rtype = False
typehints_use_rtype = False
typehints_document_rtype = False
-typehints_defaults = 'comma'
+typehints_defaults = "comma"
suppress_warnings = ["mystnb.unknown_mime_type"]
diff --git a/python/noxfile.py b/python/noxfile.py
index 906d93967..27cf7bdef 100644
--- a/python/noxfile.py
+++ b/python/noxfile.py
@@ -31,9 +31,7 @@ def check_lint(session: nox.Session) -> None:
session.run("flake8", *args)
session.run("isort", "--filter-files", "--check-only", *args)
- # Only do darglint and pydocstyle on pysrc (source)
- session.run("darglint", "pysrc")
- session.run("pydocstyle", "--convention=numpy", "pysrc")
+ session.run("pydocstyle", "--convention=google", "pysrc")
# No way to run this as a check.
# session.run("pyupgrade", "--py38-plus")
diff --git a/python/poetry.lock b/python/poetry.lock
index e2e69d81c..40122c39e 100644
--- a/python/poetry.lock
+++ b/python/poetry.lock
@@ -490,17 +490,6 @@ tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.1
[package.extras]
toml = ["tomli"]
-[[package]]
-name = "darglint"
-version = "1.8.1"
-description = "A utility for ensuring Google-style docstrings stay up to date with the source code."
-optional = false
-python-versions = ">=3.6,<4.0"
-files = [
- {file = "darglint-1.8.1-py3-none-any.whl", hash = "sha256:5ae11c259c17b0701618a20c3da343a3eb98b3bc4b5a83d31cdd94f5ebdced8d"},
- {file = "darglint-1.8.1.tar.gz", hash = "sha256:080d5106df149b199822e7ee7deb9c012b49891538f14a11be681044f0bb20da"},
-]
-
[[package]]
name = "debugpy"
version = "1.6.7.post1"
@@ -2996,4 +2985,4 @@ plot = ["plotly"]
[metadata]
lock-version = "2.0"
python-versions = ">=3.8.1,<4.0"
-content-hash = "e4cda7b9883ae598e5e16e2426dac30292ed867520937da9e1c867b76eadda52"
+content-hash = "207cb0246f166fabdd6f69502dc050c632ffa07497da5220b04bbf54b581818e"
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 4263f924a..b8db4d361 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -49,7 +49,6 @@ optional = true
[tool.poetry.group.lint.dependencies]
black = { version = ">=21.10b0", extras = ["jupyter"] }
-darglint = ">=1.8.1"
flake8 = ">=4.0.1"
flake8-bugbear = ">=21.9.2"
flake8-rst-docstrings = ">=0.2.5"
diff --git a/python/pysrc/kaskada/_session.py b/python/pysrc/kaskada/_session.py
index 2018af9ce..fcbc5fb1a 100644
--- a/python/pysrc/kaskada/_session.py
+++ b/python/pysrc/kaskada/_session.py
@@ -24,8 +24,7 @@ def init_session() -> None:
def _get_session() -> _ffi.Session:
- """
- Assert that the session has been initialized.
+ """Assert that the session has been initialized.
Returns: The FFI session handle.
diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py
index c5835546e..afaefd1bf 100644
--- a/python/pysrc/kaskada/sources/arrow.py
+++ b/python/pysrc/kaskada/sources/arrow.py
@@ -243,8 +243,7 @@ def __init__(
grouping_name: Optional[str] = None,
time_unit: Optional[TimeUnit] = None,
) -> None:
- """
- Create a Parquet source.
+ """Create a Parquet source.
Args:
path: The path to the Parquet file to add.
diff --git a/python/pysrc/kaskada/windows.py b/python/pysrc/kaskada/windows.py
index 905ca4fe6..726f1b2e0 100644
--- a/python/pysrc/kaskada/windows.py
+++ b/python/pysrc/kaskada/windows.py
@@ -24,7 +24,7 @@ class Since(Window):
#: The boolean Timestream to use as predicate for the window.
#: Each time the predicate evaluates to true the window will be cleared.
-#:
+ #:
#: The predicate may be a callable which returns the boolean Timestream, in
#: which case it is applied to the Timestream being aggregated.
predicate: Timestream | Callable[..., Timestream] | bool
From be37f34efab6a1bdc26e3b538b9173523d0d0d51 Mon Sep 17 00:00:00 2001
From: Ben Chambers <35960+bjchambers@users.noreply.github.com>
Date: Mon, 28 Aug 2023 16:58:59 -0700
Subject: [PATCH 3/4] fixes
---
python/pysrc/kaskada/sources/arrow.py | 10 +++++-----
python/pysrc/kaskada/windows.py | 2 +-
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py
index afaefd1bf..40ea1f7a1 100644
--- a/python/pysrc/kaskada/sources/arrow.py
+++ b/python/pysrc/kaskada/sources/arrow.py
@@ -37,7 +37,7 @@ def __init__(
subsort_column: The name of the column containing the subsort.
If not provided, the subsort will be assigned by the system.
schema: The schema to use. If not provided, it will be inferred from the input.
- grouping_name: The name of the groups associated with each key.
+ grouping_name: The name of the group associated with each key.
This is used to ensure implicit joins are only performed between data grouped
by the same entity.
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
@@ -85,7 +85,7 @@ def __init__(
subsort_column: The name of the column containing the subsort.
If not provided, the subsort will be assigned by the system.
schema: The schema to use. If not provided, it will be inferred from the input.
- grouping_name: The name of the groups associated with each key.
+ grouping_name: The name of the group associated with each key.
This is used to ensure implicit joins are only performed between data grouped
by the same entity.
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
@@ -139,7 +139,7 @@ def __init__(
subsort_column: The name of the column containing the subsort.
If not provided, the subsort will be assigned by the system.
schema: The schema to use. If not provided, it will be inferred from the input.
- grouping_name: The name of the groups associated with each key.
+ grouping_name: The name of the group associated with each key.
This is used to ensure implicit joins are only performed between data grouped
by the same entity.
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
@@ -197,7 +197,7 @@ def __init__(
subsort_column: The name of the column containing the subsort.
If not provided, the subsort will be assigned by the system.
schema: The schema to use. If not provided, it will be inferred from the input.
- grouping_name: The name of the groups associated with each key.
+ grouping_name: The name of the group associated with each key.
This is used to ensure implicit joins are only performed between data grouped
by the same entity.
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
@@ -253,7 +253,7 @@ def __init__(
subsort_column: The name of the column containing the subsort.
If not provided, the subsort will be assigned by the system.
schema: The schema to use. If not provided, it will be inferred from the input.
- grouping_name: The name of the groups associated with each key.
+ grouping_name: The name of the group associated with each key.
This is used to ensure implicit joins are only performed between data grouped
by the same entity.
time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
diff --git a/python/pysrc/kaskada/windows.py b/python/pysrc/kaskada/windows.py
index 726f1b2e0..e269ca3d1 100644
--- a/python/pysrc/kaskada/windows.py
+++ b/python/pysrc/kaskada/windows.py
@@ -64,7 +64,7 @@ class Sliding(Window):
#: The boolean Timestream to use as a predicate for the window.
#: Each time the predicate evaluates to true the window starts a new interval.
#:
- #: The predicate may be a callable which returns the boolean Timestream, in
+ #: The predicate may be a callable which returns the boolean Timestream, in
#: which case it is applied to the Timestream being aggregated.
predicate: Timestream | Callable[..., Timestream] | bool
From 92f48bf38dd59501b58047223ad47c13415497d1 Mon Sep 17 00:00:00 2001
From: Ben Chambers <35960+bjchambers@users.noreply.github.com>
Date: Mon, 28 Aug 2023 18:39:49 -0700
Subject: [PATCH 4/4] bump version
---
python/Cargo.toml | 2 +-
python/docs/source/examples/time_centric.ipynb | 4 ++--
python/pyproject.toml | 4 ++--
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/python/Cargo.toml b/python/Cargo.toml
index e681c28d0..8cc93b7e9 100644
--- a/python/Cargo.toml
+++ b/python/Cargo.toml
@@ -3,7 +3,7 @@ name = "kaskada"
authors = ["Kaskada Developers"]
edition = "2021"
license = "Apache-2.0"
-version = "0.6.0-a.1"
+version = "0.6.0-a.2"
description = """
Python library for building and executing temporal queries.
"""
diff --git a/python/docs/source/examples/time_centric.ipynb b/python/docs/source/examples/time_centric.ipynb
index c125588de..243f537ea 100644
--- a/python/docs/source/examples/time_centric.ipynb
+++ b/python/docs/source/examples/time_centric.ipynb
@@ -81,7 +81,7 @@
"## Kaskada Client Setup\n",
"\n",
"```\n",
- "%pip install kaskada>=0.6.0-a.1\n",
+ "%pip install kaskada>=0.6.0-a.0\n",
"```"
]
},
@@ -339,4 +339,4 @@
},
"nbformat": 4,
"nbformat_minor": 5
-}
\ No newline at end of file
+}
diff --git a/python/pyproject.toml b/python/pyproject.toml
index b8db4d361..440369cb2 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "kaskada"
-version = "0.6.0-a.1"
+version = "0.6.0-a.2"
description = "Kaskada query builder and local execution engine."
requires-python = ">=3.8.1,<4.0"
classifiers = [
@@ -23,7 +23,7 @@ plot = [
name = "kaskada"
description = "Kaskada query builder and local execution engine."
authors = []
-version = "0.6.0-a.1"
+version = "0.6.0-a.2"
[tool.poetry.dependencies]
# Dependencies to install for using the project.