Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add more aggregations/functions #686

Merged
merged 22 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/docs/source/reference/timestream/arithmetic.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ See the notes on the specific functions for more information.
:toctree: ../apidocs/

Timestream.add
Timestream.ceil
Timestream.clamp
Timestream.exp
Timestream.floor
Timestream.powf
Timestream.sub
Timestream.mul
Timestream.div
Expand Down
1 change: 1 addition & 0 deletions python/docs/source/reference/timestream/misc.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
:toctree: ../apidocs/

Timestream.cast
Timestream.coalesce
Timestream.data_type
Timestream.else_
Timestream.filter
Expand Down
91 changes: 91 additions & 0 deletions python/pysrc/kaskada/_timestream.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,39 @@ def __radd__(self, lhs: Union[Timestream, Literal]) -> Timestream:
lhs = Timestream._literal(lhs, self._ffi_expr.session())
return lhs.add(self)

def ceil(self) -> Timestream:
"""
Create a Timestream for the number rounded up to the next largest integer.

Returns
-------
Timestream
The Timestream resulting from the numeric ceiling of this.
"""
return Timestream._call("ceil", self)

def clamp(
self,
min: Union[Timestream, Literal, None] = None,
max: Union[Timestream, Literal, None] = None,
) -> Timestream:
"""
Create a Timestream clamped between the bounds of min and max.

Parameters
----------
min : Union[Timestream, Literal, None]
The literal value to set as the lower bound
max : Union[Timestream, Literal, None]
The literal value to set as the upper bound

Returns
-------
Timestream
The Timestream resulting from the clamped bounds between min and max.
"""
return Timestream._call("clamp", self, min, max)

def sub(self, rhs: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream substracting `rhs` from this.
Expand Down Expand Up @@ -270,6 +303,28 @@ def __rsub__(self, lhs: Union[Timestream, Literal]) -> Timestream:
lhs = Timestream._literal(lhs, self._ffi_expr.session())
return lhs.sub(self)

def exp(self) -> Timestream:
"""
Create a Timestream raising `e` to the power of this.

Returns
-------
Timestream
The Timestream resulting from `e^this`.
"""
return Timestream._call("exp", self)

def floor(self) -> Timestream:
"""
Create a Timestream of the values rounded down to the nearest integer.

Returns
-------
Timestream
The Timestream resulting from the numeric floor of this.
"""
return Timestream._call("floor", self)

def mul(self, rhs: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream multiplying this and `rhs`.
Expand Down Expand Up @@ -300,6 +355,21 @@ def __rmul__(self, lhs: Union[Timestream, Literal]) -> Timestream:
lhs = Timestream._literal(lhs, self._ffi_expr.session())
return lhs.mul(self)

def powf(self, power: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream raising `this` to the power of `power`.

Parameters
----------
power : Union[Timestream, Literal]
The Timestream or literal value to raise this by.

Returns
-------
Timestream: The Timestream resulting from `self ^ power`.
"""
return Timestream._call("powf", self, power)

def div(self, divisor: Union[Timestream, Literal]) -> Timestream:
"""
Create a Timestream by dividing this and `divisor`.
Expand Down Expand Up @@ -876,6 +946,27 @@ def lookup(self, key: Union[Timestream, Literal]) -> Timestream:
"""
return Timestream._call("lookup", key, self)

def coalesce(
self, arg: Union[Timestream, Literal], *args: Union[Timestream, Literal]
) -> Timestream:
"""
Create a Timestream for returning the first non-null value or null if all values are null.

Parameters
----------
arg : Union[Timestream, Literal]
The next value to be coalesced (required).
kevinjnguyen marked this conversation as resolved.
Show resolved Hide resolved
args : Union[Timestream, Literal]
Additional values to be coalesced (optional).

Returns
-------
Timestream
Timestream containing the first non-null value from that row.
If all values are null, then returns null.
"""
return Timestream._call("coalesce", self, arg, *args)

def shift_to(self, time: Union[Timestream, datetime]) -> Timestream:
"""
Create a Timestream shifting each point forward to `time`.
Expand Down
24 changes: 24 additions & 0 deletions python/pytests/ceil_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,5",
"1996-12-19T16:39:58,B,100.0001",
"1996-12-19T16:39:59,A,2.50",
"1996-12-19T16:40:00,A,",
"1996-12-19T16:40:01,A,0.99",
"1996-12-19T16:40:02,A,1.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_ceil(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "ceil_m": m.ceil()}))
92 changes: 92 additions & 0 deletions python/pytests/clamp_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,5",
"1996-12-19T16:39:58,B,100.0001",
"1996-12-19T16:39:59,A,2.50",
"1996-12-19T16:40:00,A,",
"1996-12-19T16:40:01,A,0.99",
"1996-12-19T16:40:02,A,1.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_clamp_min_max(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "clamped_m": m.clamp(min=5, max=100)}))


def test_clamp_min(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "clamped_min": m.clamp(min=5)}))


def test_clamp_max(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "clamped_min": m.clamp(max=100)}))
kevinjnguyen marked this conversation as resolved.
Show resolved Hide resolved


