# --- ibis/backends/duckdb/registry.py ---


def _timestamp_bucket(t, op):
    """Compile a ``TimestampBucket`` op into a ``time_bucket`` call.

    The timestamp argument and the bucket width are translated first. The
    origin handed to ``time_bucket`` is the literal ``'epoch'::TIMESTAMP``,
    shifted by the translated offset when the op carries one.
    """
    timestamp = t.translate(op.arg)
    width = t.translate(op.interval)

    epoch = sa.literal_column("'epoch'::TIMESTAMP")
    offset = op.offset
    origin = epoch if offset is None else epoch + t.translate(offset)
    return sa.func.time_bucket(width, timestamp, origin)


# --- ibis/backends/tests/test_temporal.py ---


@pytest.mark.notimpl(
    [
        "bigquery",
        "clickhouse",
        "dask",
        "datafusion",
        "flink",
        "impala",
        "mssql",
        "mysql",
        "oracle",
        "pandas",
        "polars",
        "postgres",
        "pyspark",
        "snowflake",
        "sqlite",
        "trino",
    ],
    raises=com.OperationNotDefinedError,
)
@pytest.mark.broken(
    ["druid"],
    raises=AttributeError,
    reason="Druid tests load timestamp_col as a string currently",
)
@pytest.mark.parametrize(
    "kws, pd_freq",
    [
        ({"milliseconds": 50}, "50ms"),
        ({"seconds": 2}, "2s"),
        ({"minutes": 5}, "300s"),
        ({"hours": 2}, "2h"),
        ({"days": 2}, "2D"),
    ],
)
def test_timestamp_bucket(backend, kws, pd_freq):
    """Bucketing by a fixed width must agree with ``.dt.floor(pd_freq)``."""
    column = backend.functional_alltypes.timestamp_col
    # Reference values: execute the raw column, then floor it locally.
    expected = column.name("ts").execute().dt.floor(pd_freq)
    result = column.bucket(**kws).name("ts").execute()
    backend.assert_series_equal(result, expected)


@pytest.mark.notimpl(
    [
        "bigquery",
        "clickhouse",
        "dask",
        "datafusion",
        "flink",
        "impala",
        "mssql",
        "mysql",
        "oracle",
        "pandas",
        "polars",
        "postgres",
        "pyspark",
        "snowflake",
        "sqlite",
        "trino",
    ],
    raises=com.OperationNotDefinedError,
)
@pytest.mark.broken(
    ["druid"],
    raises=AttributeError,
    reason="Druid tests load timestamp_col as a string currently",
)
@pytest.mark.notimpl(
    ["clickhouse", "mssql"],
    reason="offset arg not supported",
    raises=com.UnsupportedOperationError,
)
@pytest.mark.parametrize("offset_mins", [2, -2])
def test_timestamp_bucket_offset(backend, offset_mins):
    """Five-minute buckets with a shifted origin match a shifted floor."""
    column = backend.functional_alltypes.timestamp_col.name("ts")
    shift = ibis.interval(minutes=offset_mins)
    bucketed = column.bucket(minutes=5, offset=shift).name("ts")
    result = bucketed.execute().astype("datetime64[ns]")

    # Reference: subtract the offset, floor to 300s, then add the offset back.
    delta = pd.Timedelta(minutes=offset_mins)
    floored = (column.execute() - delta).dt.floor("300s")
    expected = (floored + delta).astype("datetime64[ns]")
    backend.assert_series_equal(result, expected)
# --- ibis/expr/operations/temporal.py ---


@public
class TimestampBucket(Value):
    """Operation node for truncating a timestamp to fixed-width buckets.

    The node's shape is derived from ``arg`` via ``rlz.shape_like`` and its
    dtype is ``dt.timestamp``.
    """

    # Timestamp value being bucketed.
    arg: Value[dt.Timestamp]
    # Scalar interval giving the bucket width.
    interval: Scalar[dt.Interval]
    # Optional scalar interval; ``None`` when no offset was requested.
    offset: Union[Scalar[dt.Interval], None] = None

    shape = rlz.shape_like("arg")
    dtype = dt.timestamp


# --- ibis/expr/types/temporal.py ---
# NOTE(review): `bucket` is a method (it takes `self`); the header of its
# enclosing class lies outside this hunk.


