Skip to content

Commit

Permalink
Minor corrections.
Browse files Browse the repository at this point in the history
  • Loading branch information
yohplala committed Apr 21, 2024
1 parent ec7dc4e commit 5360c49
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 42 deletions.
1 change: 1 addition & 0 deletions oups/aggstream/aggstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,7 @@ def _iter_data(
for storing temporary results from one chunk processing to
the next. Its initial value is that provided by `check.buffer`.
In-place modifications of the seed dataframe can be carried out here.
check_buffer : dict or None
Buffer to keep track of intermediate data that may be required to
proceed with the check of individual seed items.
Expand Down
61 changes: 32 additions & 29 deletions tests/test_aggstream/test_aggstream_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from oups import AggStream
from oups import ParquetSet
from oups import toplevel
from oups.aggstream.aggstream import KEY_AGG
from oups.aggstream.aggstream import KEY_AGGSTREAM
from oups.aggstream.aggstream import KEY_RESTART_INDEX
from oups.aggstream.aggstream import SeedCheckException
Expand All @@ -44,6 +45,8 @@
from oups.aggstream.jcumsegagg import LAST
from oups.aggstream.jcumsegagg import MAX
from oups.aggstream.jcumsegagg import MIN
from oups.aggstream.segmentby import KEY_BIN_BY
from oups.aggstream.segmentby import KEY_SNAP_BY
from oups.aggstream.segmentby import by_x_rows


Expand Down Expand Up @@ -77,16 +80,16 @@ def test_3_keys_only_bins(store, seed_path):
key2 = Indexer("agg_13T")
key3 = Indexer("agg_4rows")
key1_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
"agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)},
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
KEY_AGG: {FIRST: ("val", FIRST), LAST: ("val", LAST)},
}
key2_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"),
"agg": {FIRST: ("val", FIRST), MAX: ("val", MAX)},
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"),
KEY_AGG: {FIRST: ("val", FIRST), MAX: ("val", MAX)},
}
key3_cf = {
"bin_by": by_x_rows,
"agg": {MIN: ("val", MIN), MAX: ("val", MAX)},
KEY_BIN_BY: by_x_rows,
KEY_AGG: {MIN: ("val", MIN), MAX: ("val", MAX)},
}
max_row_group_size = 6
key_configs = {
Expand Down Expand Up @@ -130,23 +133,23 @@ def get_ref_results(seed_df):
key3: deepcopy(key3_cf),
}
k1_res = (
seed_df.groupby(key_configs[key1]["bin_by"])
.agg(**key_configs[key1]["agg"])
seed_df.groupby(key_configs[key1][KEY_BIN_BY])
.agg(**key_configs[key1][KEY_AGG])
.reset_index()
)
k1_res[FIRST] = k1_res[FIRST].astype(DTYPE_NULLABLE_INT64)
k1_res[LAST] = k1_res[LAST].astype(DTYPE_NULLABLE_INT64)
k2_res = (
seed_df.groupby(key_configs[key2]["bin_by"])
.agg(**key_configs[key2]["agg"])
seed_df.groupby(key_configs[key2][KEY_BIN_BY])
.agg(**key_configs[key2][KEY_AGG])
.reset_index()
)
key3_bins = by_x_rows(on=seed_df[ordered_on], buffer={})
key3_bins = pSeries(pNaT, index=np.arange(len(seed_df)))
key3_bin_starts = np.arange(0, len(seed_df), 4)
key3_bins.iloc[key3_bin_starts] = seed_df.iloc[key3_bin_starts].loc[:, ordered_on]
key3_bins.ffill(inplace=True)
k3_res = seed_df.groupby(key3_bins).agg(**key_configs[key3]["agg"])
k3_res = seed_df.groupby(key3_bins).agg(**key_configs[key3][KEY_AGG])
k3_res.index.name = ordered_on
k3_res.reset_index(inplace=True)
return k1_res, k2_res, k3_res
Expand Down Expand Up @@ -231,8 +234,8 @@ def test_exception_different_indexes_at_restart(store, seed_path):
# Setup a 1st separate streamed aggregations (awkward...).
key1 = Indexer("agg_2T")
key1_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
"agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)},
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
KEY_AGG: {FIRST: ("val", FIRST), LAST: ("val", LAST)},
}
max_row_group_size = 6
as1 = AggStream(
Expand All @@ -259,8 +262,8 @@ def test_exception_different_indexes_at_restart(store, seed_path):
# Setup a 2nd separate streamed aggregation.
key2 = Indexer("agg_13T")
key2_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"),
"agg": {FIRST: ("val", FIRST), MAX: ("val", MAX)},
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="13T", closed="left", label="left"),
KEY_AGG: {FIRST: ("val", FIRST), MAX: ("val", MAX)},
}
as2 = AggStream(
ordered_on=ordered_on,
Expand Down Expand Up @@ -322,13 +325,13 @@ def check(seed_chunk, check_buffer=None):

key1 = Indexer("agg_2T")
key1_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
"agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)},
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
KEY_AGG: {FIRST: ("val", FIRST), LAST: ("val", LAST)},
}
key2 = Indexer("agg_60T")
key2_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="60T", closed="left", label="left"),
"agg": {FIRST: ("val", FIRST), MAX: ("val", MAX)},
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="60T", closed="left", label="left"),
KEY_AGG: {FIRST: ("val", FIRST), MAX: ("val", MAX)},
}
filter1 = "filter1"
filter2 = "filter2"
Expand Down Expand Up @@ -525,20 +528,20 @@ def test_3_keys_bins_snaps_filters(store, seed_path):
max_row_group_size = 5
snap_duration = "5T"
common_key_params = {
"snap_by": TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"),
"agg": {FIRST: (val, FIRST), LAST: (val, LAST)},
KEY_SNAP_BY: TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"),
KEY_AGG: {FIRST: (val, FIRST), LAST: (val, LAST)},
}
key1 = Indexer("agg_10T")
key1_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"),
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"),
}
key2 = Indexer("agg_20T")
key2_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"),
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"),
}
key3 = Indexer("agg_2T")
key3_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"),
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"),
}
filter1 = "filter1"
filter2 = "filter2"
Expand Down Expand Up @@ -967,20 +970,20 @@ def test_3_keys_bins_snaps_filters_restart(store, seed_path):
max_row_group_size = 5
snap_duration = "5T"
common_key_params = {
"snap_by": TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"),
"agg": {FIRST: (val, FIRST), LAST: (val, LAST)},
KEY_SNAP_BY: TimeGrouper(key=ordered_on, freq=snap_duration, closed="left", label="right"),
KEY_AGG: {FIRST: (val, FIRST), LAST: (val, LAST)},
}
key1 = Indexer("agg_10T")
key1_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"),
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="10T", closed="left", label="right"),
}
key2 = Indexer("agg_20T")
key2_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"),
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="20T", closed="left", label="right"),
}
key3 = Indexer("agg_2T")
key3_cf = {
"bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"),
KEY_BIN_BY: TimeGrouper(key=ordered_on, freq="2T", closed="left", label="right"),
}
filter1 = "filter1"
filter2 = "filter2"
Expand Down
28 changes: 15 additions & 13 deletions tests/test_aggstream/test_aggstream_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@
from oups.aggstream.jcumsegagg import FIRST
from oups.aggstream.jcumsegagg import LAST
from oups.aggstream.jcumsegagg import SUM
from oups.aggstream.segmentby import KEY_BIN_BY
from oups.aggstream.segmentby import KEY_ORDERED_ON
from oups.aggstream.segmentby import KEY_SNAP_BY
from oups.store.writer import KEY_DUPLICATES_ON
from oups.store.writer import KEY_MAX_ROW_GROUP_SIZE

