Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add additional opts for pct, media, unique, ... in the Python client #3946

Merged
merged 5 commits into from
Aug 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 80 additions & 13 deletions py/client/pydeephaven/agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@

from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import List, Union
from typing import List, Union, Any

import numpy as np

from pydeephaven._utils import to_list
from pydeephaven.proto import table_pb2
Expand All @@ -15,6 +17,7 @@
_GrpcAggregationCount = _GrpcAggregation.AggregationCount
_GrpcAggregationPartition = _GrpcAggregation.AggregationPartition
_GrpcAggSpec = table_pb2.AggSpec
_GrpcNullValue = table_pb2.NullValue


class Aggregation(ABC):
Expand Down Expand Up @@ -138,17 +141,19 @@ def partition(col: str, include_by_columns: bool = True) -> Aggregation:
return _AggregationPartition(col=col, include_by_columns=include_by_columns)


def count_distinct(cols: Union[str, List[str]] = None, count_nulls: bool = False) -> Aggregation:
    """Creates a Count Distinct aggregation which computes the count of distinct values within an aggregation group
    for each of the given columns.

    Args:
        cols (Union[str, List[str]]): the column(s) to aggregate on, can be renaming expressions, i.e. "new_col = col";
            default is None, only valid when used in Table agg_all_by operation
        count_nulls (bool): whether null values should be counted, default is False

    Returns:
        an aggregation
    """
    spec = _GrpcAggSpec.AggSpecCountDistinct(count_nulls=count_nulls)
    return _AggregationColumns(agg_spec=_GrpcAggSpec(count_distinct=spec), cols=to_list(cols))


Expand Down Expand Up @@ -224,33 +229,41 @@ def max_(cols: Union[str, List[str]] = None) -> Aggregation:
return _AggregationColumns(agg_spec=agg_spec, cols=to_list(cols))


def median(cols: Union[str, List[str]] = None, average_evenly_divided: bool = True) -> Aggregation:
    """Creates a Median aggregation which computes the median value within an aggregation group for each of the
    given columns.

    Args:
        cols (Union[str, List[str]]): the column(s) to aggregate on, can be renaming expressions, i.e. "new_col = col";
            default is None, only valid when used in Table agg_all_by operation
        average_evenly_divided (bool): for an even-sized group, whether to output the average of the two middle
            values (True) or the smaller of the two (False). The default is True. This flag is only valid for
            numeric types.

    Returns:
        an aggregation
    """
    spec = _GrpcAggSpec.AggSpecMedian(average_evenly_divided=average_evenly_divided)
    return _AggregationColumns(agg_spec=_GrpcAggSpec(median=spec), cols=to_list(cols))


def pct(percentile: float, cols: Union[str, List[str]] = None, average_evenly_divided: bool = False) -> Aggregation:
    """Creates a Percentile aggregation which computes the percentile value within an aggregation group for each of
    the given columns.

    Args:
        percentile (float): the percentile used for calculation
        cols (Union[str, List[str]]): the column(s) to aggregate on, can be renaming expressions, i.e. "new_col = col";
            default is None, only valid when used in Table agg_all_by operation
        average_evenly_divided (bool): when the percentile splits the group into two halves, whether to output the
            average of the two middle values (True) or the smaller of the two (False). The default is False. This
            flag is only valid for numeric types.

    Returns:
        an aggregation
    """
    spec = _GrpcAggSpec.AggSpecPercentile(percentile=percentile,
                                          average_evenly_divided=average_evenly_divided)
    return _AggregationColumns(agg_spec=_GrpcAggSpec(percentile=spec), cols=to_list(cols))


Expand Down Expand Up @@ -300,17 +313,55 @@ def std(cols: Union[str, List[str]] = None) -> Aggregation:
return _AggregationColumns(agg_spec=agg_spec, cols=to_list(cols))