@util.experimental
def bucket(
    self,
    interval: Any = None,
    *,
    years: int | None = None,
    quarters: int | None = None,
    months: int | None = None,
    weeks: int | None = None,
    days: int | None = None,
    hours: int | None = None,
    minutes: int | None = None,
    seconds: int | None = None,
    milliseconds: int | None = None,
    microseconds: int | None = None,
    nanoseconds: int | None = None,
    offset: Any = None,
) -> TimestampValue:
    """Truncate the timestamp to buckets of a specified interval.

    Unlike `truncate`, which works on a single unit, this supports
    arbitrary bucket widths. Buckets are fixed intervals counted from the
    UNIX epoch; pass `offset` to shift that origin.

    The width is given either as `interval` or through the component
    keyword arguments, never both.

    Parameters
    ----------
    interval
        The bucket width as an interval. Alternatively may be specified
        via component keyword arguments.
    years
        Years component of the bucket width.
    quarters
        Quarters component of the bucket width.
    months
        Months component of the bucket width.
    weeks
        Weeks component of the bucket width.
    days
        Days component of the bucket width.
    hours
        Hours component of the bucket width.
    minutes
        Minutes component of the bucket width.
    seconds
        Seconds component of the bucket width.
    milliseconds
        Milliseconds component of the bucket width.
    microseconds
        Microseconds component of the bucket width.
    nanoseconds
        Nanoseconds component of the bucket width.
    offset
        An interval to use to offset the start of the bucket.

    Returns
    -------
    TimestampValue
        The start of the bucket as a timestamp.

    Raises
    ------
    ValueError
        If both `interval` and components are given, or if neither is.

    Examples
    --------
    >>> import ibis
    >>> from ibis import _
    >>> ibis.options.interactive = True
    >>> t = ibis.memtable(
    ...     [
    ...         ("2020-04-15 08:04:00", 1),
    ...         ("2020-04-15 08:06:00", 2),
    ...         ("2020-04-15 08:09:00", 3),
    ...         ("2020-04-15 08:11:00", 4),
    ...     ],
    ...     columns=["ts", "val"],
    ... ).cast({"ts": "timestamp"})

    Bucket the data into 5 minute wide buckets:

    >>> t.ts.bucket(minutes=5)
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ TimestampBucket(ts, 5m) ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━┩
    │ timestamp               │
    ├─────────────────────────┤
    │ 2020-04-15 08:00:00     │
    │ 2020-04-15 08:05:00     │
    │ 2020-04-15 08:05:00     │
    │ 2020-04-15 08:10:00     │
    └─────────────────────────┘

    Bucket the data into 5 minute wide buckets, offset by 2 minutes:

    >>> t.ts.bucket(minutes=5, offset=ibis.interval(minutes=2))
    ┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
    ┃ TimestampBucket(ts, 5m, 2m) ┃
    ┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
    │ timestamp                   │
    ├─────────────────────────────┤
    │ 2020-04-15 08:02:00         │
    │ 2020-04-15 08:02:00         │
    │ 2020-04-15 08:07:00         │
    │ 2020-04-15 08:07:00         │
    └─────────────────────────────┘

    One common use of timestamp bucketing is computing statistics per
    bucket. Here we compute the mean of `val` across 5 minute intervals:

    >>> mean_by_bucket = (
    ...     t.group_by(t.ts.bucket(minutes=5).name("bucket"))
    ...     .agg(mean=_.val.mean())
    ...     .order_by("bucket")
    ... )
    >>> mean_by_bucket
    ┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
    ┃ bucket              ┃ mean    ┃
    ┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
    │ timestamp           │ float64 │
    ├─────────────────────┼─────────┤
    │ 2020-04-15 08:00:00 │     1.0 │
    │ 2020-04-15 08:05:00 │     2.5 │
    │ 2020-04-15 08:10:00 │     4.0 │
    └─────────────────────┴─────────┘
    """

    parts = dict(
        years=years,
        quarters=quarters,
        months=months,
        weeks=weeks,
        days=days,
        hours=hours,
        minutes=minutes,
        seconds=seconds,
        milliseconds=milliseconds,
        microseconds=microseconds,
        nanoseconds=nanoseconds,
    )
    got_parts = any(value is not None for value in parts.values())
    got_interval = interval is not None
    if got_interval == got_parts:
        # Equal flags mean both were supplied or neither was.
        raise ValueError(
            "Must specify either interval value or components, but not both"
        )
    width = ibis.interval(**parts) if got_parts else interval
    return ops.TimestampBucket(self, width, offset).to_expr()


# --- ibis/tests/expr/test_temporal.py ---


def test_timestamp_bucket():
    """Keyword components and an explicit interval build equal expressions."""
    ts = ibis.table({"ts": "timestamp"}).ts

    units = (
        "nanoseconds",
        "microseconds",
        "milliseconds",
        "seconds",
        "minutes",
        "hours",
        "days",
        "weeks",
        "months",
        "quarters",
        "years",
    )
    for unit in units:
        kws = {unit: 2}
        from_keywords = ts.bucket(**kws)
        from_interval = ts.bucket(ibis.interval(**kws))
        assert from_keywords.equals(from_interval)

    message = "Must specify either interval value or components"

    # Supplying both an interval and a component is rejected.
    with pytest.raises(ValueError, match=message):
        ts.bucket(ibis.interval(seconds=1), minutes=2)

    # Supplying neither is rejected with the same message.
    with pytest.raises(ValueError, match=message):
        ts.bucket()