Expand Down Expand Up @@ -81,7 +83,7 @@ def always_false(**kwargs):
KEY_MAX_ROW_GROUP_SIZE: 6,
KEY_ORDERED_ON: "ts",
"keys": Indexer("key1"),
"bin_by": TimeGrouper(key="ts", freq="1H", closed="left", label="left"),
KEY_BIN_BY: TimeGrouper(key="ts", freq="1H", closed="left", label="left"),
KEY_AGG: {"agg_out": ("val", SUM)},
},
# ref_seed_config
Expand Down Expand Up @@ -131,22 +133,22 @@ def always_false(**kwargs):
"keys": {
Indexer("key1_some_default"): {
KEY_AGG: {"out_spec": ("in_spec", FIRST)},
"bin_by": TimeGrouper(key="ts_dflt", freq="1H"),
KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"),
KEY_POST: always_false,
},
Indexer("key2_only_specific"): {
KEY_AGG: {"out_spec": ("in_spec", FIRST)},
"bin_by": always_true,
KEY_BIN_BY: always_true,
KEY_POST: None,
KEY_MAX_ROW_GROUP_SIZE: 3000,
KEY_ORDERED_ON: "ts_spec",
},
Indexer("key3_only_default"): {
"bin_by": always_false,
KEY_BIN_BY: always_false,
"bin_on": ("bin_on_spec", "bin_out_spec"),
},
Indexer("key4_most_default"): {
"bin_by": TimeGrouper(key="ts_dflt", freq="1H"),
KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"),
KEY_ORDERED_ON: "ts_spec",
},
},
Expand Down Expand Up @@ -235,29 +237,29 @@ def always_false(**kwargs):
KEY_CHECK: always_true,
KEY_AGG: {"out_dflt": ("in_dflt", LAST)},
KEY_POST: always_true,
"snap_by": TimeGrouper(key="ts_dflt", freq="30T"),
KEY_SNAP_BY: TimeGrouper(key="ts_dflt", freq="30T"),
"keys": {
"filter1": {
Indexer("key1_some_default"): {
KEY_AGG: {"out_spec": ("in_spec", FIRST)},
"bin_by": TimeGrouper(key="ts_dflt", freq="1H"),
KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"),
KEY_POST: always_false,
},
Indexer("key2_only_specific"): {
KEY_AGG: {"out_spec": ("in_spec", FIRST)},
"bin_by": always_true,
KEY_BIN_BY: always_true,
KEY_POST: None,
KEY_MAX_ROW_GROUP_SIZE: 3000,
KEY_ORDERED_ON: "ts_spec",
},
Indexer("key3_only_default"): {
"bin_by": always_false,
KEY_BIN_BY: always_false,
"bin_on": ("bin_on_spec", "bin_out_spec"),
},
},
"filter2": {
Indexer("key4_most_default"): {
"bin_by": TimeGrouper(key="ts_dflt", freq="1H"),
KEY_BIN_BY: TimeGrouper(key="ts_dflt", freq="1H"),
KEY_ORDERED_ON: "ts_spec",
},
},
Expand Down Expand Up @@ -357,9 +359,9 @@ def test_aggstream_init(
assert as_.seed_config == ref_seed_config
# Do not check 'seg_config' in 'keys_config'.
res_keys_config = deepcopy(as_.keys_config)
if "snap_by" in root_parameters:
if KEY_SNAP_BY in root_parameters:
# Check 'snap_by' is initialized in 'seg_config':
ref_grouper = root_parameters["snap_by"]
ref_grouper = root_parameters[KEY_SNAP_BY]
ref_grouper_attr = {
"key": ref_grouper.key,
"freq": ref_grouper.freq,
Expand All @@ -373,7 +375,7 @@ def test_aggstream_init(
"origin": ref_grouper.origin,
}
for key in ref_keys_config:
res_grouper = res_keys_config[key]["seg_config"]["snap_by"]
res_grouper = res_keys_config[key]["seg_config"][KEY_SNAP_BY]
for attr in ref_grouper_attr:
assert getattr(res_grouper, attr) == ref_grouper_attr[attr]
for key, ref in ref_keys_config.items():
Expand Down

0 comments on commit 5360c49

Please sign in to comment.