diff --git a/.github/workflows/ci_python.yml b/.github/workflows/ci_python.yml index 04c8ffb1e..11a0262a0 100644 --- a/.github/workflows/ci_python.yml +++ b/.github/workflows/ci_python.yml @@ -68,9 +68,6 @@ jobs: - name: isort run: | poetry run isort --filter-files --diff pysrc pytests docs/source - - name: darglint - run: | - poetry run darglint pysrc - name: pydocstyle run: | poetry run pydocstyle pysrc diff --git a/python/.darglint b/python/.darglint deleted file mode 100644 index 89956bb7a..000000000 --- a/python/.darglint +++ /dev/null @@ -1,5 +0,0 @@ -[darglint] -strictness = short -docstring_style=numpy -ignore_regex=^(test)?_(.*), -message_template={path}:{line} in {obj}: {msg_id} {msg} \ No newline at end of file diff --git a/python/Cargo.toml b/python/Cargo.toml index e681c28d0..8cc93b7e9 100644 --- a/python/Cargo.toml +++ b/python/Cargo.toml @@ -3,7 +3,7 @@ name = "kaskada" authors = ["Kaskada Developers"] edition = "2021" license = "Apache-2.0" -version = "0.6.0-a.1" +version = "0.6.0-a.2" description = """ Python library for building and executing temporal queries. """ diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 6e9f1ad56..2704bc2c0 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -20,6 +20,7 @@ # "myst_parser", "myst_nb", "sphinx_copybutton", + "sphinx_autodoc_typehints", # must be after napoleon "_extensions.gallery_directive", ] autodoc_typehints = "description" @@ -110,6 +111,7 @@ # Automatically extract typehints when specified and place them in # descriptions of the relevant function/method. autodoc_typehints = "description" +autodoc_type_aliases = {"kaskada.Arg": "kaskada.Arg"} # Don't show class signature with the class' name. 
autodoc_class_signature = "separated" @@ -117,5 +119,10 @@ autosummary_generate = True napoleon_preprocess_types = True +napoleon_attr_annotations = True +napoleon_use_rtype = False +typehints_use_rtype = False +typehints_document_rtype = False +typehints_defaults = "comma" suppress_warnings = ["mystnb.unknown_mime_type"] diff --git a/python/docs/source/examples/time_centric.ipynb b/python/docs/source/examples/time_centric.ipynb index c125588de..243f537ea 100644 --- a/python/docs/source/examples/time_centric.ipynb +++ b/python/docs/source/examples/time_centric.ipynb @@ -81,7 +81,7 @@ "## Kaskada Client Setup\n", "\n", "```\n", - "%pip install kaskada>=0.6.0-a.1\n", + "%pip install kaskada>=0.6.0-a.0\n", "```" ] }, @@ -339,4 +339,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/python/docs/source/index.md b/python/docs/source/index.md index c04b6fb9b..7b089adaa 100644 --- a/python/docs/source/index.md +++ b/python/docs/source/index.md @@ -1,7 +1,11 @@ --- hide-toc: true html_theme.sidebar_secondary.remove: true +sd_hide_title: true --- + +# Real-Time AI without the fuss. +
@@ -12,7 +16,7 @@ html_theme.sidebar_secondary.remove: true
-# Kaskada completes the Real-Time AI stack, providing... +## Kaskada completes the Real-Time AI stack, providing... ```{gallery-grid} :grid-columns: 1 2 2 3 diff --git a/python/docs/source/reference/timestream/aggregation.md b/python/docs/source/reference/timestream/aggregation.md index 3f5740fce..7d2ff14ee 100644 --- a/python/docs/source/reference/timestream/aggregation.md +++ b/python/docs/source/reference/timestream/aggregation.md @@ -2,12 +2,16 @@ Timestream aggregations are: -Cumulative: - They reflect all values up to and including the current time. -Grouped: - They reflect the values for each entity separately. -Windowed: - They reflect the values within a specific [window](../windows.md). +Cumulative +: They reflect all values up to and including the current time. + +Grouped +: They reflect the values for each entity separately. + +Windowed +: They reflect the values within a specific [window](../windows.md). + +## Aggregation Methods ```{eval-rst} .. currentmodule:: kaskada diff --git a/python/docs/source/reference/timestream/arithmetic.md b/python/docs/source/reference/timestream/arithmetic.md index 669b7c810..8bc527b7e 100644 --- a/python/docs/source/reference/timestream/arithmetic.md +++ b/python/docs/source/reference/timestream/arithmetic.md @@ -8,6 +8,8 @@ For instance, `a.add(b)` may be written as `a + b`. See the notes on the specific functions for more information. ``` +## Arithmetic Methods + ```{eval-rst} .. currentmodule:: kaskada diff --git a/python/docs/source/reference/timestream/collection.md b/python/docs/source/reference/timestream/collection.md index 99e579692..e9f9f177c 100644 --- a/python/docs/source/reference/timestream/collection.md +++ b/python/docs/source/reference/timestream/collection.md @@ -1,7 +1,9 @@ -# Arithmetic +# Collection Timestreams allow each point to contain a collection -- a `list` or `map` -- of elements. +## Collection Methods + ```{eval-rst} .. 
currentmodule:: kaskada diff --git a/python/docs/source/reference/timestream/comparison.md b/python/docs/source/reference/timestream/comparison.md index d25e3d28f..bdf014ba9 100644 --- a/python/docs/source/reference/timestream/comparison.md +++ b/python/docs/source/reference/timestream/comparison.md @@ -10,6 +10,8 @@ See the notes on the specific functions for more information. To respect the semantics of `__eq__` and `__ne__`, `a == b` and `a != b` are *not* overloaded. ``` +## Comparison Methods + ```{eval-rst} .. currentmodule:: kaskada diff --git a/python/docs/source/reference/timestream/index.md b/python/docs/source/reference/timestream/index.md index 93dbf9ce9..5e861cacb 100644 --- a/python/docs/source/reference/timestream/index.md +++ b/python/docs/source/reference/timestream/index.md @@ -12,6 +12,7 @@ html_theme.sidebar_secondary.remove: :exclude-members: __init__ .. autoproperty:: data_type +.. autoclass:: kaskada.Arg ``` ```{toctree} diff --git a/python/docs/source/reference/timestream/misc.md b/python/docs/source/reference/timestream/misc.md index c3d6b6434..482f89826 100644 --- a/python/docs/source/reference/timestream/misc.md +++ b/python/docs/source/reference/timestream/misc.md @@ -13,4 +13,5 @@ Timestream.if_ Timestream.lag Timestream.null_if + Timestream.pipe ``` \ No newline at end of file diff --git a/python/docs/source/reference/timestream/records.md b/python/docs/source/reference/timestream/records.md index 7fe5b10c1..24a2b4743 100644 --- a/python/docs/source/reference/timestream/records.md +++ b/python/docs/source/reference/timestream/records.md @@ -1,7 +1,8 @@ # Records Record operations create, extract or manipulate Timestreams of records. -Comparison operations produce boolean Timestreams. + +## Record Methods ```{eval-rst} .. 
currentmodule:: kaskada diff --git a/python/docs/source/reference/windows.md b/python/docs/source/reference/windows.md index 007f26a12..d14513a98 100644 --- a/python/docs/source/reference/windows.md +++ b/python/docs/source/reference/windows.md @@ -6,6 +6,7 @@ .. autosummary:: :toctree: apidocs/windows/ + Window Since Sliding Trailing diff --git a/python/noxfile.py b/python/noxfile.py index 906d93967..27cf7bdef 100644 --- a/python/noxfile.py +++ b/python/noxfile.py @@ -31,9 +31,7 @@ def check_lint(session: nox.Session) -> None: session.run("flake8", *args) session.run("isort", "--filter-files", "--check-only", *args) - # Only do darglint and pydocstyle on pysrc (source) - session.run("darglint", "pysrc") - session.run("pydocstyle", "--convention=numpy", "pysrc") + session.run("pydocstyle", "--convention=google", "pysrc") # No way to run this as a check. # session.run("pyupgrade", "--py38-plus") diff --git a/python/poetry.lock b/python/poetry.lock index 8d6ac9ebd..40122c39e 100644 --- a/python/poetry.lock +++ b/python/poetry.lock @@ -490,17 +490,6 @@ tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.1 [package.extras] toml = ["tomli"] -[[package]] -name = "darglint" -version = "1.8.1" -description = "A utility for ensuring Google-style docstrings stay up to date with the source code." 
-optional = false -python-versions = ">=3.6,<4.0" -files = [ - {file = "darglint-1.8.1-py3-none-any.whl", hash = "sha256:5ae11c259c17b0701618a20c3da343a3eb98b3bc4b5a83d31cdd94f5ebdced8d"}, - {file = "darglint-1.8.1.tar.gz", hash = "sha256:080d5106df149b199822e7ee7deb9c012b49891538f14a11be681044f0bb20da"}, -] - [[package]] name = "debugpy" version = "1.6.7.post1" @@ -2484,6 +2473,25 @@ sphinx = "*" [package.extras] test = ["pytest", "pytest-cov"] +[[package]] +name = "sphinx-autodoc-typehints" +version = "1.23.0" +description = "Type hints (PEP 484) support for the Sphinx autodoc extension" +optional = false +python-versions = ">=3.7" +files = [ + {file = "sphinx_autodoc_typehints-1.23.0-py3-none-any.whl", hash = "sha256:ac099057e66b09e51b698058ba7dd76e57e1fe696cd91b54e121d3dad188f91d"}, + {file = "sphinx_autodoc_typehints-1.23.0.tar.gz", hash = "sha256:5d44e2996633cdada499b6d27a496ddf9dbc95dd1f0f09f7b37940249e61f6e9"}, +] + +[package.dependencies] +sphinx = ">=5.3" + +[package.extras] +docs = ["furo (>=2022.12.7)", "sphinx (>=6.1.3)", "sphinx-autodoc-typehints (>=1.23.4)"] +testing = ["covdefaults (>=2.2.2)", "coverage (>=7.2.2)", "diff-cover (>=7.5)", "nptyping (>=2.5)", "pytest (>=7.2.2)", "pytest-cov (>=4)", "sphobjinv (>=2.3.1)", "typing-extensions (>=4.5)"] +type-comment = ["typed-ast (>=1.5.4)"] + [[package]] name = "sphinx-book-theme" version = "1.0.1" @@ -2977,4 +2985,4 @@ plot = ["plotly"] [metadata] lock-version = "2.0" python-versions = ">=3.8.1,<4.0" -content-hash = "855debf781d9b061faaf6481cce7e377bcf9416bfeecd9411c104a119d2e66b7" +content-hash = "207cb0246f166fabdd6f69502dc050c632ffa07497da5220b04bbf54b581818e" diff --git a/python/pyproject.toml b/python/pyproject.toml index 7e2625579..440369cb2 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "kaskada" -version = "0.6.0-a.1" +version = "0.6.0-a.2" description = "Kaskada query builder and local execution engine." 
requires-python = ">=3.8.1,<4.0" classifiers = [ @@ -23,7 +23,7 @@ plot = [ name = "kaskada" description = "Kaskada query builder and local execution engine." authors = [] -version = "0.6.0-a.1" +version = "0.6.0-a.2" [tool.poetry.dependencies] # Dependencies to install for using the project. @@ -49,7 +49,6 @@ optional = true [tool.poetry.group.lint.dependencies] black = { version = ">=21.10b0", extras = ["jupyter"] } -darglint = ">=1.8.1" flake8 = ">=4.0.1" flake8-bugbear = ">=21.9.2" flake8-rst-docstrings = ">=0.2.5" @@ -81,6 +80,7 @@ optional = true [tool.poetry.group.docs.dependencies] sphinx = ">=6.0.0" sphinx-autobuild = ">=2021.3.14" +sphinx-autodoc-typehints = ">=1.23.0" sphinx-book-theme = "^1.0.1" sphinx-copybutton = "^0.5.2" sphinx-design = "^0.5.0" diff --git a/python/pysrc/kaskada/__init__.py b/python/pysrc/kaskada/__init__.py index bc5121cc2..2fc4eda1b 100644 --- a/python/pysrc/kaskada/__init__.py +++ b/python/pysrc/kaskada/__init__.py @@ -7,6 +7,7 @@ from ._execution import ExecutionOptions from ._result import Result from ._session import init_session +from ._timestream import Arg from ._timestream import Literal from ._timestream import Timestream from ._timestream import record @@ -14,6 +15,7 @@ __all__ = [ + "Arg", "ExecutionOptions", "init_session", "Literal", diff --git a/python/pysrc/kaskada/_execution.py b/python/pysrc/kaskada/_execution.py index 4829264dd..6fde445d4 100644 --- a/python/pysrc/kaskada/_execution.py +++ b/python/pysrc/kaskada/_execution.py @@ -6,18 +6,11 @@ class ExecutionOptions: """Execution options for a query. - Attributes - ---------- - row_limit : Optional[int] - The maximum number of rows to return. - If not specified, all rows are returned. - - max_batch_size : Optional[int] - The maximum batch size to use when returning results. - If not specified, the default batch size will be used. - - materialize : bool - If true, the query will be a continuous materialization. 
+ Attributes: + row_limit: The maximum number of rows to return. If not specified, all rows are returned. + max_batch_size: The maximum batch size to use when returning results. + If not specified, the default batch size will be used. + materialize: If true, the query will be a continuous materialization. """ row_limit: Optional[int] = None diff --git a/python/pysrc/kaskada/_result.py b/python/pysrc/kaskada/_result.py index eea4e04ac..c5a138141 100644 --- a/python/pysrc/kaskada/_result.py +++ b/python/pysrc/kaskada/_result.py @@ -17,34 +17,26 @@ def __init__(self, ffi_execution: _ffi.Execution) -> None: self._ffi_execution = ffi_execution def to_pandas(self) -> pd.DataFrame: - """ - Convert the result to a Pandas DataFrame. + """Convert the result to a Pandas DataFrame. - Returns - ------- - pd.DataFrame + Returns: The result as a Pandas DataFrame. - Warnings - -------- - This method will block on the complete results of the query and collect - all results into memory. If this is not desired, use `iter_pandas` instead. + Warnings: + This method will block on the complete results of the query and collect + all results into memory. If this is not desired, use `iter_pandas` instead. """ return self.to_pyarrow().to_pandas() def to_pyarrow(self) -> pa.Table: - """ - Convert the result to a PyArrow Table. + """Convert the result to a PyArrow Table. - Returns - ------- - pa.Table + Returns: The result as a PyArrow Table. - Warnings - -------- - This method will block on the complete results of the query and collect - all results into memory. If this is not desired, use `iter_pyarrow` instead. + Warnings: + This method will block on the complete results of the query and collect + all results into memory. If this is not desired, use `iter_pyarrow` instead. 
""" batches = self._ffi_execution.collect_pyarrow() if len(batches) == 0: @@ -55,14 +47,7 @@ def to_pyarrow(self) -> pa.Table: return table def iter_pyarrow(self) -> Iterator[pa.RecordBatch]: - """ - Iterate over the results as PyArrow RecordBatches. - - Yields - ------ - pa.RecordBatch - The next RecordBatch. - """ + """Yield the results as PyArrow RecordBatches.""" next_batch = self._ffi_execution.next_pyarrow() while next_batch is not None: # Annoyingly, PyArrow doesn't suport `drop_columns` on batches. @@ -75,39 +60,18 @@ def iter_pyarrow(self) -> Iterator[pa.RecordBatch]: next_batch = self._ffi_execution.next_pyarrow() def iter_pandas(self) -> Iterator[pd.DataFrame]: - """ - Iterate over the results as Pandas DataFrames. - - Yields - ------ - pd.DataFrame - The next Pandas DataFrame. - """ + """Yield the resulting Pandas DataFrames.""" for batch in self.iter_pyarrow(): yield batch.to_pandas() def iter_rows(self) -> Iterator[dict]: - """ - Iterate over the results as row dictionaries. - - Yields - ------ - dict - The next row as a dictionary. - """ + """Yield the resulting rows as dictionaries.""" for batch in self.iter_pyarrow(): for row in batch.to_pylist(): yield row async def iter_pyarrow_async(self) -> AsyncIterator[pa.RecordBatch]: - """ - Asynchronously iterate over the results as PyArrow RecordBatches. - - Yields - ------ - pa.RecordBatch - The next RecordBatch. - """ + """Yield the resulting PyArrow RecordBatches asynchronously.""" next_batch = await self._ffi_execution.next_pyarrow_async() while next_batch is not None: # Annoyingly, PyArrow doesn't suport `drop_columns` on batches. @@ -120,26 +84,12 @@ async def iter_pyarrow_async(self) -> AsyncIterator[pa.RecordBatch]: next_batch = await self._ffi_execution.next_pyarrow_async() async def iter_pandas_async(self) -> AsyncIterator[pd.DataFrame]: - """ - Asynchronously iterate over the results as Pandas DataFrames. - - Yields - ------ - pd.DataFrame - The next Pandas DataFrame. 
- """ + """Yield the resulting Pandas DataFrames asynchronously.""" async for batch in self.iter_pyarrow_async(): yield batch.to_pandas() async def iter_rows_async(self) -> AsyncIterator[dict]: - """ - Asycnchronously iterate over the results as row dictionaries. - - Yields - ------ - dict - The next row as a dictionary. - """ + """Yield the resulting row dictionaries asynchronously.""" async for batch in self.iter_pyarrow_async(): for row in batch.to_pylist(): yield row diff --git a/python/pysrc/kaskada/_session.py b/python/pysrc/kaskada/_session.py index 8cfa0b970..fcbc5fb1a 100644 --- a/python/pysrc/kaskada/_session.py +++ b/python/pysrc/kaskada/_session.py @@ -9,16 +9,13 @@ def init_session() -> None: - """ - Initialize the Kaskada session for this Python process. + """Initialize the Kaskada session for this Python process. This must only be called once per session. It must be called before any other Kaskada functions are called. - Raises - ------ - RuntimeError - If the session has already been initialized. + Raises: + RuntimeError: If the session has already been initialized. """ global _SESSION if _SESSION is not None: @@ -27,18 +24,12 @@ def init_session() -> None: def _get_session() -> _ffi.Session: - """ - Assert that the session has been initialized. + """Assert that the session has been initialized. - Returns - ------- - _ffi.Session - The FFI session handle. + Returns: The FFI session handle. - Raises - ------ - AssertionError - If the session has not been initialized. + Raises: + AssertionError: If the session has not been initialized. 
""" global _SESSION assert _SESSION is not None, "Session has not been initialized" diff --git a/python/pysrc/kaskada/_timestream.py b/python/pysrc/kaskada/_timestream.py index fc9c92bce..ba5c8496d 100644 --- a/python/pysrc/kaskada/_timestream.py +++ b/python/pysrc/kaskada/_timestream.py @@ -71,32 +71,22 @@ def _call( *args: Union[Timestream, Literal], session: Optional[_ffi.Session] = None, ) -> Timestream: - """ - Construct a new Timestream by calling the given function. - - Parameters - ---------- - func : str - Name of the function to apply. - *args : Timestream | int | str | float | bool | None - List of arguments to the expression. - session : FFI Session - FFI Session to create the expression in. - If unspecified, will infer from the arguments. - Will fail if all arguments are literals and the session is not provided. - - Returns - ------- - Timestream + """Construct a new Timestream by calling the given function. + + Args: + func: Name of the function to apply. + *args: List of arguments to the expression. + session: FFI Session to create the expression in. + If unspecified, will infer from the arguments. + Will fail if all arguments are literals and the session is not provided. + + Returns: Timestream representing the result of the function applied to the arguments. - Raises - ------ - # noqa: DAR401 _augment_error - TypeError - If the argument types are invalid for the given function. - ValueError - If the argument values are invalid for the given function. + Raises: + # noqa: DAR401 _augment_error + TypeError: If the argument types are invalid for the given function. + ValueError: If the argument values are invalid for the given function. """ if session is None: session = next( @@ -146,56 +136,45 @@ def pipe( *args: Union[Timestream, Literal], **kwargs: Union[Timestream, Literal], ) -> Timestream: - """ - Apply chainable functions that produce Timestreams. 
- - Parameters - ---------- - func : Callable[..., Timestream] | Tuple[Callable[..., Timestream], str] - Function to apply to this Timestream. Alternatively a `(func, - keyword)` tuple where `keyword` is a string indicating the keyword - of `func` that expects the Timestream. - args : iterable, optional - Positional arguments passed into ``func``. - kwargs : mapping, optional - A dictionary of keyword arguments passed into ``func``. - - Returns - ------- - Timestream + """Apply chainable functions that produce Timestreams. + + Args: + func: Function to apply to this Timestream. + Alternatively a `(func, keyword)` tuple where `keyword` is a string + indicating the keyword of `func` that expects the Timestream. + *args: Positional arguments passed into ``func``. + **kwargs: A dictionary of keyword arguments passed into ``func``. + + Returns: The result of applying `func` to the arguments. - Raises - ------ - ValueError - When using `self` with a specific `keyword` if the `keyword` also - appears on in the `kwargs`. + Raises: + ValueError: When using `self` with a specific `keyword` if the `keyword` also + appears on in the `kwargs`. - Notes - ----- - Use ``.pipe`` when chaining together functions that expect Timestreams. + Notes: + Use ``.pipe`` when chaining together functions that expect Timestreams. - Examples - -------- - Instead of writing + Examples: + Instead of writing - >>> func(g(h(df), arg1=a), arg2=b, arg3=c) # doctest: +SKIP + >>> func(g(h(df), arg1=a), arg2=b, arg3=c) # doctest: +SKIP - You can write + You can write - >>> (df.pipe(h) - >>> .pipe(g, arg1=a) - >>> .pipe(func, arg2=b, arg3=c) - >>> ) # doctest: +SKIP + >>> (df.pipe(h) + >>> .pipe(g, arg1=a) + >>> .pipe(func, arg2=b, arg3=c) + >>> ) # doctest: +SKIP - If you have a function that takes the data as (say) the second - argument, pass a tuple indicating which keyword expects the - data. 
For example, suppose ``func`` takes its data as ``arg2``: + If you have a function that takes the data as (say) the second + argument, pass a tuple indicating which keyword expects the + data. For example, suppose ``func`` takes its data as ``arg2``: - >>> (df.pipe(h) - >>> .pipe(g, arg1=a) - >>> .pipe((func, 'arg2'), arg1=a, arg3=c) - >>> ) # doctest: +SKIP + >>> (df.pipe(h) + >>> .pipe(g, arg1=a) + >>> .pipe((func, 'arg2'), arg1=a, arg3=c) + >>> ) # doctest: +SKIP """ if isinstance(func, tuple): func, target = func @@ -207,23 +186,14 @@ def pipe( else: return func(self, *args, **kwargs) - def add(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream adding this and `rhs`. - - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to add to this. + def add(self, rhs: Arg) -> Timestream: + """Return a Timestream adding this and `rhs`. - Returns - ------- - Timestream - The Timestream resulting from `self + rhs`. + Args: + rhs: The Timestream or literal value to add to this. - Notes - ----- - You can also write `a.add(b)` as `a + b`. + Notes: + You can also write `a.add(b)` as `a + b`. """ if isinstance(rhs, timedelta): # Right now, we can't convert a time delta directly to a scalar value (literal). @@ -249,14 +219,7 @@ def __radd__(self, lhs: Union[Timestream, Literal]) -> Timestream: return lhs.add(self) def ceil(self) -> Timestream: - """ - Create a Timestream for the number rounded up to the next largest integer. - - Returns - ------- - Timestream - The Timestream resulting from the numeric ceiling of this. - """ + """Return a Timestream rounding self up to the next largest integer.""" return Timestream._call("ceil", self) def clamp( @@ -264,40 +227,22 @@ def clamp( min: Union[Timestream, Literal, None] = None, max: Union[Timestream, Literal, None] = None, ) -> Timestream: - """ - Create a Timestream clamped between the bounds of min and max. 
- - Parameters - ---------- - min : Union[Timestream, Literal, None] - The literal value to set as the lower bound - max : Union[Timestream, Literal, None] - The literal value to set as the upper bound - - Returns - ------- - Timestream - The Timestream resulting from the clamped bounds between min and max. + """Return a Timestream from `self` clamped between `min` and `max`. + + Args: + min: The literal value to set as the lower bound. + max: The literal value to set as the upper bound. """ return Timestream._call("clamp", self, min, max) def sub(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream substracting `rhs` from this. - - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to subtract from this. + """Return a Timestream subtracting `rhs` from this. - Returns - ------- - Timestream - The Timestream resulting from `self - rhs`. + Args: + rhs: The Timestream or literal value to subtract from this. - Notes - ----- - You can also write `a.sub(b)` as `a - b`. + Notes: + You can also write `a.sub(b)` as `a - b`. """ return Timestream._call("sub", self, rhs) @@ -312,44 +257,21 @@ def __rsub__(self, lhs: Union[Timestream, Literal]) -> Timestream: return lhs.sub(self) def exp(self) -> Timestream: - """ - Create a Timestream raising `e` to the power of this. - - Returns - ------- - Timestream - The Timestream resulting from `e^this`. - """ + """Return a Timestream raising `e` to the power of `self`.""" return Timestream._call("exp", self) def floor(self) -> Timestream: - """ - Create a Timestream of the values rounded down to the nearest integer. - - Returns - ------- - Timestream - The Timestream resulting from the numeric floor of this. - """ + """Return a Timestream of self rounded down to the nearest integer.""" return Timestream._call("floor", self) def mul(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream multiplying this and `rhs`. 
- - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to multiply with this. + """Return a Timestream multiplying this and `rhs`. - Returns - ------- - Timestream - The Timestream resulting from `self * rhs`. + Args: + rhs: The Timestream or literal value to multiply with this. - Notes - ----- - You can also write `a.mul(b)` as `a * b`. + Notes: + You can also write `a.mul(b)` as `a * b`. """ return Timestream._call("mul", self, rhs) @@ -364,37 +286,21 @@ def __rmul__(self, lhs: Union[Timestream, Literal]) -> Timestream: return lhs.mul(self) def powf(self, power: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream raising `this` to the power of `power`. - - Parameters - ---------- - power : Union[Timestream, Literal] - The Timestream or literal value to raise this by. + """Return a Timestream raising `self` to the power of `power`. - Returns - ------- - Timestream: The Timestream resulting from `self ^ power`. + Args: + power: The Timestream or literal value to raise `self` to. """ return Timestream._call("powf", self, power) def div(self, divisor: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream by dividing this and `divisor`. - - Parameters - ---------- - divisor : Union[Timestream, Literal] - The Timestream or literal value to divide this by. + """Return a Timestream by dividing this and `divisor`. - Returns - ------- - Timestream - The Timestream resulting from `self / divisor`. + Args: + divisor: The Timestream or literal value to divide this by. - Notes - ----- - You can also write `a.div(b)` as `a / b`. + Notes: + You can also write `a.div(b)` as `a / b`. """ return Timestream._call("div", self, divisor) @@ -409,22 +315,13 @@ def __rtruediv__(self, dividend: Union[Timestream, Literal]) -> Timestream: return dividend.div(self) def lt(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream that is true if this is less than `rhs`. 
- - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to compare to. + """Return a Timestream that is true if this is less than `rhs`. - Returns - ------- - Timestream - The Timestream resulting from `self < rhs`. + Args: + rhs: The Timestream or literal value to compare to. - Notes - ----- - You can also write `a.lt(b)` as `a < b`. + Notes: + You can also write `a.lt(b)` as `a < b`. """ return Timestream._call("lt", self, rhs) @@ -433,22 +330,13 @@ def __lt__(self, rhs: Union[Timestream, Literal]) -> Timestream: return self.lt(rhs) def le(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream that is true if this is less than or equal to `rhs`. - - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to compare to. + """Return a Timestream that is true if this is less than or equal to `rhs`. - Returns - ------- - Timestream - The Timestream resulting from `self <= rhs`. + Args: + rhs: The Timestream or literal value to compare to. - Notes - ----- - You can also write `a.le(b)` as `a <= b`. + Notes: + You can also write `a.le(b)` as `a <= b`. """ return Timestream._call("lte", self, rhs) @@ -457,22 +345,13 @@ def __le__(self, rhs: Union[Timestream, Literal]) -> Timestream: return self.le(rhs) def gt(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream that is true if this is greater than `rhs`. - - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to compare to. + """Return a Timestream that is true if this is greater than `rhs`. - Returns - ------- - Timestream - The Timestream resulting from `self > rhs`. + Args: + rhs: The Timestream or literal value to compare to. - Notes - ----- - You can also write `a.gt(b)` as `a > b`. + Notes: + You can also write `a.gt(b)` as `a > b`. 
""" return Timestream._call("gt", self, rhs) @@ -481,22 +360,13 @@ def __gt__(self, rhs: Union[Timestream, Literal]) -> Timestream: return self.gt(rhs) def ge(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream that is true if this is greater than or equal to `rhs`. - - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to compare to. + """Return a TimeStream that is true if this is greater than or equal to `rhs`. - Returns - ------- - Timestream - The Timestream resulting from `self >= rhs`. + Args: + rhs: The Timestream or literal value to compare to. - Notes - ----- - You can also write `a.ge(b)` as `a >= b`. + Notes: + You can also write `a.ge(b)` as `a >= b`. """ return Timestream._call("gte", self, rhs) @@ -505,85 +375,44 @@ def __ge__(self, rhs: Union[Timestream, Literal]) -> Timestream: return self.ge(rhs) def and_(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create the logical conjunction of this Timestream and `rhs`. + """Return the logical conjunction of this Timestream and `rhs`. - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to conjoin with. - - Returns - ------- - Timestream - The Timestream resulting from `self and rhs`. + Args: + rhs: The Timestream or literal value to conjoin with. """ return Timestream._call("logical_and", self, rhs) def or_(self, rhs: Union[Timestream, Literal]) -> Timestream: - """ - Create the logical disjunction of this Timestream and `rhs`. + """Return the logical disjunction of this Timestream and `rhs`. - Parameters - ---------- - rhs : Union[Timestream, Literal] - The Timestream or literal value to disjoin with. - - Returns - ------- - Timestream - The Timestream resulting from `self or rhs`. + Args: + rhs: The Timestream or literal value to disjoin with. 
""" return Timestream._call("logical_or", self, rhs) def not_(self) -> Timestream: - """ - Create the logical negation of this Timestream. - - Returns - ------- - Timestream - The Timestream resulting from `not self`. - """ + """Return the logical negation of this Timestream.""" return Timestream._call("not", self) def eq(self, other: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream that is true if this is equal to `other`. - - Parameters - ---------- - other : Union[Timestream, Literal] - The Timestream or literal value to compare to. + """Return a Timestream that is true if this is equal to `other`. - Returns - ------- - Timestream - The Timestream indicating whether the `self` and `other` are equal. + Args: + other: The Timestream or literal value to compare to. - Note - ---- - Equality is *not* available as `a == b`. + Note: + Equality is *not* available as `a == b`. """ return Timestream._call("eq", self, other) def ne(self, other: Union[Timestream, Literal]) -> Timestream: - """ - Create a Timestream that is true if this is not equal to `other`. - - Parameters - ---------- - other : Union[Timestream, Literal] - The Timestream or literal value to compare to. + """Return a Timestream that is true if this is not equal to `other`. - Returns - ------- - Timestream - The Timestream indicating whether `self` and `other` are not equal. + Args: + other: The Timestream or literal value to compare to. - Note - ---- - Inequality is *not* available as `a != b`. + Note: + Inequality is *not* available as `a != b`. """ return Timestream._call("neq", self, other) @@ -604,31 +433,23 @@ def __ne__(self, other: object) -> bool: return super().__ne__(other) def index(self, key: Union[Timestream, Literal]) -> Timestream: - """ - Index into the elements of a Timestream. + """Return a Timestream indexing into the elements of `self`. If the Timestream contains lists, the key should be an integer index. 
If the Timestream contains maps, the key should be the same type as the map keys. - Parameters - ---------- - key : Union[Timestream, Literal] - The key to index into the expression. + Args: + key: The key to index into the expression. - Raises - ------ - TypeError - When the Timestream is not a record, list, or map. + Raises: + TypeError: When the Timestream is not a record, list, or map. - Returns - ------- - Timestream + Returns: Timestream with the resulting value (or `null` if absent) at each point. - Note - ---- - Indexing may be written using the operator `self[key]` instead of `self.index(key)`. + Note: + Indexing may be written using the operator `self[key]` instead of `self.index(key)`. """ data_type = self.data_type if isinstance(data_type, pa.MapType): @@ -639,43 +460,21 @@ def index(self, key: Union[Timestream, Literal]) -> Timestream: raise TypeError(f"Cannot index into {data_type}") def __getitem__(self, key: Union[Timestream, Literal]) -> Timestream: - """ - Index into a list or map Timestrem. + """Implement `self[key]` using `index`. - Parameters - ---------- - key : Union[Timestream, Literal] - The key to index into the expression. - - Returns - ------- - Timestream - Timestream with the resulting value (or `null` if absent) at each point. - - See Also - -------- - index + See Also: + index """ return self.index(key) def col(self, name: str) -> Timestream: - """ - Access a named column or field of a Timestream. + """Return a Timestream accessing the named column or field of `self`. - Parameters - ---------- - name : str - The name of the column or field to access. - - Returns - ------- - Timestream - Timestream with the resulting value (or `null` if absent) at each point. + Args: + name: The name of the column or field to access. - Raises - ------ - TypeError - When the Timestream is not a record. + Raises: + TypeError: When the Timestream is not a record. 
""" data_type = self.data_type if isinstance(data_type, pa.StructType) or isinstance(data_type, pa.ListType): @@ -686,55 +485,31 @@ def col(self, name: str) -> Timestream: ) def select(self, *args: str) -> Timestream: - """ - Select the given fields from a Timestream of records. - - Parameters - ---------- - args : list[str] - List of field names to select. + """Return a Timestream selecting the given fields from `self`. - Returns - ------- - Timestream - Timestream with the same records limited to the specified fields. + Args: + *args: The field names to select. """ return Timestream._call("select_fields", self, *args) def remove(self, *args: str) -> Timestream: - """ - Remove the given fileds from a Timestream of records. - - Parameters - ---------- - args : list[str] - List of field names to exclude. + """Return a Timestream removing the given fields from `self`. - Returns - ------- - Timestream - Timestream with the same records and the given fields excluded. + Args: + *args: The field names to remove. """ return Timestream._call("remove_fields", self, *args) def extend( self, fields: Mapping[str, Arg] | Callable[[Timestream], Mapping[str, Arg]] ) -> Timestream: - """ - Extend this Timestream of records with additional fields. + """Return a Timestream containing fields from `self` and `fields`. If a field exists in the base Timestream and the `fields`, the value from the `fields` will be taken. - Parameters - ---------- - fields : Mapping[str, Arg] | Callable[[Timestream], Mapping[str, Arg]] - Fields to add to each record in the Timestream. - - Returns - ------- - Timestream - Timestream with the given fields added. + Args: + fields: Fields to add to each record in the Timestream. """ # This argument order is weird, and we shouldn't need to make a record # in order to do the extension. 
@@ -744,51 +519,22 @@ def extend( return Timestream._call("extend_record", extension, self) def neg(self) -> Timestream: - """ - Create a Timestream from the numeric negation of self. - - Returns - ------- - Timestream - Timestream of the numeric negation of self. - """ + """Return a Timestream from the numeric negation of self.""" return Timestream._call("neg", self) def is_null(self) -> Timestream: - """ - Create a boolean Timestream containing `true` when self is `null`. - - Returns - ------- - Timestream - Timestream with `true` when self is `null` and `false` when it isn't. - """ + """Return a boolean Timestream containing `true` when `self` is `null`.""" return self.is_not_null().not_() def is_not_null(self) -> Timestream: - """ - Create a boolean Timestream containing `true` when self is not `null`. - - Returns - ------- - Timestream - Timestream with `true` when self is not `null` and `false` when it is. - """ + """Return a boolean Timestream containing `true` when `self` is not `null`.""" return Timestream._call("is_valid", self) def filter(self, condition: Timestream) -> Timestream: - """ - Create a Timestream containing only the points where `condition` is `true`. - - Parameters - ---------- - condition : Timestream - The condition to filter on. + """Return a Timestream containing only the points where `condition` is `true`. - Returns - ------- - Timestream - Timestream containing `self` where `condition` is `true`. + Args: + condition: The condition to filter on. """ return Timestream._call("when", condition, self) @@ -799,27 +545,20 @@ def collect( min: Optional[int] = 0, window: Optional[kd.windows.Window] = None, ) -> Timestream: - """ - Create a Timestream collecting up to the last `max` values in the `window`. + """Return a Timestream collecting up to the last `max` values in the `window`. Collects the values for each key separately. - Parameters - ---------- - max : Optional[int] - The maximum number of values to collect. 
- If `None` all values are collected. - min: Optional[int] - The minimum number of values to collect before - producing a value. Defaults to 0. - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the collected list at each point. + Args: + max: The maximum number of values to collect. + If `None` all values are collected. + min: The minimum number of values to collect before producing a value. + Defaults to 0. + window: The window to use for the aggregation. If not specified, + the entire Timestream is used. + + Returns: + A Timestream containing the list of collected elements at each point. """ if pa.types.is_list(self.data_type): return ( @@ -831,80 +570,39 @@ def collect( return _aggregation("collect", self, window, max, min) def time(self) -> Timestream: - """ - Create a Timestream containing the time of each point. - - Returns - ------- - Timestream - Timestream containing the time of each point. - """ + """Return a Timestream containing the time of each point.""" return Timestream._call("time_of", self) def lag(self, n: int) -> Timestream: - """ - Create a Timestream containing the value `n` points before each point. + """Return a Timestream containing the value `n` points before each point. - Parameters - ---------- - n : int - The number of points to lag by. - - Returns - ------- - Timestream - Timestream containing the value `n` points before each point. + Args: + n: The number of points to lag by. """ # hack to support structs/lists (as collect supports lists) return self.collect(max=n + 1, min=n + 1)[0] def if_(self, condition: Union[Timestream, Literal]) -> Timestream: - """ - Return `self` where `condition` is `true`, or `null` otherwise. - - Parameters - ---------- - condition : Union[Timestream, Literal] - The condition to check. 
- - Returns - ------- - Timestream - Timestream containing the value of `self` where `condition` is `true`, or - `null` otherwise. + """Return a `Timestream` from `self` at points where `condition` is `true`, and `null` otherwise. + + Args: + condition: The condition to check. """ return Timestream._call("if", condition, self) def null_if(self, condition: Union[Timestream, Literal]) -> Timestream: - """ - Return `self` where `condition` is `false`, or `null` otherwise. - - Parameters - ---------- - condition : Union[Timestream, Literal] - The condition to check. - - Returns - ------- - Timestream - Timestream containing the value of `self` where `condition` is `false`, or - `null` otherwise. + """Return a `Timestream` from `self` at points where `condition` is not `false`, and `null` otherwise. + + Args: + condition: The condition to check. """ return Timestream._call("null_if", condition, self) def length(self) -> Timestream: - """ - Create a Timestream containing the length of `self`. + """Return a Timestream containing the length of `self`. - Returns - ------- - Timestream - Timestream containing the length of `self`. - - Raises - ------ - TypeError - When the Timestream is not a string or list. + Raises: + TypeError: When the Timestream is not a string or list. """ if self.data_type.equals(pa.string()): return Timestream._call("len", self) @@ -914,89 +612,54 @@ def length(self) -> Timestream: raise TypeError(f"length not supported for {self.data_type}") def with_key(self, key: Timestream, grouping: Optional[str] = None) -> Timestream: - """ - Create a Timestream with a new grouping by `key`. - - Parameters - ---------- - key : Timestream - The new key to use for the grouping. - grouping : Optional[str] - A string literal naming the new grouping. If no `grouping` is specified, - one will be computed from the type of the `key`. - - Returns - ------- - Timestream - Timestream with a new grouping by `key`. + """Return a Timestream with a new grouping by `key`. 
+ + Args: + key: The new key to use for the grouping. + grouping: A string literal naming the new grouping. If no `grouping` is specified, + one will be computed from the type of the `key`. """ return Timestream._call("with_key", key, self, grouping) def lookup(self, key: Union[Timestream, Literal]) -> Timestream: - """ - Lookup the value of `self` for each `key` at the times in `key`. + """Return a Timestream looking up the value of `self` for each `key`. For each non-`null` point in the `key` timestream, returns the value from `self` at that time and associated with that `key`. Returns `null` if the `key` is `null` or if there is no `value` computed for that key at the corresponding time. - Parameters - ---------- - key : Union[Timestream, Literal] - The foreign key to lookup. - This must match the type of the keys in `self`. - - Returns - ------- - Timestream - Timestream containing the lookup join between the `key` and `self`. + Args: + key: The foreign key to lookup. This must match the type of the keys in `self`. """ return Timestream._call("lookup", key, self) def coalesce( self, arg: Union[Timestream, Literal], *args: Union[Timestream, Literal] ) -> Timestream: - """ - Create a Timestream for returning the first non-null value or null if all values are null. - - Parameters - ---------- - arg : Union[Timestream, Literal] - The next value to be coalesced (required). - args : Union[Timestream, Literal] - Additional values to be coalesced (optional). - - Returns - ------- - Timestream - Timestream containing the first non-null value from that row. + """Return a Timestream containing the first non-null point from self and the arguments. + + Args: + arg: The next value to be coalesced (required). + args: Additional values to be coalesced (optional). + + Returns: + Timestream containing the first non-null value from each point. If all values are null, then returns null. 
""" return Timestream._call("coalesce", self, arg, *args) def shift_to(self, time: Union[Timestream, datetime]) -> Timestream: - """ - Create a Timestream shifting each point forward to `time`. + """Return a Timestream shifting each point forward to `time`. If multiple values are shifted to the same time, they will be emitted in the order in which they originally occurred. - Parameters - ---------- - time : Union[Timestream, datetime] - The time to shift to. - This must be a datetime or a Timestream of timestamp_ns. - - Returns - ------- - Timestream - Timestream containing the shifted points. - - Raises - ------ - NotImplementedError - When `time` is a datetime (shift_to literal not yet implemented). + Args: + time: The time to shift to. This must be a datetime or a Timestream of timestamp_ns. + + Raises: + NotImplementedError: When `time` is a datetime (shift_to literal not yet implemented). """ if isinstance(time, datetime): # session = self._ffi_expr.session() @@ -1009,21 +672,13 @@ def shift_to(self, time: Union[Timestream, datetime]) -> Timestream: return Timestream._call("shift_to", time, self) def shift_by(self, delta: Union[Timestream, timedelta]) -> Timestream: - """ - Create a Timestream shifting each point forward by the `delta`. + """Return a Timestream shifting each point forward by the `delta`. If multiple values are shifted to the same time, they will be emitted in the order in which they originally occurred. - Parameters - ---------- - delta : Union[Timestream, timedelta] - The delta to shift the point forward by. - - Returns - ------- - Timestream - Timestream containing the shifted points. + Args: + delta: The delta to shift the point forward by. 
""" if isinstance(delta, timedelta): session = self._ffi_expr.session() @@ -1035,8 +690,7 @@ def shift_by(self, delta: Union[Timestream, timedelta]) -> Timestream: return Timestream._call("shift_by", delta, self) def shift_until(self, predicate: Timestream) -> Timestream: - """ - Shift points from `self` forward to the next time `predicate` is true. + """Return a Timestream shifting each point forward to the next time `predicate` is true. Note that if the `predicate` evaluates to true at the same time as `self`, the point will be emitted at that time. @@ -1044,268 +698,138 @@ def shift_until(self, predicate: Timestream) -> Timestream: If multiple values are shifted to the same time, they will be emitted in the order in which they originally occurred. - Parameters - ---------- - predicate : Timestream - The predicate to determine whether to emit shifted rows. - - Returns - ------- - Timestream - Timestream containing the shifted points. + Args: + predicate: The predicate to determine whether to emit shifted rows. """ return Timestream._call("shift_until", predicate, self) def sum(self, *, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream summing the values in the `window`. + """Return a Timestream summing the values in the `window`. Computes the sum for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the sum up to and including each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("sum", self, window) def first(self, *, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the first value in the `window`. + """Return a Timestream containing the first value in the `window`. Computed for each key separately. 
- Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the first value for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("first", self, window) def last(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the last value in the `window`. + """Return a Timestream containing the last value in the `window`. Computed for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the last value for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("last", self, window) def count(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the count value in the `window`. + """Return a Timestream containing the count value in the `window`. Computed for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the count value for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("count", self, window) def count_if(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the count of `true` values in `window`. + """Return a Timestream containing the count of `true` values in `window`. Computed for each key separately. 
- Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the count value if true for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("count_if", self, window) def max(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the max value in the `window`. + """Return a Timestream containing the max value in the `window`. Computed for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the max value for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("max", self, window) def min(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the min value in the `window`. + """Return a Timestream containing the min value in the `window`. Computed for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the min value for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("min", self, window) def mean(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the mean value in the `window`. + """Return a Timestream containing the mean value in the `window`. Computed for each key separately. 
- Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the mean value for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("mean", self, window) def stddev(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the standard deviation in the `window`. + """Return a Timestream containing the standard deviation in the `window`. Computed for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the standard deviation for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("stddev", self, window) def variance(self, window: Optional[kd.windows.Window] = None) -> Timestream: - """ - Create a Timestream containing the variance in the `window`. + """Return a Timestream containing the variance in the `window`. Computed for each key separately. - Parameters - ---------- - window : Optional[Window] - The window to use for the aggregation. - If not specified, the entire Timestream is used. - - Returns - ------- - Timestream - Timestream containing the variance for the key in the window for - each point. + Args: + window: The window to use for the aggregation. Defaults to the entire Timestream. """ return _aggregation("variance", self, window) def cast(self, data_type: pa.DataType) -> Timestream: - """ - Cast the type of this Timestream to the given data type. - - Parameters - ---------- - data_type : pa.DataType - The data type to cast to. + """Return this Timestream cast to the given data type. 
- Returns - ------- - Timestream - Timestream with the given data type. + Args: + data_type: The DataType to cast to. """ return Timestream(self._ffi_expr.cast(data_type)) def else_(self, other: Timestream) -> Timestream: - """ - Return `self` if not `null` otherwise `other`. - - Parameters - ---------- - other : Timestream - The Timestream to use if self is `null`. + """Return a Timestream containing `self` when not `null`, and `other` otherwise. - Returns - ------- - Timestream - Timestream containing the value of `self` not `null` otherwise `other`. + Args: + other: The Timestream to use if self is `null`. """ return Timestream._call("else", other, self) def seconds_since(self, time: Union[Timestream, Literal]) -> Timestream: - """ - Return a Timestream containing seconds between `time` and `self`. - - Parameters - ---------- - time : Union[Timestream, Literal] - The time to compute the seconds since. + """Return a Timestream containing seconds between `time` and `self`. - This can be either a stream of timestamps or a datetime literal. - If `time` is a Timestream, the result will contain the seconds - from `self.time()` to `time.time()` for each point. + If `self.time()` is greater than `time`, the result will be positive. - Returns - ------- - Timestream - Timestream containing the number of seconds since `time`. + Args: + time: The time to compute the seconds since. - If `self.time()` is greater than `time`, the result will be positive. + This can be either a stream of timestamps or a datetime literal. + If `time` is a Timestream, the result will contain the seconds + from `self.time()` to `time.time()` for each point. 
""" if isinstance(time, datetime): session = self._ffi_expr.session() @@ -1316,22 +840,13 @@ def seconds_since(self, time: Union[Timestream, Literal]) -> Timestream: return Timestream._call("seconds_between", time, self) def seconds_since_previous(self, n: int = 1) -> Timestream: - """ - Return a Timestream containing seconds between `self` and the time `n` points ago. - - Parameters - ---------- - n : int - The number of points to look back. For example, `n=1` refers to - the previous point. + """Return a Timestream containing seconds between `self` and the time `n` points ago. - Defaults to 1 (the previous point). + Args: + n: The number of points to look back. For example, `n=1` refers to + the previous point. - Returns - ------- - Timestream - Timestream containing the number of seconds since the time `n` - points ago. + Defaults to 1 (the previous point). """ time_of_current = Timestream._call("time_of", self).cast(pa.int64()) time_of_previous = Timestream._call("time_of", self).lag(n).cast(pa.int64()) @@ -1344,87 +859,57 @@ def flatten(self) -> Timestream: return Timestream._call("flatten", self) def union(self, other: Timestream) -> Timestream: - """ - Union the lists in this timestream with the lists in the other Timestream. - - This correspons to a pair-wise union within each row of the timestreams. + """Union the lists in this timestream with the lists in the other Timestream. - Parameters - ---------- - other : Timestream - The Timestream of lists to union with. + This corresponds to a pair-wise union within each row of the timestreams. - Returns - ------- - Timestream - Timestream containing the union of the lists. + Args: + other: The Timestream of lists to union with. """ return Timestream._call("union", self, other) def record(self, fields: Callable[[Timestream], Mapping[str, Arg]]) -> Timestream: - """ - Create a record Timestream from fields computed from this timestream. 
- - Parameters - ---------- - fields : Callable[[Timestream], Mapping[str, Arg]] - The fields to include in the record. - - Returns - ------- - Timestream - Timestream containing records with the given fields. - - See Also - -------- - kaskada.record: Function for creating a record from one or more - timestreams. + """Return a record Timestream from fields computed from this timestream. + + Args: + fields: The fields to include in the record. + + See Also: + kaskada.record: Function for creating a record from one or more + timestreams. """ return record(fields(self)) def preview(self, limit: int = 100) -> pd.DataFrame: - """ - Return the first N rows of the result as a Pandas DataFrame. + """Return the first N rows of the result as a Pandas DataFrame. This makes it easy to preview the content of the Timestream. - Parameters - ---------- - limit : int - Maximum number of rows to print. + Args: + limit: Maximum number of rows to print. - Returns - ------- - pd.DataFrame + Returns: The Pandas DataFrame containing the first `limit` points. """ return self.run(row_limit=limit).to_pandas() def run( self, + *, row_limit: Optional[int] = None, max_batch_size: Optional[int] = None, materialize: bool = False, ) -> Result: - """ - Run the Timestream once. + """Run the Timestream. - Parameters - ---------- - row_limit : Optional[int] - The maximum number of rows to return. - If not specified all rows are returned. + Args: + row_limit: The maximum number of rows to return. + If not specified all rows are returned. + max_batch_size: The maximum number of rows to return in each batch. + If not specified the default is used. + materialize: If true, the execution will be a continuous materialization. - max_batch_size : Optional[int] - The maximum number of rows to return in each batch. - If not specified the default is used. - - materialize : bool - If true, the execution will be a continuous materialization. 
- - Returns - ------- - Result + Returns: The `Result` object to use for accessing the results. """ expr = self @@ -1444,29 +929,16 @@ def _aggregation( window: Optional[kd.windows.Window], *args: Union[Timestream, Literal], ) -> Timestream: - """ - Create the aggregation `op` with the given `input`, `window` and `args`. - - Parameters - ---------- - op : str - The operation to create. - input : Timestream - The input to the aggregation. - window : Optional[Window] - The window to use for the aggregation. - *args : Union[Timestream, Literal] - Additional arguments to provide after `input` and before the flattened window. - - Returns - ------- - Timestream - The resulting Timestream. - - Raises - ------ - NotImplementedError - If the window is not a known type. + """Return the aggregation `op` with the given `input`, `window` and `args`. + + Args: + op: The operation to create. + input: The input to the aggregation. + window: The window to use for the aggregation. + *args: Additional arguments to provide after `input` and before the flattened window. + + Raises: + NotImplementedError: If the window is not a known type. """ if window is None: return Timestream._call(op, input, *args, None, None) @@ -1507,23 +979,14 @@ def _aggregation( def record(fields: Mapping[str, Arg]) -> Timestream: - """ - Create a record Timestream from the given fields. - - Parameters - ---------- - fields : dict[str, Timestream] - The fields to include in the record. - - Returns - ------- - Timestream - Timestream containing records with the given fields. - - See Also - -------- - Timestream.record: Method for creating a record from fields computed from - a timestream. + """Return a record Timestream from the given fields. + + Args: + fields: The fields to include in the record. + + See Also: + Timestream.record: Method for creating a record from fields computed from + a timestream. 
""" import itertools diff --git a/python/pysrc/kaskada/plot.py b/python/pysrc/kaskada/plot.py index 68208db09..e4446fa8c 100644 --- a/python/pysrc/kaskada/plot.py +++ b/python/pysrc/kaskada/plot.py @@ -25,16 +25,11 @@ def _require_plotly() -> None: @dataclass class Plot(object): - """ - Configuration for a single plot to render. - - Parameters - ---------- - stream : Timestream - The Timestream to render. - name: str - The name of the plot to render. - Defaults to `Result` if not set. + """Configuration for a single plot to render. + + Attributes: + stream: The Timestream to render. + name: The name of the plot to render. Defaults to `Result` if not set. """ stream: Timestream diff --git a/python/pysrc/kaskada/sources/arrow.py b/python/pysrc/kaskada/sources/arrow.py index 48ddb11dd..40ea1f7a1 100644 --- a/python/pysrc/kaskada/sources/arrow.py +++ b/python/pysrc/kaskada/sources/arrow.py @@ -28,31 +28,20 @@ def __init__( grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: - """ - Create a source reading Pandas DataFrames. - - Parameters - ---------- - dataframe : pd.DataFrame - The DataFrame to start from. - time_column : str - The name of the column containing the time. - key_column : str - The name of the column containing the key. - subsort_column : str, optional - The name of the column containing the subsort. - If not provided, the subsort will be assigned by the system. - schema : pa.Schema, optional - The schema to use. - If not specified, it will be inferred from the `dataframe`. - grouping_name : str, optional - The name of the groups associated with each key. - This is used to ensure implicit joins are only performed between - sources with compatible groupings. - time_unit : str, optional - The unit of the time column. - One of `ns`, `us`, `ms`, or `s`. - If not specified (and not specified in the `dataframe`), nanosecond will be assumed. + """Create a source reading Pandas DataFrames. 
+ + Args: + dataframe: The DataFrame to start from. + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + schema: The schema to use. If not provided, it will be inferred from the input. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. """ if schema is None: schema = pa.Schema.from_pandas(dataframe) @@ -87,31 +76,20 @@ def __init__( grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: - """ - Create a source reading from rows represented as dicts. - - Parameters - ---------- - rows : dict | list[dict] - One or more represented as dicts. - time_column : str - The name of the column containing the time. - key_column : str - The name of the column containing the key. - subsort_column : str, optional - The name of the column containing the subsort. - If not provided, the subsort will be assigned by the system. - schema : pa.Schema, optional - The schema to use. - If not specified, it will be inferred from the `rows`. - grouping_name : str, optional - The name of the groups associated with each key. - This is used to ensure implicit joins are only performed between - sources with compatible groupings. - time_unit : str, optional - The unit of the time column. - One of `ns`, `us`, `ms`, or `s`. - If not specified nanosecond will be assumed. + """Create a source reading from rows represented as dicts. + + Args: + rows: One or more rows represented as dicts. + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. 
+ subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + schema: The schema to use. If not provided, it will be inferred from the input. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. """ if schema is None: schema = pa.Table.from_pylist(rows).schema @@ -152,31 +130,20 @@ def __init__( grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: - """ - Create a CSV String Source. - - Parameters - ---------- - csv_string : str - The CSV string to start from. - time_column : str - The name of the column containing the time. - key_column : str - The name of the column containing the key. - subsort_column : str, optional - The name of the column containing the subsort. - If not provided, the subsort will be assigned by the system. - schema : pa.Schema, optional - The schema to use. - If not specified, it will be inferred from the `csv_string`. - grouping_name : str, optional - The name of the groups associated with each key. - This is used to ensure implicit joins are only performed between - sources with compatible groupings. - time_unit : str, optional - The unit of the time column. - One of `ns`, `us`, `ms`, or `s`. - If not specified nanosecond will be assumed. + """Create a CSV String Source. + + Args: + csv_string: The CSV string to start from. + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + schema: The schema to use. If not provided, it will be inferred from the input. 
+ grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. + time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`. + If not specified (and not specified in the data), nanosecond will be assumed. """ if isinstance(csv_string, str): csv_string = BytesIO(csv_string.encode("utf-8")) @@ -221,31 +188,20 @@ def __init__( grouping_name: Optional[str] = None, time_unit: Optional[TimeUnit] = None, ) -> None: - """ - Create a JSON String Source. - - Parameters - ---------- - json_string : str - The line-delimited JSON string to start from. - time_column : str - The name of the column containing the time. - key_column : str - The name of the column containing the key. - subsort_column : str, optional - The name of the column containing the subsort. - If not provided, the subsort will be assigned by the system. - schema : pa.Schema, optional - The schema to use. - If not specified, it will be inferred from the JSON records. - grouping_name : str, optional - The name of the groups associated with each key. - This is used to ensure implicit joins are only performed between - sources with compatible groupings. - time_unit : str, optional - The unit of the time column. - One of `ns`, `us`, `ms`, or `s`. - If not specified nanosecond will be assumed. + """Create a JSON String Source. + + Args: + json_string: The line-delimited JSON string to start from. + time_column: The name of the column containing the time. + key_column: The name of the column containing the key. + subsort_column: The name of the column containing the subsort. + If not provided, the subsort will be assigned by the system. + schema: The schema to use. If not provided, it will be inferred from the input. + grouping_name: The name of the group associated with each key. + This is used to ensure implicit joins are only performed between data grouped + by the same entity. 
+                time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+                If not specified (and not specified in the data), nanosecond will be assumed.
         """
         if isinstance(json_string, str):
             json_string = BytesIO(json_string.encode("utf-8"))
@@ -287,31 +243,20 @@ def __init__(
         grouping_name: Optional[str] = None,
         time_unit: Optional[TimeUnit] = None,
     ) -> None:
-        """
-        Create a Parquet source.
-
-        Parameters
-        ----------
-        path : str
-            The path to the Parquet file to add.
-        time_column : str
-            The name of the column containing the time.
-        key_column : str
-            The name of the column containing the key.
-        subsort_column : str, optional
-            The name of the column containing the subsort.
-            If not provided, the subsort will be assigned by the system.
-        schema : pa.Schema, optional
-            The schema to use.
-            If not specified, it will be inferred from the Parquet file.
-        grouping_name : str, optional
-            The name of the groups associated with each key.
-            This is used to ensure implicit joins are only performed between
-            sources with compatible groupings.
-        time_unit : str, optional
-            The unit of the time column.
-            One of `ns`, `us`, `ms`, or `s`.
-            If not specified nanosecond will be assumed.
+        """Create a Parquet source.
+
+        Args:
+            path: The path to the Parquet file to add.
+            time_column: The name of the column containing the time.
+            key_column: The name of the column containing the key.
+            subsort_column: The name of the column containing the subsort.
+                If not provided, the subsort will be assigned by the system.
+            schema: The schema to use. If not provided, it will be inferred from the input.
+            grouping_name: The name of the group associated with each key.
+                This is used to ensure implicit joins are only performed between data grouped
+                by the same entity.
+            time_unit: The unit of the time column. One of `ns`, `us`, `ms`, or `s`.
+                If not specified (and not specified in the data), nanosecond will be assumed.
""" if schema is None: schema = pa.parquet.read_schema(path) diff --git a/python/pysrc/kaskada/windows.py b/python/pysrc/kaskada/windows.py index 7bbfc9f25..e269ca3d1 100644 --- a/python/pysrc/kaskada/windows.py +++ b/python/pysrc/kaskada/windows.py @@ -16,104 +16,56 @@ class Window(object): @dataclass(frozen=True) class Since(Window): - """ - Window since the last time a predicate was true. + """Window since the last time a predicate was true. Aggregations will contain all values starting from the last time the predicate evaluated to true (inclusive). - - Parameters - ---------- - predicate : Timestream | Callable[..., Timestream] | bool - The boolean Timestream to use as predicate for the window. - Each time the predicate evaluates to true the window will be cleared. - - The predicate may be a callable which returns the boolean Timestream, in - which case it is applied to the Timestream being aggregated. """ + #: The boolean Timestream to use as predicate for the window. + #: Each time the predicate evaluates to true the window will be cleared. + #: + #: The predicate may be a callable which returns the boolean Timestream, in + #: which case it is applied to the Timestream being aggregated. predicate: Timestream | Callable[..., Timestream] | bool @staticmethod def minutely() -> Since: - """ - Return a window since the start of each minute. - - Returns - ------- - Since - Window since the start of each minute. - """ + """Return a window since the start of each minute.""" return Since(predicate=lambda domain: Timestream._call("minutely", domain)) @staticmethod def hourly() -> Since: - """ - Return a window since the start of each hour. - - Returns - ------- - Since - Window since the start of each hour. - """ + """Return a window since the start of each hour.""" return Since(predicate=lambda domain: Timestream._call("hourly", domain)) @staticmethod def daily() -> Since: - """ - Return a window since the start of each day. 
- - Returns - ------- - Since - Window since the start of each day. - """ + """Return a window since the start of each day.""" return Since(predicate=lambda domain: Timestream._call("daily", domain)) @staticmethod def monthly() -> Since: - """ - Return a window since the start of each month. - - Returns - ------- - Since - Window since the start of each month. - """ + """Return a window since the start of each month.""" return Since(predicate=lambda domain: Timestream._call("monthly", domain)) @staticmethod def yearly() -> Since: - """ - Return a window since the start of each year. - - Returns - ------- - Since - Window since the start of each year. - """ + """Return a window since the start of each year.""" return Since(predicate=lambda domain: Timestream._call("yearly", domain)) @dataclass(frozen=True) class Sliding(Window): - """ - Window for the last `duration` intervals of some `predicate`. - - Parameters - ---------- - duration : int - The number of sliding intervals to use in the window. - - predicate : Timestream | Callable[..., Timestream] | bool - The boolean Timestream to use as predicate for the window - Each time the predicate evaluates to true the window starts a new interval. - - The predicate may be a callable which returns the boolean Timestream, in - which case it is applied to the Timestream being aggregated. - """ + """Window for the last `duration` intervals of some `predicate`.""" + #: The number of sliding intervals to use in the window. duration: int + #: The boolean Timestream to use as a predicate for the window. + #: Each time the predicate evaluates to true the window starts a new interval. + #: + #: The predicate may be a callable which returns the boolean Timestream, in + #: which case it is applied to the Timestream being aggregated. 
predicate: Timestream | Callable[..., Timestream] | bool def __post_init__(self): @@ -123,18 +75,10 @@ def __post_init__(self): @staticmethod def minutely(duration: int) -> Sliding: - """ - Return a sliding window containing `duration` minutes. + """Return a sliding window containing `duration` minutes. - Parameters - ---------- - duration : int - The number of minutes to use in the window. - - Returns - ------- - Sliding - Sliding window with `duration` minutes, advancing every minute. + Args: + duration: The number of minutes to use in the window. """ return Sliding( duration=duration, @@ -143,18 +87,10 @@ def minutely(duration: int) -> Sliding: @staticmethod def hourly(duration: int) -> Sliding: - """ - Return a sliding window containing `duration` hours. + """Return a sliding window containing `duration` hours. - Parameters - ---------- - duration : int - The number of hours to use in the window. - - Returns - ------- - Sliding - Sliding window with `duration` hours, advancing every hour. + Args: + duration: The number of hours to use in the window. """ return Sliding( duration=duration, @@ -163,18 +99,10 @@ def hourly(duration: int) -> Sliding: @staticmethod def daily(duration: int) -> Sliding: - """ - Return a sliding window containing `duration` daily. + """Return a sliding window containing `duration` daily. - Parameters - ---------- - duration : int - The number of days to use in the window. - - Returns - ------- - Sliding - Sliding window with `duration` days, advancing every day. + Args: + duration: The number of days to use in the window. """ return Sliding( duration=duration, @@ -183,18 +111,10 @@ def daily(duration: int) -> Sliding: @staticmethod def monthly(duration: int) -> Sliding: - """ - Return a sliding window containing `duration` months. + """Return a sliding window containing `duration` months. - Parameters - ---------- - duration : int - The number of months to use in the window. 
- - Returns - ------- - Sliding - Sliding window with `duration` months, advancing every month. + Args: + duration: The number of months to use in the window. """ return Sliding( duration=duration, @@ -203,18 +123,10 @@ def monthly(duration: int) -> Sliding: @staticmethod def yearly(duration: int) -> Sliding: - """ - Return a sliding window containing `duration` years. + """Return a sliding window containing `duration` years. - Parameters - ---------- - duration : int - The number of years to use in the window. - - Returns - ------- - Sliding - Sliding window with `duration` years, advancing every year. + Args: + duration: The number of years to use in the window. """ return Sliding( duration=duration, @@ -224,15 +136,13 @@ def yearly(duration: int) -> Sliding: @dataclass(frozen=True) class Trailing(Window): - """ - Window the last `duration` time period. + """Window the last `duration` time period. - Parameters - ---------- - duration : timedelta - The duration of the window. + Args: + duration: The duration of the window. """ + #: The duration of the window. duration: timedelta def __post_init__(self):