Skip to content

Commit

Permalink
feat: add more aggregations/functions (#686)
Browse files Browse the repository at this point in the history
Adds coalesce, ceil, clamp, exp, powf, floor.
  • Loading branch information
kevinjnguyen authored Aug 23, 2023
1 parent 146c747 commit 555472e
Show file tree
Hide file tree
Showing 24 changed files with 408 additions and 0 deletions.
5 changes: 5 additions & 0 deletions python/docs/source/reference/timestream/arithmetic.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ See the notes on the specific functions for more information.
:toctree: ../apidocs/
Timestream.add
Timestream.ceil
Timestream.clamp
Timestream.exp
Timestream.floor
Timestream.powf
Timestream.sub
Timestream.mul
Timestream.div
Expand Down
1 change: 1 addition & 0 deletions python/docs/source/reference/timestream/misc.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:toctree: ../apidocs/
Timestream.cast
Timestream.coalesce
Timestream.data_type
Timestream.else_
Timestream.filter
Expand Down
91 changes: 91 additions & 0 deletions python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,39 @@ def __radd__(self, lhs: Union[Timestream, Literal]) -> Timestream:
lhs = Timestream._literal(lhs, self._ffi_expr.session())
return lhs.add(self)

def ceil(self) -> Timestream:
"""
Create a Timestream for the number rounded up to the next largest integer.
Returns
-------
Timestream
The Timestream resulting from the numeric ceiling of this.
"""
return Timestream._call("ceil", self)

def clamp(
self,
min: Union[Timestream, Literal, None] = None,
max: Union[Timestream, Literal, None] = None,
) -> Timestream:
"""
Create a Timestream clamped between the bounds of min and max.
Parameters
----------
min : Union[Timestream, Literal, None]
The literal value to set as the lower bound
max : Union[Timestream, Literal, None]
The literal value to set as the upper bound
Returns
-------
Timestream
The Timestream resulting from the clamped bounds between min and max.
"""
return Timestream._call("clamp", self, min, max)

def sub(self, rhs: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream substracting `rhs` from this.
Expand Down Expand Up @@ -270,6 +303,28 @@ def __rsub__(self, lhs: Union[Timestream, Literal]) -> Timestream:
lhs = Timestream._literal(lhs, self._ffi_expr.session())
return lhs.sub(self)

def exp(self) -> Timestream:
"""
Create a Timestream raising `e` to the power of this.
Returns
-------
Timestream
The Timestream resulting from `e^this`.
"""
return Timestream._call("exp", self)

def floor(self) -> Timestream:
"""
Create a Timestream of the values rounded down to the nearest integer.
Returns
-------
Timestream
The Timestream resulting from the numeric floor of this.
"""
return Timestream._call("floor", self)

def mul(self, rhs: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream multiplying this and `rhs`.
Expand Down Expand Up @@ -300,6 +355,21 @@ def __rmul__(self, lhs: Union[Timestream, Literal]) -> Timestream:
lhs = Timestream._literal(lhs, self._ffi_expr.session())
return lhs.mul(self)

def powf(self, power: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream raising `this` to the power of `power`.
Parameters
----------
power : Union[Timestream, Literal]
The Timestream or literal value to raise this by.
Returns
-------
Timestream: The Timestream resulting from `self ^ power`.
"""
return Timestream._call("powf", self, power)

def div(self, divisor: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream by dividing this and `divisor`.
Expand Down Expand Up @@ -876,6 +946,27 @@ def lookup(self, key: Union[Timestream, Literal]) -> Timestream:
"""
return Timestream._call("lookup", key, self)

def coalesce(
self, arg: Union[Timestream, Literal], *args: Union[Timestream, Literal]
) -> Timestream:
"""
Create a Timestream for returning the first non-null value or null if all values are null.
Parameters
----------
arg : Union[Timestream, Literal]
The next value to be coalesced (required).
args : Union[Timestream, Literal]
Additional values to be coalesced (optional).
Returns
-------
Timestream
Timestream containing the first non-null value from that row.
If all values are null, then returns null.
"""
return Timestream._call("coalesce", self, arg, *args)

def shift_to(self, time: Union[Timestream, datetime]) -> Timestream:
"""
Create a Timestream shifting each point forward to `time`.
Expand Down
24 changes: 24 additions & 0 deletions python/pytests/ceil_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,5",
"1996-12-19T16:39:58,B,100.0001",
"1996-12-19T16:39:59,A,2.50",
"1996-12-19T16:40:00,A,",
"1996-12-19T16:40:01,A,0.99",
"1996-12-19T16:40:02,A,1.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_ceil(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "ceil_m": m.ceil()}))
92 changes: 92 additions & 0 deletions python/pytests/clamp_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,5",
"1996-12-19T16:39:58,B,100.0001",
"1996-12-19T16:39:59,A,2.50",
"1996-12-19T16:40:00,A,",
"1996-12-19T16:40:01,A,0.99",
"1996-12-19T16:40:02,A,1.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_clamp_min_max(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "clamped_m": m.clamp(min=5, max=100)}))


def test_clamp_min(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "clamped_min": m.clamp(min=5)}))


def test_clamp_max(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "clamped_min": m.clamp(max=100)}))


@pytest.fixture(scope="module")
def banking_source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,current_balance,min_balance,max_balance",
"1996-12-19T16:39:57,A,5.00,6.00,6.01",
"1996-12-19T16:39:58,B,6.00,7.00,7.01",
"1996-12-19T16:39:59,A,7.00,8.00,8.01",
"1996-12-19T16:40:00,A,8.00,9.00,9.01",
"1996-12-19T16:40:01,A,9.00,10.00,10.01",
"1996-12-19T16:40:02,A,10.00,11.00,11.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_clamp_banking_min_max(banking_source, golden) -> None:
current_balance = banking_source.col("current_balance")
min_balance = banking_source.col("min_balance")
max_balance = banking_source.col("max_balance")
golden.jsonl(
kd.record(
{
"current_balance": current_balance,
"clamped_balance": current_balance.clamp(
min=min_balance, max=max_balance
),
}
)
)


def test_clamp_banking_min(banking_source, golden) -> None:
current_balance = banking_source.col("current_balance")
min_balance = banking_source.col("min_balance")
golden.jsonl(
kd.record(
{
"current_balance": current_balance,
"clamped_balance": current_balance.clamp(min=min_balance),
}
)
)


def test_clamp_banking_max(banking_source, golden) -> None:
current_balance = banking_source.col("current_balance")
max_balance = banking_source.col("max_balance")
golden.jsonl(
kd.record(
{
"current_balance": current_balance,
"clamped_balance": current_balance.clamp(max=max_balance),
}
)
)
32 changes: 32 additions & 0 deletions python/pytests/coalesce_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n,o",
"1996-12-19T16:39:57,A,5,10,15",
"1996-12-19T16:39:58,B,24,3,15",
"1996-12-19T16:39:59,A,17,6,15",
"1996-12-19T16:40:00,A,,9,15",
"1996-12-19T16:40:01,A,12,,15",
"1996-12-19T16:40:02,A,,,15",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_coalesce(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(kd.record({"m": m, "n": n, "coalesced_val": m.coalesce(n)}))


def test_coalesce_three(source, golden) -> None:
m = source.col("m")
n = source.col("n")
o = source.col("o")
golden.jsonl(kd.record({"m": m, "n": n, "o": o, "coalesced_val": m.coalesce(n, o)}))
24 changes: 24 additions & 0 deletions python/pytests/exp_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,1",
"1996-12-19T16:39:58,B,1",
"1996-12-19T16:39:59,A,2",
"1996-12-19T16:40:00,A,3",
"1996-12-19T16:40:01,A,4",
"1996-12-19T16:40:02,A,5",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_exp(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "exp_m": m.exp()}))
24 changes: 24 additions & 0 deletions python/pytests/floor_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,5",
"1996-12-19T16:39:58,B,100.0001",
"1996-12-19T16:39:59,A,2.50",
"1996-12-19T16:40:00,A,",
"1996-12-19T16:40:01,A,0.99",
"1996-12-19T16:40:02,A,1.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_floor(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "floor_m": m.floor()}))
6 changes: 6 additions & 0 deletions python/pytests/golden/ceil_test/test_ceil.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"ceil_m":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":100.0001,"ceil_m":101.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":2.5,"ceil_m":3.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"ceil_m":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":0.99,"ceil_m":1.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":1.01,"ceil_m":2.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/ceil_test/test_ceil_unwindowed.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"ceil_m":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":100.0001,"ceil_m":101.0}
{"_time":"1996-12-19T16:39:59.000000000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":2.5,"ceil_m":3.0}
{"_time":"1996-12-19T16:40:00.000000000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"ceil_m":null}
{"_time":"1996-12-19T16:40:01.000000000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":0.99,"ceil_m":1.0}
{"_time":"1996-12-19T16:40:02.000000000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":1.01,"ceil_m":2.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"clamped_m":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":100.0001,"clamped_m":100.0}
{"_time":"1996-12-19T16:39:59.000000000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":2.5,"clamped_m":5.0}
{"_time":"1996-12-19T16:40:00.000000000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"clamped_m":null}
{"_time":"1996-12-19T16:40:01.000000000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":0.99,"clamped_m":5.0}
{"_time":"1996-12-19T16:40:02.000000000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":1.01,"clamped_m":5.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp_banking_max.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","current_balance":5.0,"clamped_balance":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","current_balance":6.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","current_balance":7.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","current_balance":8.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","current_balance":9.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","current_balance":10.0,"clamped_balance":10.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp_banking_min.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","current_balance":5.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","current_balance":6.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","current_balance":7.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","current_balance":8.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","current_balance":9.0,"clamped_balance":10.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","current_balance":10.0,"clamped_balance":11.0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","current_balance":5.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","current_balance":6.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","current_balance":7.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","current_balance":8.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","current_balance":9.0,"clamped_balance":10.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","current_balance":10.0,"clamped_balance":11.0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","current_balance":5.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:58.000000000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","current_balance":6.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:39:59.000000000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","current_balance":7.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:00.000000000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","current_balance":8.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:01.000000000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","current_balance":9.0,"clamped_balance":10.0}
{"_time":"1996-12-19T16:40:02.000000000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","current_balance":10.0,"clamped_balance":11.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp_max.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"clamped_min":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":100.0001,"clamped_min":100.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":2.5,"clamped_min":2.5}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"clamped_min":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":0.99,"clamped_min":0.99}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":1.01,"clamped_min":1.01}
Loading

0 comments on commit 555472e

Please sign in to comment.