def unique(cols: Union[str, List[str]] = None, include_nulls: bool = False,
           non_unique_sentinel: Union[np.number, str, bool] = None) -> "Aggregation":
    """Creates a Unique aggregation which computes the single unique value within an aggregation group for each of
    the given columns. If all values in a column are null, or if there is more than one distinct value in a column,
    the result is the specified non_unique_sentinel value (defaults to null).

    Args:
        cols (Union[str, List[str]]): the column(s) to aggregate on, can be renaming expressions, i.e. "new_col = col";
            default is None, only valid when used in Table agg_all_by operation
        include_nulls (bool): whether null is treated as a value for the purpose of determining if the values in the
            aggregation group are unique, default is False.
        non_unique_sentinel (Union[np.number, str, bool]): the non-null sentinel value when no unique value exists,
            default is None. Must be a non-None value when include_nulls is True. When passed in as a numpy scalar
            number value, it must be of one of these types: np.int8, np.int16, np.uint16, np.int32, np.int64(int),
            np.float32, np.float64(float). Please note that np.uint16 is interpreted as a Deephaven/Java char.

    Raises:
        TypeError: if non_unique_sentinel is not one of the supported types

    Returns:
        an aggregation
    """
    if non_unique_sentinel is not None:
        # bool must be tested before the integer types: Python's bool is a subclass of int, so checking
        # int first would silently encode True/False as a long_value instead of a bool_value.
        if isinstance(non_unique_sentinel, (bool, np.bool_)):
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(bool_value=bool(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, np.byte):  # np.byte is np.int8 -> Java byte
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(byte_value=int(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, np.short):  # np.short is np.int16 -> Java short
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(short_value=int(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, np.int32):
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(int_value=int(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, (np.int64, int)):
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(long_value=int(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, np.float32):
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(float_value=float(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, (np.float64, float)):
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(double_value=float(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, np.uint16):  # interpreted as a Deephaven/Java char
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(char_value=int(non_unique_sentinel))
        elif isinstance(non_unique_sentinel, str):
            sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(string_value=non_unique_sentinel)
        else:
            raise TypeError(f"invalid non-unique-sentinel value type {type(non_unique_sentinel)}")
    else:
        # no sentinel supplied: explicitly encode null as the non-unique result
        sentinel = _GrpcAggSpec.AggSpecNonUniqueSentinel(null_value=_GrpcNullValue.NULL_VALUE)

    agg_spec = _GrpcAggSpec(
        unique=_GrpcAggSpec.AggSpecUnique(include_nulls=include_nulls,
                                          non_unique_sentinel=sentinel))
    return _AggregationColumns(agg_spec=agg_spec, cols=to_list(cols))


Expand Down Expand Up @@ -356,3 +407,19 @@ def weighted_sum(wcol: str, cols: Union[str, List[str]] = None) -> Aggregation:
"""
agg_spec = _GrpcAggSpec(weighted_sum=_GrpcAggSpec.AggSpecWeighted(weight_column=wcol))
return _AggregationColumns(agg_spec=agg_spec, cols=to_list(cols))


def distinct(cols: Union[str, List[str]] = None, include_nulls: bool = False) -> Aggregation:
    """Creates a Distinct aggregation which collects the distinct values within an aggregation group for each of
    the given columns and stores them as vectors.

    Args:
        cols (Union[str, List[str]]): the column(s) to aggregate on, can be renaming expressions, i.e. "new_col = col";
            default is None, only valid when used in Table agg_all_by operation
        include_nulls (bool): whether nulls should be included as distinct values, default is False

    Returns:
        an aggregation
    """
    spec = _GrpcAggSpec.AggSpecDistinct(include_nulls=include_nulls)
    return _AggregationColumns(agg_spec=_GrpcAggSpec(distinct=spec), cols=to_list(cols))
38 changes: 37 additions & 1 deletion py/client/tests/test_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@
import time
import unittest

import numpy as np
from pyarrow import csv

from pydeephaven import DHError
from pydeephaven import SortDirection
from pydeephaven.agg import sum_, avg, pct, weighted_avg, count_, partition
from pydeephaven.agg import sum_, avg, pct, weighted_avg, count_, partition, median, unique, count_distinct, distinct
from pydeephaven.table import Table
from tests.testbase import BaseTestCase

Expand Down Expand Up @@ -294,6 +295,41 @@ def test_meta_table(self):
test_table = self.session.import_table(pa_table).drop_columns(["e"])
self.assertEqual(len(test_table.schema), len(test_table.meta_table.to_arrow()))

def test_agg_with_options(self):
    """Verifies the optional aggregation arguments (median, pct, unique, count_distinct, distinct): results
    produced with non-default options must differ from results produced with default options."""
    pa_table = csv.read_csv(self.csv_file)
    test_table = self.session.import_table(pa_table).update(["b = a % 10 > 5 ? null : b", "c = c % 10",
                                                             "d = (char)i"])

    # kept strictly parallel with aggs_default below so the zip() comparison pairs like with like;
    # the original code had an extra entry here, which misaligned the pairs from index 3 onward
    aggs_option = [
        median(cols=["ma = a", "mb = b"], average_evenly_divided=False),
        pct(0.20, cols=["pa = a", "pb = b"], average_evenly_divided=True),
        unique(cols=["ua = a", "ub = b"], include_nulls=True, non_unique_sentinel=np.int16(-1)),
        count_distinct(cols=["csa = a", "csb = b"], count_nulls=True),
        distinct(cols=["da = a", "db = b"], include_nulls=True),
    ]
    # the char-sentinel unique has no default-option counterpart, so it is exercised only in the
    # combined agg_by call, not in the option-vs-default comparison loop
    char_unique = unique(cols=["ud = d"], include_nulls=True, non_unique_sentinel=np.uint16(128))

    rt = test_table.agg_by(aggs=[*aggs_option, char_unique], by=["c"])
    self.assertEqual(rt.size, test_table.select_distinct(["c"]).size)

    # np.uint32 is not a supported sentinel type and must be rejected
    with self.assertRaises(TypeError):
        unique(cols=["ua = a", "ub = b"], include_nulls=True, non_unique_sentinel=np.uint32(-1))

    aggs_default = [
        median(cols=["ma = a", "mb = b"]),
        pct(0.20, cols=["pa = a", "pb = b"]),
        unique(cols=["ua = a", "ub = b"]),
        count_distinct(cols=["csa = a", "csb = b"]),
        distinct(cols=["da = a", "db = b"]),
    ]

    for agg_option, agg_default in zip(aggs_option, aggs_default):
        with self.subTest(agg_option):
            rt_option = test_table.agg_by(aggs=agg_option, by=["c"])
            rt_default = test_table.agg_by(aggs=agg_default, by=["c"])
            self.assertNotEqual(rt_option.to_arrow(), rt_default.to_arrow())


# Allow running this test module directly (python test_table.py) in addition to a test runner.
if __name__ == '__main__':
    unittest.main()
Loading