Skip to content

Commit

Permalink
feat(api): add bucket method for timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
jcrist authored and gforsyth committed Oct 20, 2023
1 parent c319ed3 commit ca0f7bc
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 3 deletions.
12 changes: 12 additions & 0 deletions ibis/backends/duckdb/registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,17 @@ def _timestamp_from_unix(t, op):
raise UnsupportedOperationError(f"{unit!r} unit is not supported!")


def _timestamp_bucket(t, op):
arg = t.translate(op.arg)
interval = t.translate(op.interval)

origin = sa.literal_column("'epoch'::TIMESTAMP")

if op.offset is not None:
origin += t.translate(op.offset)
return sa.func.time_bucket(interval, arg, origin)


class struct_pack(GenericFunction):
def __init__(self, values: Mapping[str, Any], *, type: StructType) -> None:
super().__init__()
Expand Down Expand Up @@ -418,6 +429,7 @@ def _to_json_collection(t, op):
),
ops.TableColumn: _table_column,
ops.TimestampFromUNIX: _timestamp_from_unix,
ops.TimestampBucket: _timestamp_bucket,
ops.TimestampNow: fixed_arity(
# duckdb 0.6.0 changes now to be a timestamp with time zone force
# it back to the original for backwards compatibility
Expand Down
84 changes: 84 additions & 0 deletions ibis/backends/tests/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -2513,3 +2513,87 @@ def test_timestamp_precision_output(con, ts, scale, unit):
def test_delta(con, start, end, unit, expected):
expr = end.delta(start, unit)
assert con.execute(expr) == expected


@pytest.mark.notimpl(
[
"bigquery",
"clickhouse",
"dask",
"datafusion",
"flink",
"impala",
"mssql",
"mysql",
"oracle",
"pandas",
"polars",
"postgres",
"pyspark",
"snowflake",
"sqlite",
"trino",
],
raises=com.OperationNotDefinedError,
)
@pytest.mark.broken(
["druid"],
raises=AttributeError,
reason="Druid tests load timestamp_col as a string currently",
)
@pytest.mark.parametrize(
"kws, pd_freq",
[
({"milliseconds": 50}, "50ms"),
({"seconds": 2}, "2s"),
({"minutes": 5}, "300s"),
({"hours": 2}, "2h"),
({"days": 2}, "2D"),
],
)
def test_timestamp_bucket(backend, kws, pd_freq):
ts = backend.functional_alltypes.timestamp_col.name("ts").execute()
res = backend.functional_alltypes.timestamp_col.bucket(**kws).name("ts").execute()
sol = ts.dt.floor(pd_freq)
backend.assert_series_equal(res, sol)


@pytest.mark.notimpl(
[
"bigquery",
"clickhouse",
"dask",
"datafusion",
"flink",
"impala",
"mssql",
"mysql",
"oracle",
"pandas",
"polars",
"postgres",
"pyspark",
"snowflake",
"sqlite",
"trino",
],
raises=com.OperationNotDefinedError,
)
@pytest.mark.broken(
["druid"],
raises=AttributeError,
reason="Druid tests load timestamp_col as a string currently",
)
@pytest.mark.notimpl(
["clickhouse", "mssql"],
reason="offset arg not supported",
raises=com.UnsupportedOperationError,
)
@pytest.mark.parametrize("offset_mins", [2, -2])
def test_timestamp_bucket_offset(backend, offset_mins):
ts = backend.functional_alltypes.timestamp_col.name("ts")
expr = ts.bucket(minutes=5, offset=ibis.interval(minutes=offset_mins)).name("ts")
res = expr.execute().astype("datetime64[ns]")
td = pd.Timedelta(minutes=offset_mins)
sol = ((ts.execute() - td).dt.floor("300s") + td).astype("datetime64[ns]")
backend.assert_series_equal(res, sol)
14 changes: 12 additions & 2 deletions ibis/expr/operations/temporal.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from __future__ import annotations

import operator
from typing import Annotated
from typing import Annotated, Union

from public import public

Expand All @@ -10,7 +10,7 @@
from ibis.common.annotations import attribute
from ibis.common.patterns import As, Attrs
from ibis.common.temporal import DateUnit, IntervalUnit, TimestampUnit, TimeUnit
from ibis.expr.operations.core import Binary, Unary, Value
from ibis.expr.operations.core import Binary, Scalar, Unary, Value
from ibis.expr.operations.logical import Between


Expand Down Expand Up @@ -51,6 +51,16 @@ class TimeTruncate(Value):
dtype = dt.time


@public
class TimestampBucket(Value):
arg: Value[dt.Timestamp]
interval: Scalar[dt.Interval]
offset: Union[Scalar[dt.Interval], None] = None

shape = rlz.shape_like("arg")
dtype = dt.timestamp


@public
class Strftime(Value):
arg: Value[dt.Temporal]
Expand Down
126 changes: 125 additions & 1 deletion ibis/expr/types/temporal.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from __future__ import annotations

import datetime
from typing import TYPE_CHECKING, Literal
from typing import TYPE_CHECKING, Literal, Any

from public import public

import ibis
import ibis.expr.datashape as ds
import ibis.expr.operations as ops
from ibis.expr.types.core import _binop
Expand Down Expand Up @@ -486,6 +487,129 @@ def truncate(
"""
return ops.TimestampTruncate(self, unit).to_expr()

@util.experimental
def bucket(
self,
interval: Any = None,
*,
years: int | None = None,
quarters: int | None = None,
months: int | None = None,
weeks: int | None = None,
days: int | None = None,
hours: int | None = None,
minutes: int | None = None,
seconds: int | None = None,
milliseconds: int | None = None,
microseconds: int | None = None,
nanoseconds: int | None = None,
offset: Any = None,
) -> TimestampValue:
"""Truncate the timestamp to buckets of a specified interval.
This is similar to `truncate`, but supports truncating to arbitrary
intervals rather than a single unit. Buckets are computed as fixed
intervals starting from the UNIX epoch. This origin may be offset by
specifying `offset`.
Parameters
----------
interval
The bucket width as an interval. Alternatively may be specified
via component keyword arguments.
offset
An interval to use to offset the start of the bucket.
Returns
-------
TimestampValue
The start of the bucket as a timestamp.
Examples
--------
>>> import ibis
>>> from ibis import _
>>> ibis.options.interactive = True
>>> t = ibis.memtable(
... [
... ("2020-04-15 08:04:00", 1),
... ("2020-04-15 08:06:00", 2),
... ("2020-04-15 08:09:00", 3),
... ("2020-04-15 08:11:00", 4),
... ],
... columns=["ts", "val"],
... ).cast({"ts": "timestamp"})
Bucket the data into 5 minute wide buckets:
>>> t.ts.bucket(minutes=5)
┏━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ TimestampBucket(ts, 5m) ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ timestamp │
├─────────────────────────┤
│ 2020-04-15 08:00:00 │
│ 2020-04-15 08:05:00 │
│ 2020-04-15 08:05:00 │
│ 2020-04-15 08:10:00 │
└─────────────────────────┘
Bucket the data into 5 minute wide buckets, offset by 2 minutes:
>>> t.ts.bucket(minutes=5, offset=ibis.interval(minutes=2))
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ TimestampBucket(ts, 5m, 2m) ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ timestamp │
├─────────────────────────────┤
│ 2020-04-15 08:02:00 │
│ 2020-04-15 08:02:00 │
│ 2020-04-15 08:07:00 │
│ 2020-04-15 08:07:00 │
└─────────────────────────────┘
One common use of timestamp bucketing is computing statistics per
bucket. Here we compute the mean of `val` across 5 minute intervals:
>>> mean_by_bucket = (
... t.group_by(t.ts.bucket(minutes=5).name("bucket"))
... .agg(mean=_.val.mean())
... .order_by("bucket")
... )
>>> mean_by_bucket
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━┓
┃ bucket ┃ mean ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━┩
│ timestamp │ float64 │
├─────────────────────┼─────────┤
│ 2020-04-15 08:00:00 │ 1.0 │
│ 2020-04-15 08:05:00 │ 2.5 │
│ 2020-04-15 08:10:00 │ 4.0 │
└─────────────────────┴─────────┘
"""

components = {
"years": years,
"quarters": quarters,
"months": months,
"weeks": weeks,
"days": days,
"hours": hours,
"minutes": minutes,
"seconds": seconds,
"milliseconds": milliseconds,
"microseconds": microseconds,
"nanoseconds": nanoseconds,
}
has_components = any(v is not None for v in components.values())
if (interval is not None) == has_components:
raise ValueError(
"Must specify either interval value or components, but not both"
)
if has_components:
interval = ibis.interval(**components)
return ops.TimestampBucket(self, interval, offset).to_expr()

def date(self) -> DateValue:
"""Return the date component of the expression.
Expand Down
33 changes: 33 additions & 0 deletions ibis/tests/expr/test_temporal.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,3 +889,36 @@ def test_timestamp_expression():
expr = ibis.timestamp(t2.s, timezone="UTC")
assert deferred.resolve(t2).equals(expr)
assert repr(deferred) == "timestamp(_.s, timezone='UTC')"


def test_timestamp_bucket():
ts = ibis.table({"ts": "timestamp"}).ts

components = [
"nanoseconds",
"microseconds",
"milliseconds",
"seconds",
"minutes",
"hours",
"days",
"weeks",
"months",
"quarters",
"years",
]
for component in components:
kws = {component: 2}
expr1 = ts.bucket(**kws)
expr2 = ts.bucket(ibis.interval(**kws))
assert expr1.equals(expr2)

with pytest.raises(
ValueError, match="Must specify either interval value or components"
):
ts.bucket(ibis.interval(seconds=1), minutes=2)

with pytest.raises(
ValueError, match="Must specify either interval value or components"
):
ts.bucket()

0 comments on commit ca0f7bc

Please sign in to comment.