@pytest.fixture(scope="module")
def banking_source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,current_balance,min_balance,max_balance",
"1996-12-19T16:39:57,A,5.00,6.00,6.01",
"1996-12-19T16:39:58,B,6.00,7.00,7.01",
"1996-12-19T16:39:59,A,7.00,8.00,8.01",
"1996-12-19T16:40:00,A,8.00,9.00,9.01",
"1996-12-19T16:40:01,A,9.00,10.00,10.01",
"1996-12-19T16:40:02,A,10.00,11.00,11.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_clamp_banking_min_max(banking_source, golden) -> None:
current_balance = banking_source.col("current_balance")
min_balance = banking_source.col("min_balance")
max_balance = banking_source.col("max_balance")
golden.jsonl(
kd.record(
{
"current_balance": current_balance,
"clamped_balance": current_balance.clamp(
min=min_balance, max=max_balance
),
}
)
)


def test_clamp_banking_min(banking_source, golden) -> None:
current_balance = banking_source.col("current_balance")
min_balance = banking_source.col("min_balance")
golden.jsonl(
kd.record(
{
"current_balance": current_balance,
"clamped_balance": current_balance.clamp(min=min_balance),
}
)
)


def test_clamp_banking_max(banking_source, golden) -> None:
current_balance = banking_source.col("current_balance")
max_balance = banking_source.col("max_balance")
golden.jsonl(
kd.record(
{
"current_balance": current_balance,
"clamped_balance": current_balance.clamp(max=max_balance),
}
)
)
32 changes: 32 additions & 0 deletions python/pytests/coalesce_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m,n,o",
"1996-12-19T16:39:57,A,5,10,15",
"1996-12-19T16:39:58,B,24,3,15",
"1996-12-19T16:39:59,A,17,6,15",
"1996-12-19T16:40:00,A,,9,15",
"1996-12-19T16:40:01,A,12,,15",
"1996-12-19T16:40:02,A,,,15",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_coalesce(source, golden) -> None:
m = source.col("m")
n = source.col("n")
golden.jsonl(kd.record({"m": m, "n": n, "coalesced_val": m.coalesce(n)}))
kevinjnguyen marked this conversation as resolved.
Show resolved Hide resolved


def test_coalesce_three(source, golden) -> None:
m = source.col("m")
n = source.col("n")
o = source.col("o")
golden.jsonl(kd.record({"m": m, "n": n, "o": o, "coalesced_val": m.coalesce(n, o)}))
24 changes: 24 additions & 0 deletions python/pytests/exp_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,1",
"1996-12-19T16:39:58,B,1",
"1996-12-19T16:39:59,A,2",
"1996-12-19T16:40:00,A,3",
"1996-12-19T16:40:01,A,4",
"1996-12-19T16:40:02,A,5",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_exp(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "exp_m": m.exp()}))
24 changes: 24 additions & 0 deletions python/pytests/floor_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import kaskada as kd

import pytest


@pytest.fixture(scope="module")
def source() -> kd.sources.CsvString:
content = "\n".join(
[
"time,key,m",
"1996-12-19T16:39:57,A,5",
"1996-12-19T16:39:58,B,100.0001",
"1996-12-19T16:39:59,A,2.50",
"1996-12-19T16:40:00,A,",
"1996-12-19T16:40:01,A,0.99",
"1996-12-19T16:40:02,A,1.01",
]
)
return kd.sources.CsvString(content, time_column_name="time", key_column_name="key")


