Skip to content

Commit

Permalink
Only raising exception if seed is not ordered.
Browse files Browse the repository at this point in the history
  • Loading branch information
yohplala committed Apr 18, 2024
1 parent a97c2ba commit e7effce
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 29 deletions.
24 changes: 16 additions & 8 deletions oups/aggstream/aggstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,14 @@ class SeedCheckException(Exception):
Exception related to user-defined checks on seed chunk.
"""

def __init__(self):
def __init__(self, message: str = None):
"""
Exception message.
"""
self.message = "failing used-defined checks."
if message is None:
self.message = "failing user-defined checks."
else:
self.message = message


def _iter_data(
Expand Down Expand Up @@ -472,20 +475,24 @@ def _iter_data(
trim_start = False
seed_remainder = None
for seed_chunk in seed:
# Check seed chunk is ordered on 'ordered_on', else order.
# Check seed chunk is ordered on 'ordered_on'.
# This re-ordering is made because for 'trim_start' and
# 'discard_last', this ordering is required.
if not seed_chunk[ordered_on].is_monotonic_increasing:
seed_chunk.sort_values(by=ordered_on, inplace=True)
# Currently uneasy about silently modifying seed data without knowing
# if it makes sense, so leaving this row commented.
# seed_chunk.sort_values(by=ordered_on, inplace=True)
# Instead, raise an exception.
raise SeedCheckException("seed data is not in ascending order.")
# Step 1 / Seed check by user.
if check:
# Apply user checks.
try:
check(seed_chunk, check_buffer)
except Exception:
except Exception as e:
# Stop iteration in case of failing check.
# Aggregation has been run up to the last valid chunk.
raise SeedCheckException()
raise SeedCheckException(str(e))
# Step 2 / If a previous remainder, concatenate it to give current
# DataFrame its 'final' length.
if not (seed_remainder is None or seed_remainder.empty):
Expand Down Expand Up @@ -1350,8 +1357,9 @@ def agg(
# Set 'seed_index_restart' to the 'last_seed_index' with
# which restarting the next aggregation iteration.
self.seed_config[KEY_RESTART_INDEX] = _last_seed_index
except SeedCheckException:
except SeedCheckException as sce:
seed_check_exception = True
exception_message = str(sce)
if final_write:
# Post-process & write results from last iteration, this time
# keeping last aggregation row, and recording metadata for a
Expand All @@ -1370,4 +1378,4 @@ def agg(
for key, agg_res in self.agg_buffers.items()
)
if seed and seed_check_exception:
raise SeedCheckException()
raise SeedCheckException(exception_message)
44 changes: 23 additions & 21 deletions tests/test_aggstream/test_aggstream_advanced.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,12 +303,22 @@ def test_exception_seed_check_and_restart(store, seed_path):
# - key 1: filter1, time grouper '2T', agg 'first', and 'last',
# - key 2: filter2, time grouper '15T', agg 'first', and 'max',
#
start = Timestamp("2020/01/01")
rr = np.random.default_rng(1)
N = 20
rand_ints = rr.integers(100, size=N)
rand_ints.sort()
ts = [start + Timedelta(f"{mn}T") for mn in rand_ints]
ref_idx = 10

def check(seed_chunk, check_buffer=None):
"""
Raise a 'ValueError' if a NaT is in 'ordered_on' column.
Raise a 'ValueError' if 'ts[10]' is the first value in the 'ordered_on' column.
"""
if seed_chunk.loc[:, ordered_on].isna().any():
raise ValueError
if seed_chunk.iloc[0].loc[ordered_on] == ts[ref_idx]:
raise ValueError(
f"not possible to have {ts[ref_idx]} as first value in 'ordered_on' column.",
)

key1 = Indexer("agg_2T")
key1_cf = {
Expand Down Expand Up @@ -339,47 +349,39 @@ def check(seed_chunk, check_buffer=None):
check=check,
)
# Seed data.
start = Timestamp("2020/01/01")
rr = np.random.default_rng(1)
N = 20
rand_ints = rr.integers(100, size=N)
rand_ints.sort()
ts = [start + Timedelta(f"{mn}T") for mn in rand_ints]
filter_val = np.ones(len(ts), dtype=bool)
filter_val[::2] = False
seed_orig = pDataFrame({ordered_on: ts, "val": rand_ints, filter_on: filter_val})
seed_mod = seed_orig.copy(deep=True)
# Set a 'NaT' in 'ordered_on' column, 2nd chunk for raising an exception.
seed_mod.iloc[11, seed_orig.columns.get_loc(ordered_on)] = pNaT
seed = pDataFrame({ordered_on: ts, "val": rand_ints, filter_on: filter_val})
# Streamed aggregation, raising an exception, but 1st chunk should be
# written.
with pytest.raises(SeedCheckException):
with pytest.raises(SeedCheckException, match="^not possible to have"):
as_.agg(
seed=[seed_mod[:10], seed_mod[10:]],
seed=[seed[:ref_idx], seed[ref_idx:]],
trim_start=False,
discard_last=False,
final_write=True,
)
# Check 'restart_index' in results.
restart_index = seed_mod.iloc[9, seed_mod.columns.get_loc(ordered_on)]
assert store[key1]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == restart_index
assert store[key2]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == restart_index
assert store[key1]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == ts[ref_idx - 1]
assert store[key2]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == ts[ref_idx - 1]
# "Correct" seed.
seed.iloc[ref_idx, seed.columns.get_loc(ordered_on)] = ts[ref_idx] + Timedelta("1s")
# Restart with 'corrected' seed.
as_.agg(
seed=seed_orig[10:],
seed=seed[ref_idx:],
trim_start=False,
discard_last=False,
final_write=True,
)
# Check with ref results.
bin_res_ref_key1 = cumsegagg(
data=seed_orig.loc[seed_orig[filter_on], :],
data=seed.loc[seed[filter_on], :],
**key1_cf,
ordered_on=ordered_on,
)
assert store[key1].pdf.equals(bin_res_ref_key1.reset_index())
bin_res_ref_key2 = cumsegagg(
data=seed_orig.loc[~seed_orig[filter_on], :],
data=seed.loc[~seed[filter_on], :],
**key2_cf,
ordered_on=ordered_on,
)
Expand Down
43 changes: 43 additions & 0 deletions tests/test_aggstream/test_aggstream_simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
from oups.aggstream.aggstream import KEY_POST_BUFFER
from oups.aggstream.aggstream import KEY_RESTART_INDEX
from oups.aggstream.aggstream import NO_FILTER_ID
from oups.aggstream.aggstream import SeedCheckException
from oups.aggstream.cumsegagg import DTYPE_NULLABLE_INT64
from oups.aggstream.jcumsegagg import FIRST
from oups.aggstream.jcumsegagg import LAST
Expand Down Expand Up @@ -1431,3 +1432,45 @@ def post(buffer: dict, bin_res: pDataFrame):
ref_res = seed.groupby(bin_by).agg(**agg).reset_index(drop=True)
rec_res = store[key].pdf
assert rec_res.equals(ref_res)


def test_exception_unordered_seed(store, seed_path):
    """
    Check that 'AggStream.agg' raises 'SeedCheckException' on unordered seed.

    A 'NaT' is injected into the 'ordered_on' column at the start of the 2nd
    seed chunk, breaking the monotonic-increasing requirement. The 1st chunk
    must still be aggregated and written, and the recorded restart index must
    be the last valid 'ordered_on' value from that 1st chunk.

    Parameters
    ----------
    store : fixture
        Target store the aggregation results are written to.
    seed_path : fixture
        Path fixture for seed data (unused directly here; part of the common
        test fixture signature — TODO confirm).
    """
    # Test exception when checking seed data, with seed unordered.
    # - key 1: time grouper '2T', agg 'first', and 'last',
    #
    start = Timestamp("2020/01/01")
    # Fixed RNG seed for reproducible timestamps.
    rr = np.random.default_rng(1)
    N = 20
    rand_ints = rr.integers(100, size=N)
    # Sorted so the seed is initially in ascending order on 'ordered_on'.
    rand_ints.sort()
    ts = [start + Timedelta(f"{mn}T") for mn in rand_ints]
    # Split point between the two seed chunks; also where the NaT is injected.
    ref_idx = 10

    ordered_on = "ts"
    key_cf = {
        "bin_by": TimeGrouper(key=ordered_on, freq="2T", closed="left", label="left"),
        "agg": {FIRST: ("val", FIRST), LAST: ("val", LAST)},
    }
    max_row_group_size = 6
    # NOTE(review): 'key' is not defined in this function — presumably a
    # module-level Indexer constant in this test module; verify.
    as_ = AggStream(
        ordered_on=ordered_on,
        store=store,
        keys=key,
        **key_cf,
        max_row_group_size=max_row_group_size,
    )
    # Seed data.
    seed = pDataFrame({ordered_on: ts, "val": rand_ints})
    # Set a 'NaT' in 'ordered_on' column, 2nd chunk for raising an exception.
    seed.iloc[ref_idx, seed.columns.get_loc(ordered_on)] = pNaT
    # Streamed aggregation, raising an exception, but 1st chunk should be
    # written.
    with pytest.raises(SeedCheckException, match="^seed data is not in"):
        as_.agg(
            seed=[seed[:ref_idx], seed[ref_idx:]],
            trim_start=False,
            discard_last=False,
            final_write=True,
        )
    # Check 'restart_index' in results.
    # Restart index is the last 'ordered_on' value of the last valid chunk,
    # i.e. the value just before the injected NaT.
    assert store[key]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == ts[ref_idx - 1]

0 comments on commit e7effce

Please sign in to comment.