Skip to content

Commit

Permalink
Mark streaming mode as unstable
Browse files Browse the repository at this point in the history
  • Loading branch information
stinodego committed Jan 24, 2024
1 parent b03b5b7 commit 384ca90
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 14 deletions.
34 changes: 31 additions & 3 deletions py-polars/polars/functions/lazy.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
deprecate_renamed_function,
issue_deprecation_warning,
)
from polars.utils.unstable import unstable
from polars.utils.unstable import issue_unstable_warning, unstable

with contextlib.suppress(ImportError): # Module not available when building docs
import polars.polars as plr
Expand Down Expand Up @@ -1636,7 +1636,17 @@ def collect_all(
comm_subexpr_elim
Common subexpressions will be cached and reused.
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
batch.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
.. note::
Use :func:`explain` to see if Polars can process the query in streaming
mode.
Returns
-------
Expand All @@ -1650,6 +1660,10 @@ def collect_all(
comm_subplan_elim = False
comm_subexpr_elim = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
comm_subplan_elim = False

prepared = []

for lf in lazy_frames:
Expand Down Expand Up @@ -1762,7 +1776,17 @@ def collect_all_async(
comm_subexpr_elim
Common subexpressions will be cached and reused.
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
batch.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
.. note::
Use :func:`explain` to see if Polars can process the query in streaming
mode.
See Also
--------
Expand All @@ -1788,6 +1812,10 @@ def collect_all_async(
comm_subplan_elim = False
comm_subexpr_elim = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
comm_subplan_elim = False

prepared = []

for lf in lazy_frames:
Expand Down
55 changes: 44 additions & 11 deletions py-polars/polars/lazyframe/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
deprecate_saturating,
issue_deprecation_warning,
)
from polars.utils.unstable import unstable
from polars.utils.unstable import issue_unstable_warning, unstable
from polars.utils.various import (
_in_notebook,
_prepare_row_index_args,
Expand Down Expand Up @@ -1646,7 +1646,8 @@ def collect(
batch.
.. warning::
This functionality is currently in an alpha state.
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
.. note::
Use :func:`explain` to see if Polars can process the query in streaming
Expand Down Expand Up @@ -1712,6 +1713,7 @@ def collect(
comm_subexpr_elim = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
comm_subplan_elim = False

ldf = self._ldf.optimization_toggle(
Expand Down Expand Up @@ -1813,17 +1815,17 @@ def collect_async(
comm_subexpr_elim
Common subexpressions will be cached and reused.
streaming
Run parts of the query in a streaming fashion (this is in an alpha state)
Process the query in batches to handle larger-than-memory data.
If set to `False` (default), the entire query is processed in a single
batch.
Notes
-----
In case of error `set_exception` is used on
`asyncio.Future`/`gevent.event.AsyncResult` and will be reraised by them.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
See Also
--------
polars.collect_all : Collect multiple LazyFrames at the same time.
polars.collect_all_async: Collect multiple LazyFrames at the same time lazily.
.. note::
Use :func:`explain` to see if Polars can process the query in streaming
mode.
Returns
-------
Expand All @@ -1832,6 +1834,16 @@ def collect_async(
If `gevent=True` then returns wrapper that has
`.get(block=True, timeout=None)` method.
See Also
--------
polars.collect_all : Collect multiple LazyFrames at the same time.
polars.collect_all_async: Collect multiple LazyFrames at the same time lazily.
Notes
-----
In case of error `set_exception` is used on
`asyncio.Future`/`gevent.event.AsyncResult` and will be reraised by them.
Examples
--------
>>> import asyncio
Expand Down Expand Up @@ -1868,6 +1880,7 @@ def collect_async(
comm_subexpr_elim = False

if streaming:
issue_unstable_warning("Streaming mode is considered unstable.")
comm_subplan_elim = False

ldf = self._ldf.optimization_toggle(
Expand All @@ -1886,6 +1899,7 @@ def collect_async(
ldf.collect_with_callback(result._callback) # type: ignore[attr-defined]
return result # type: ignore[return-value]

@unstable()
def sink_parquet(
self,
path: str | Path,
Expand All @@ -1906,6 +1920,10 @@ def sink_parquet(
"""
Evaluate the query in streaming mode and write to a Parquet file.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
This allows streaming results that are larger than RAM to be written to disk.
Parameters
Expand Down Expand Up @@ -1978,6 +1996,7 @@ def sink_parquet(
maintain_order=maintain_order,
)

@unstable()
def sink_ipc(
self,
path: str | Path,
Expand All @@ -1994,6 +2013,10 @@ def sink_ipc(
"""
Evaluate the query in streaming mode and write to an IPC file.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
This allows streaming results that are larger than RAM to be written to disk.
Parameters
Expand Down Expand Up @@ -2045,6 +2068,7 @@ def sink_ipc(

@deprecate_renamed_parameter("quote", "quote_char", version="0.19.8")
@deprecate_renamed_parameter("has_header", "include_header", version="0.19.13")
@unstable()
def sink_csv(
self,
path: str | Path,
Expand Down Expand Up @@ -2072,6 +2096,10 @@ def sink_csv(
"""
Evaluate the query in streaming mode and write to a CSV file.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
This allows streaming results that are larger than RAM to be written to disk.
Parameters
Expand Down Expand Up @@ -2182,6 +2210,7 @@ def sink_csv(
maintain_order=maintain_order,
)

@unstable()
def sink_ndjson(
self,
path: str | Path,
Expand All @@ -2197,6 +2226,10 @@ def sink_ndjson(
"""
Evaluate the query in streaming mode and write to an NDJSON file.
.. warning::
Streaming mode is considered **unstable**. It may be changed
at any point without it being considered a breaking change.
This allows streaming results that are larger than RAM to be written to disk.
Parameters
Expand Down

0 comments on commit 384ca90

Please sign in to comment.