def test_floor(source, golden) -> None:
m = source.col("m")
golden.jsonl(kd.record({"m": m, "floor_m": m.floor()}))
6 changes: 6 additions & 0 deletions python/pytests/golden/ceil_test/test_ceil.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"ceil_m":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":100.0001,"ceil_m":101.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":2.5,"ceil_m":3.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"ceil_m":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":0.99,"ceil_m":1.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":1.01,"ceil_m":2.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/ceil_test/test_ceil_unwindowed.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"ceil_m":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":100.0001,"ceil_m":101.0}
{"_time":"1996-12-19T16:39:59.000000000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":2.5,"ceil_m":3.0}
{"_time":"1996-12-19T16:40:00.000000000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"ceil_m":null}
{"_time":"1996-12-19T16:40:01.000000000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":0.99,"ceil_m":1.0}
{"_time":"1996-12-19T16:40:02.000000000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":1.01,"ceil_m":2.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","m":5.0,"clamped_m":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","m":100.0001,"clamped_m":100.0}
{"_time":"1996-12-19T16:39:59.000000000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","m":2.5,"clamped_m":5.0}
{"_time":"1996-12-19T16:40:00.000000000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","m":null,"clamped_m":null}
{"_time":"1996-12-19T16:40:01.000000000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","m":0.99,"clamped_m":5.0}
{"_time":"1996-12-19T16:40:02.000000000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","m":1.01,"clamped_m":5.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp_banking_max.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","current_balance":5.0,"clamped_balance":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","current_balance":6.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","current_balance":7.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","current_balance":8.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","current_balance":9.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","current_balance":10.0,"clamped_balance":10.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp_banking_min.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","current_balance":5.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","current_balance":6.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","current_balance":7.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","current_balance":8.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","current_balance":9.0,"clamped_balance":10.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","current_balance":10.0,"clamped_balance":11.0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","current_balance":5.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","current_balance":6.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","current_balance":7.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","current_balance":8.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","current_balance":9.0,"clamped_balance":10.0}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","current_balance":10.0,"clamped_balance":11.0}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_subsort":0,"_key_hash":12960666915911099378,"_key":"A","current_balance":5.0,"clamped_balance":6.0}
{"_time":"1996-12-19T16:39:58.000000000","_subsort":1,"_key_hash":2867199309159137213,"_key":"B","current_balance":6.0,"clamped_balance":7.0}
{"_time":"1996-12-19T16:39:59.000000000","_subsort":2,"_key_hash":12960666915911099378,"_key":"A","current_balance":7.0,"clamped_balance":8.0}
{"_time":"1996-12-19T16:40:00.000000000","_subsort":3,"_key_hash":12960666915911099378,"_key":"A","current_balance":8.0,"clamped_balance":9.0}
{"_time":"1996-12-19T16:40:01.000000000","_subsort":4,"_key_hash":12960666915911099378,"_key":"A","current_balance":9.0,"clamped_balance":10.0}
{"_time":"1996-12-19T16:40:02.000000000","_subsort":5,"_key_hash":12960666915911099378,"_key":"A","current_balance":10.0,"clamped_balance":11.0}
6 changes: 6 additions & 0 deletions python/pytests/golden/clamp_test/test_clamp_max.jsonl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{"_time":"1996-12-19T16:39:57.000000000","_key":"A","m":5.0,"clamped_min":5.0}
{"_time":"1996-12-19T16:39:58.000000000","_key":"B","m":100.0001,"clamped_min":100.0}
{"_time":"1996-12-19T16:39:59.000000000","_key":"A","m":2.5,"clamped_min":2.5}
{"_time":"1996-12-19T16:40:00.000000000","_key":"A","m":null,"clamped_min":null}
{"_time":"1996-12-19T16:40:01.000000000","_key":"A","m":0.99,"clamped_min":0.99}
{"_time":"1996-12-19T16:40:02.000000000","_key":"A","m":1.01,"clamped_min":1.01}
Loading
Loading