From 66add045dba87524afc5ce7f4520235c1b46adfe Mon Sep 17 00:00:00 2001 From: yohplala Date: Sun, 16 Jun 2024 12:07:39 +0200 Subject: [PATCH 1/4] Record 'pre_buffer' in metadata'. --- oups/aggstream/aggstream.py | 104 ++++++++++++++++++++++++------------ 1 file changed, 69 insertions(+), 35 deletions(-) diff --git a/oups/aggstream/aggstream.py b/oups/aggstream/aggstream.py index d698c6d..1e7f123 100644 --- a/oups/aggstream/aggstream.py +++ b/oups/aggstream/aggstream.py @@ -51,13 +51,13 @@ ACCEPTED_AGG_FUNC = {FIRST, LAST, MIN, MAX, SUM} # List of keys. KEY_AGGSTREAM = "aggstream" +KEY_PRE = "pre" +KEY_PRE_BUFFER = "pre_buffer" KEY_SEGAGG_BUFFER = "segagg_buffer" KEY_POST_BUFFER = "post_buffer" KEY_BIN_ON_OUT = "bin_on_out" KEY_AGG_RES_BUFFER = "agg_res_buffer" KEY_BIN_RES_BUFFER = "bin_res_buffer" -KEY_PRE = "pre" -KEY_PRE_BUFFER = "pre_buffer" KEY_FILTERS = "filters" KEY_RESTART_INDEX = "restart_index" KEY_AGG_RES = "agg_res" @@ -233,12 +233,12 @@ def _init_keys_config( return consolidated_keys_config, agg_pd -def _init_agg_buffers( +def _init_buffers( store: ParquetSet, keys: dict, ): """ - Initialize aggregation buffers into ``agg_buffers``. + Initialize pre, aggregation and post buffers from existing results. Also set ``seed_index_restart``. @@ -256,6 +256,8 @@ def _init_agg_buffers( - ``seed_index_restart``, int, float or pTimestamp, the index from which (included) should be restarted the nex aggregation iteration. + - ``pre_buffer``, dict, user-defined buffer to keep track of intermediate + variables between successive pre-processing of individual seed chunk. - ``agg_buffers``, dict of aggregation buffer variables specific for each key, in the form: ``{key: {'agg_n_rows' : 0, @@ -271,6 +273,7 @@ def _init_agg_buffers( """ agg_buffers = {} seed_index_restart_set = set() + pre_buffer_set = set() for key in keys: # Default values for aggregation counters and buffers. # 'agg_n_rows' : number of rows in aggregation result. @@ -293,6 +296,7 @@ def _init_agg_buffers( ) aggstream_md = prev_agg_res._oups_metadata[KEY_AGGSTREAM] # - 'last_seed_index' to trim accordingly head of seed data. + # - metadata related to pre-processing of individual seed chunk. # - metadata related to binning process from past binnings # on prior data. It is used in case 'bin_by' is a callable. # If not used, it is an empty dict. @@ -300,6 +304,7 @@ def _init_agg_buffers( # aggregation results, to be used by 'post'. If not used, # it is an empty dict. seed_index_restart_set.add(aggstream_md[KEY_RESTART_INDEX]) + pre_buffer_set.add(aggstream_md[KEY_PRE_BUFFER]) agg_buffers[key][KEY_SEGAGG_BUFFER] = ( aggstream_md[KEY_SEGAGG_BUFFER] if aggstream_md[KEY_SEGAGG_BUFFER] else {} ) @@ -312,11 +317,17 @@ def _init_agg_buffers( if len(seed_index_restart_set) > 1: raise ValueError( - "not possible to aggregate on multiple keys with existing" - " aggregation results not aggregated up to the same seed index.", + "not possible to aggregate on multiple keys with existing " + "aggregation results not aggregated up to the same seed index.", + ) + if len(pre_buffer_set) > 1: + raise ValueError( + "not possible to aggregate on multiple keys with existing " + "'pre_buffer' values, different between different keys.", ) return ( None if not seed_index_restart_set else seed_index_restart_set.pop(), + {} if not pre_buffer_set else pre_buffer_set.pop(), agg_buffers, ) @@ -425,7 +436,7 @@ def _iter_data( the next. Its initial value is that provided by `pre_buffer`. In-place modifications of seed dataframe has to be carried out here. 
- pre_buffer : dict or None + pre_buffer : dict Buffer to keep track of intermediate data that can be required for proceeding with pre of individual seed item. filters : dict or None @@ -453,6 +464,9 @@ def _iter_data( - 'last_seed_index', Union[int, float, pTimestamp], the last seed index value (likely of an incomplete group), of the current seed chunk, before filters are applied. + - 'pre_buffer' : dict, buffer to keep track of intermediate data that + can be required for proceeding with preprocessing of individual seed + chunk. - 'filter_id', str, indicating which set of filters has been applied for the seed chunk provided. - 'filtered_chunk', pDataFrame, from the seed Iterable, with @@ -466,8 +480,9 @@ def _iter_data( Reasons to discard last seed row (or row group) may be twofold: - last row is temporary (yet to get some final values, for instance if seed data is some kind of aggregation stream itself), - - last rows are part of a single row group not yet complete itself - (new rows part of this row group to be expected). + - last rows are part of a single row group 'same index value in + 'ordered_on')not yet complete itself (new rows part of this row group + to be expected). """ if restart_index is None: @@ -489,7 +504,7 @@ def _iter_data( if pre: # Apply user checks. try: - pre(seed_chunk, pre_buffer) + pre(on=seed_chunk, buffer=pre_buffer) except Exception as e: # Stop iteration in case of failing pre. # Aggregation has been run up to the last valid chunk. @@ -531,10 +546,12 @@ def _iter_data( continue elif filter_array_loc.all(): # If filter only contains 1, simply return full seed chunk. - yield last_seed_index, filt_id, seed_chunk + yield last_seed_index, pre_buffer, filt_id, seed_chunk else: # Otherwise, filter. - yield last_seed_index, filt_id, seed_chunk.loc[filter_array_loc].reset_index( + yield last_seed_index, pre_buffer, filt_id, seed_chunk.loc[ + filter_array_loc + ].reset_index( drop=True, ) @@ -589,6 +606,7 @@ def _post_n_write_agg_chunks( index_name: Optional[str] = None, post: Optional[Callable] = None, last_seed_index: Optional[Union[int, float, pTimestamp]] = None, + pre_buffer: Optional[dict] = None, ): """ Write list of aggregation row groups with optional post. @@ -666,19 +684,25 @@ def _post_n_write_agg_chunks( Last index in seed data. Can be numeric type, timestamp... (for recording in metadata of aggregation results) Writing metadata is triggered ONLY if ``last_seed_index`` is provided. + pre_buffer : dict or None + Buffer to keep track of intermediate data that can be required for + proceeding with preprocessing of individual seed chunk. """ + # TODO: rework to have 'OUPS_METADATA' be set incrementally. + # TODO: should it be 'is None'? or directly 'not'? if (agg_res := agg_buffers[KEY_AGG_RES]) is None: # No iteration has been achieved, as no data. if last_seed_index: # If 'last_seed_index', at least set it in oups metadata. # It is possible new seed data has been streamed and taken into # account, but used for this key, because having been filtered out. - OUPS_METADATA[key] = { - KEY_AGGSTREAM: { - KEY_RESTART_INDEX: last_seed_index, - }, - } + # Also set 'pre_buffer' if not null. + OUPS_METADATA[key] = ( + {KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index, KEY_PRE_BUFFER: pre_buffer}} + if pre_buffer + else {KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index}} + ) write_metadata(pf=store[key].pf, md_key=key) return # Concat list of aggregation results. 
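With this change, the user-supplied `pre` callable is invoked with keyword arguments, `pre(on=seed_chunk, buffer=pre_buffer)`, and the content of `buffer` is what ends up recorded under 'pre_buffer'. A minimal sketch of a conforming callback follows; the 'val' column and the stored key are assumptions for illustration only.

    from pandas import DataFrame, Timestamp


    def pre(on: DataFrame, buffer: dict):
        """
        Validate a seed chunk and keep state between chunks in 'buffer'.

        Raising here stops the iteration (surfaced as 'SeedPreException');
        any modification of 'on' has to be made in-place, and nothing is
        returned.
        """
        if on.empty:
            raise ValueError("empty seed chunk is not expected.")
        # Keep track of the last processed value for the next chunk.
        buffer["last_val"] = on["val"].iloc[-1]


    # Example call, mirroring how '_iter_data()' invokes the callback.
    chunk = DataFrame({"ts": [Timestamp("2020/01/01 08:00")], "val": [1]})
    state = {}
    pre(on=chunk, buffer=state)
    assert state == {"last_val": 1}
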
@@ -703,13 +727,25 @@ def _post_n_write_agg_chunks( ) if last_seed_index: # If 'last_seed_index', set oups metadata. - OUPS_METADATA[key] = { - KEY_AGGSTREAM: { - KEY_RESTART_INDEX: last_seed_index, - KEY_SEGAGG_BUFFER: agg_buffers[KEY_SEGAGG_BUFFER], - KEY_POST_BUFFER: post_buffer, - }, - } + # Also set 'pre_buffer' if not null. + OUPS_METADATA[key] = ( + { + KEY_AGGSTREAM: { + KEY_RESTART_INDEX: last_seed_index, + KEY_SEGAGG_BUFFER: agg_buffers[KEY_SEGAGG_BUFFER], + KEY_POST_BUFFER: post_buffer, + }, + } + if pre_buffer + else { + KEY_AGGSTREAM: { + KEY_RESTART_INDEX: last_seed_index, + KEY_PRE_BUFFER: pre_buffer, + KEY_SEGAGG_BUFFER: agg_buffers[KEY_SEGAGG_BUFFER], + KEY_POST_BUFFER: post_buffer, + }, + } + ) # Record data. store[key] = write_config, agg_res # Reset aggregation buffers and counters. @@ -816,7 +852,8 @@ class AggStream: aggregation iteration. 'pre' : Callable, to apply user-defined pre-processing on seed. 'pre_buffer' : dict, to keep track of intermediate values for - `pre` function. + proceeding with pre-processing of individual seed + items (by `pre` function). 'filters' : dict, as per `filters` parameter. }`` - ``self.store``, oups store, as per `store` parameter. @@ -872,7 +909,6 @@ def __init__( store: ParquetSet, keys: Union[dataclass, dict], pre: Optional[Callable] = None, - pre_buffer: Optional[dict] = None, filters: Optional[dict] = None, agg: Optional[dict] = None, bin_by: Optional[Union[TimeGrouper, Callable[[Series, dict], tuple]]] = None, @@ -958,12 +994,6 @@ def __init__( Modification of seed chunk, if any, has to be realized in-place. No DataFrame returned by this function is expected. - pre_buffer : dict, default None - Buffer to keep track of intermediate data that can be required for - proceeding with pre-processing of individual seed item. - Once aggregation stream is over, its value is not recorded. - User has to take care of this if needed. Its value can be - retrieved with ``self.pre_buffer`` object attribute. filters : Union[dict, None], default None Dict in the form ``{"filter_id":[[("col", op, val), ...], ...]}`` @@ -1209,12 +1239,13 @@ def __init__( ) = _init_keys_config(ordered_on, keys, keys_default) ( restart_index, + pre_buffer, self.agg_buffers, - ) = _init_agg_buffers(store, keys) + ) = _init_buffers(store, keys) self.seed_config = { KEY_ORDERED_ON: ordered_on, KEY_PRE: pre, - KEY_PRE_BUFFER: {} if pre_buffer is None else pre_buffer, + KEY_PRE_BUFFER: pre_buffer, KEY_FILTERS: filters, KEY_RESTART_INDEX: restart_index, } @@ -1350,7 +1381,7 @@ def agg( seed = self._init_agg_cs(seed) seed_check_exception = False try: - for _last_seed_index, filter_id, filtered_chunk in _iter_data( + for _last_seed_index, _pre_buffer, filter_id, filtered_chunk in _iter_data( seed=seed, **self.seed_config, trim_start=trim_start, @@ -1374,6 +1405,8 @@ def agg( # Set 'seed_index_restart' to the 'last_seed_index' with # which restarting the next aggregation iteration. self.seed_config[KEY_RESTART_INDEX] = _last_seed_index + # Also keep track of last 'pre_buffer' value. 
+ self.seed_config[KEY_PRE_BUFFER] = _pre_buffer except SeedPreException as sce: seed_check_exception = True exception_message = str(sce) @@ -1391,6 +1424,7 @@ def agg( index_name=self.keys_config[key][KEY_BIN_ON_OUT], post=self.keys_config[key][KEY_POST], last_seed_index=self.seed_config[KEY_RESTART_INDEX], + pre_buffer=self.seed_config[KEY_PRE_BUFFER], ) for key, agg_res in self.agg_buffers.items() ) From 93fb541440867c43936f5faa524ff9164ce52c4c Mon Sep 17 00:00:00 2001 From: yohplala Date: Sun, 16 Jun 2024 12:20:58 +0200 Subject: [PATCH 2/4] All tests running again. --- oups/aggstream/aggstream.py | 12 ++++-------- tests/test_aggstream/test_aggstream_advanced.py | 4 ++-- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/oups/aggstream/aggstream.py b/oups/aggstream/aggstream.py index 1e7f123..112463b 100644 --- a/oups/aggstream/aggstream.py +++ b/oups/aggstream/aggstream.py @@ -271,9 +271,9 @@ def _init_buffers( }`` """ + pre_buffer = {} agg_buffers = {} seed_index_restart_set = set() - pre_buffer_set = set() for key in keys: # Default values for aggregation counters and buffers. # 'agg_n_rows' : number of rows in aggregation result. @@ -304,7 +304,8 @@ def _init_buffers( # aggregation results, to be used by 'post'. If not used, # it is an empty dict. seed_index_restart_set.add(aggstream_md[KEY_RESTART_INDEX]) - pre_buffer_set.add(aggstream_md[KEY_PRE_BUFFER]) + if KEY_PRE_BUFFER in aggstream_md: + pre_buffer = aggstream_md[KEY_PRE_BUFFER] agg_buffers[key][KEY_SEGAGG_BUFFER] = ( aggstream_md[KEY_SEGAGG_BUFFER] if aggstream_md[KEY_SEGAGG_BUFFER] else {} ) @@ -320,14 +321,9 @@ def _init_buffers( "not possible to aggregate on multiple keys with existing " "aggregation results not aggregated up to the same seed index.", ) - if len(pre_buffer_set) > 1: - raise ValueError( - "not possible to aggregate on multiple keys with existing " - "'pre_buffer' values, different between different keys.", - ) return ( None if not seed_index_restart_set else seed_index_restart_set.pop(), - {} if not pre_buffer_set else pre_buffer_set.pop(), + pre_buffer, agg_buffers, ) diff --git a/tests/test_aggstream/test_aggstream_advanced.py b/tests/test_aggstream/test_aggstream_advanced.py index 314ced2..119d403 100644 --- a/tests/test_aggstream/test_aggstream_advanced.py +++ b/tests/test_aggstream/test_aggstream_advanced.py @@ -314,11 +314,11 @@ def test_exception_seed_check_and_restart(store, seed_path): ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] ref_idx = 10 - def check(seed_chunk, pre_buffer=None): + def check(on, buffer=None): """ Raise a 'ValueError' if 'ts[10]' is at start in 'ordered_on' column. """ - if seed_chunk.iloc[0].loc[ordered_on] == ts[ref_idx]: + if on.iloc[0].loc[ordered_on] == ts[ref_idx]: raise ValueError( f"not possible to have {ts[ref_idx]} as first value in 'ordered_on' column.", ) From 0cb225e9d99de797f167514ed3711ac9c3dafdd4 Mon Sep 17 00:00:00 2001 From: yohplala Date: Sun, 16 Jun 2024 13:23:54 +0200 Subject: [PATCH 3/4] Testing recording and retrieving 'pre_buffer'. --- oups/aggstream/aggstream.py | 36 +++---- .../test_aggstream/test_aggstream_advanced.py | 27 +++++- tests/test_aggstream/test_aggstream_simple.py | 94 ++++++++++++------- 3 files changed, 91 insertions(+), 66 deletions(-) diff --git a/oups/aggstream/aggstream.py b/oups/aggstream/aggstream.py index 112463b..40e8dc9 100644 --- a/oups/aggstream/aggstream.py +++ b/oups/aggstream/aggstream.py @@ -685,8 +685,6 @@ def _post_n_write_agg_chunks( proceeding with preprocessing of individual seed chunk. 
""" - # TODO: rework to have 'OUPS_METADATA' be set incrementally. - # TODO: should it be 'is None'? or directly 'not'? if (agg_res := agg_buffers[KEY_AGG_RES]) is None: # No iteration has been achieved, as no data. if last_seed_index: @@ -694,11 +692,9 @@ def _post_n_write_agg_chunks( # It is possible new seed data has been streamed and taken into # account, but used for this key, because having been filtered out. # Also set 'pre_buffer' if not null. - OUPS_METADATA[key] = ( - {KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index, KEY_PRE_BUFFER: pre_buffer}} - if pre_buffer - else {KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index}} - ) + OUPS_METADATA[key] = { + KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index, KEY_PRE_BUFFER: pre_buffer}, + } write_metadata(pf=store[key].pf, md_key=key) return # Concat list of aggregation results. @@ -724,24 +720,14 @@ def _post_n_write_agg_chunks( if last_seed_index: # If 'last_seed_index', set oups metadata. # Also set 'pre_buffer' if not null. - OUPS_METADATA[key] = ( - { - KEY_AGGSTREAM: { - KEY_RESTART_INDEX: last_seed_index, - KEY_SEGAGG_BUFFER: agg_buffers[KEY_SEGAGG_BUFFER], - KEY_POST_BUFFER: post_buffer, - }, - } - if pre_buffer - else { - KEY_AGGSTREAM: { - KEY_RESTART_INDEX: last_seed_index, - KEY_PRE_BUFFER: pre_buffer, - KEY_SEGAGG_BUFFER: agg_buffers[KEY_SEGAGG_BUFFER], - KEY_POST_BUFFER: post_buffer, - }, - } - ) + OUPS_METADATA[key] = { + KEY_AGGSTREAM: { + KEY_RESTART_INDEX: last_seed_index, + KEY_PRE_BUFFER: pre_buffer, + KEY_SEGAGG_BUFFER: agg_buffers[KEY_SEGAGG_BUFFER], + KEY_POST_BUFFER: post_buffer, + }, + } # Record data. store[key] = write_config, agg_res # Reset aggregation buffers and counters. diff --git a/tests/test_aggstream/test_aggstream_advanced.py b/tests/test_aggstream/test_aggstream_advanced.py index 119d403..fcc43f1 100644 --- a/tests/test_aggstream/test_aggstream_advanced.py +++ b/tests/test_aggstream/test_aggstream_advanced.py @@ -37,6 +37,7 @@ from oups import toplevel from oups.aggstream.aggstream import KEY_AGG from oups.aggstream.aggstream import KEY_AGGSTREAM +from oups.aggstream.aggstream import KEY_PRE_BUFFER from oups.aggstream.aggstream import KEY_RESTART_INDEX from oups.aggstream.aggstream import SeedPreException from oups.aggstream.cumsegagg import DTYPE_NULLABLE_INT64 @@ -314,14 +315,19 @@ def test_exception_seed_check_and_restart(store, seed_path): ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] ref_idx = 10 - def check(on, buffer=None): + def check(on, buffer): """ Raise a 'ValueError' if 'ts[10]' is at start in 'ordered_on' column. """ - if on.iloc[0].loc[ordered_on] == ts[ref_idx]: + if on.loc[:, ordered_on].iloc[0] == ts[ref_idx]: raise ValueError( f"not possible to have {ts[ref_idx]} as first value in 'ordered_on' column.", ) + # Keep a result to check buffer recording and retrieving both work. + if not buffer: + buffer["seed_val"] = on.loc[:, "val"].iloc[-1] + else: + buffer["seed_val"] = on.loc[:, "val"].iloc[-1] + 10 key1 = Indexer("agg_2T") key1_cf = { @@ -364,9 +370,14 @@ def check(on, buffer=None): discard_last=False, final_write=True, ) - # Check 'restart_index' in results. - assert store[key1]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == ts[ref_idx - 1] - assert store[key2]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == ts[ref_idx - 1] + # Check 'restart_index' & 'pre_buffer' in results. 
+ pre_buffer_ref = {"seed_val": rand_ints[ref_idx - 1]} + streamagg_md_key1 = store[key1]._oups_metadata[KEY_AGGSTREAM] + assert streamagg_md_key1[KEY_RESTART_INDEX] == ts[ref_idx - 1] + assert streamagg_md_key1[KEY_PRE_BUFFER] == pre_buffer_ref + streamagg_md_key2 = store[key2]._oups_metadata[KEY_AGGSTREAM] + assert streamagg_md_key2[KEY_RESTART_INDEX] == ts[ref_idx - 1] + assert streamagg_md_key2[KEY_PRE_BUFFER] == pre_buffer_ref # "Correct" seed. seed.iloc[ref_idx, seed.columns.get_loc(ordered_on)] = ts[ref_idx] + Timedelta("1s") # Restart with 'corrected' seed. @@ -389,6 +400,12 @@ def check(on, buffer=None): ordered_on=ordered_on, ) assert store[key2].pdf.equals(bin_res_ref_key2.reset_index()) + # Check 'pre_buffer' update. + pre_buffer_ref = {"seed_val": rand_ints[-1] + 10} + streamagg_md_key1 = store[key1]._oups_metadata[KEY_AGGSTREAM] + assert streamagg_md_key1[KEY_PRE_BUFFER] == pre_buffer_ref + streamagg_md_key2 = store[key2]._oups_metadata[KEY_AGGSTREAM] + assert streamagg_md_key2[KEY_PRE_BUFFER] == pre_buffer_ref def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame): diff --git a/tests/test_aggstream/test_aggstream_simple.py b/tests/test_aggstream/test_aggstream_simple.py index 372d6ad..b91223e 100644 --- a/tests/test_aggstream/test_aggstream_simple.py +++ b/tests/test_aggstream/test_aggstream_simple.py @@ -22,7 +22,7 @@ from fastparquet import ParquetFile from fastparquet import write as fp_write from pandas import NA as pNA -from pandas import DataFrame as pDataFrame +from pandas import DataFrame from pandas import DatetimeIndex from pandas import NaT as pNaT from pandas import Series as pSeries @@ -36,6 +36,7 @@ from oups import toplevel from oups.aggstream.aggstream import KEY_AGGSTREAM from oups.aggstream.aggstream import KEY_POST_BUFFER +from oups.aggstream.aggstream import KEY_PRE_BUFFER from oups.aggstream.aggstream import KEY_RESTART_INDEX from oups.aggstream.aggstream import NO_FILTER_ID from oups.aggstream.aggstream import SeedPreException @@ -132,7 +133,7 @@ def test_time_grouper_sum_agg(store, seed_path): date + "14:20", ], ) - seed_df = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed_df = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) row_group_offsets = [0, 4, 6, 10, 12, 13, 15] fp_write(seed_path, seed_df, row_group_offsets=row_group_offsets, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() @@ -156,7 +157,7 @@ def test_time_grouper_sum_agg(store, seed_path): date + "14:00", ], ) - ref_res = pDataFrame({ordered_on: dti_ref, agg_col: agg_sum_ref}) + ref_res = DataFrame({ordered_on: dti_ref, agg_col: agg_sum_ref}) rec_res = store[key].pdf assert rec_res.equals(ref_res) # Check 'last_seed_index' is last timestamp, and 'post_buffer' is empty. @@ -164,6 +165,7 @@ def test_time_grouper_sum_agg(store, seed_path): # but 'cumsegagg' scope. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] assert not streamagg_md[KEY_POST_BUFFER] # 1st append. # Complete seed_df with new data and continue aggregation. @@ -176,7 +178,7 @@ def test_time_grouper_sum_agg(store, seed_path): # 15:10 1 2 15:00 | no stitching # 15:11 2 2 15:00 | ts = DatetimeIndex([date + "15:10", date + "15:11"]) - seed_df = pDataFrame({ordered_on: ts, "val": [1, 2]}) + seed_df = DataFrame({ordered_on: ts, "val": [1, 2]}) # Using a ParquetFile is counter performant as early data will be loaded, # but this also tests 'trim_start' parameter. 
# To prevent re-loading already processed data, use 'filters' in @@ -205,17 +207,18 @@ def test_time_grouper_sum_agg(store, seed_path): date + "15:00", ], ) - ref_res = pDataFrame({ordered_on: dti_ref, agg_col: agg_sum_ref}) + ref_res = DataFrame({ordered_on: dti_ref, agg_col: agg_sum_ref}) rec_res = store[key].pdf assert rec_res.equals(ref_res) # Check 'last_seed_index'. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] assert not streamagg_md[KEY_POST_BUFFER] # 2nd append, with 'discard_last=False'. # Check aggregation till the end of seed data. ts = DatetimeIndex([date + "15:20", date + "15:21"]) - seed_df = pDataFrame({ordered_on: ts, "val": [11, 12]}) + seed_df = DataFrame({ordered_on: ts, "val": [11, 12]}) fp_write(seed_path, seed_df, file_scheme="hive", append=True) seed_pf = ParquetFile(seed_path) seed = seed_pf.iter_row_groups() @@ -231,6 +234,8 @@ def test_time_grouper_sum_agg(store, seed_path): # Check 'last_seed_index'. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] def test_time_grouper_first_last_min_max_agg(store, seed_path): @@ -261,7 +266,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path): N = 20 rand_ints = rr.integers(100, size=N) ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df = pDataFrame( + seed_df = DataFrame( {ordered_on: ts + ts, "val": np.append(rand_ints, rand_ints + 1)}, ).sort_values(ordered_on) fp_write(seed_path, seed_df, row_group_offsets=max_row_group_size, file_scheme="hive") @@ -279,7 +284,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path): # 1st append of new data. start = seed_df[ordered_on].iloc[-1] ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df2 = pDataFrame({ordered_on: ts, "val": rand_ints + 100}).sort_values(ordered_on) + seed_df2 = DataFrame({ordered_on: ts, "val": rand_ints + 100}).sort_values(ordered_on) fp_write( seed_path, seed_df2, @@ -301,7 +306,7 @@ def test_time_grouper_first_last_min_max_agg(store, seed_path): # 2nd append of new data. start = seed_df2[ordered_on].iloc[-1] ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df3 = pDataFrame({ordered_on: ts, "val": rand_ints + 400}).sort_values(ordered_on) + seed_df3 = DataFrame({ordered_on: ts, "val": rand_ints + 400}).sort_values(ordered_on) fp_write( seed_path, seed_df3, @@ -368,7 +373,7 @@ def test_duration_weighted_mean_from_post(store, seed_path): } # Setup 'post'. - def post(buffer: dict, bin_res: pDataFrame): + def post(buffer: dict, bin_res: DataFrame): """ Compute duration, weighted mean and keep track of data to buffer. """ @@ -427,7 +432,7 @@ def post(buffer: dict, bin_res: pDataFrame): ], ) weights = [1, 2, 1, 0, 2, 1, 2, 1, 0, 3, 2, 1, 3, 0, 1, 2, 1] - seed_df = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1), "weight": weights}) + seed_df = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1), "weight": weights}) # Setup weighted mean: need 'weight' x 'val'. seed_df["weighted_val"] = seed_df["weight"] * seed_df["val"] row_group_offsets = [0, 4, 6, 10, 12, 13, 15] @@ -452,6 +457,7 @@ def post(buffer: dict, bin_res: pDataFrame): # Check metadata. 
streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] assert streamagg_md[KEY_POST_BUFFER] == {"iter_num": 1} # 1st append of new data. start = seed_df[ordered_on].iloc[-1] @@ -459,7 +465,7 @@ def post(buffer: dict, bin_res: pDataFrame): N = 30 rand_ints = rr.integers(600, size=N) ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df2 = pDataFrame( + seed_df2 = DataFrame( {ordered_on: ts, "val": rand_ints + 100, "weight": rand_ints}, ).sort_values(ordered_on) # Setup weighted mean: need 'weight' x 'val'. @@ -485,6 +491,7 @@ def post(buffer: dict, bin_res: pDataFrame): # Check metadata. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_POST_BUFFER] == {"iter_num": 2} + assert not streamagg_md[KEY_PRE_BUFFER] def test_seed_time_grouper_bin_on_as_tuple(store, seed_path): @@ -523,7 +530,7 @@ def test_seed_time_grouper_bin_on_as_tuple(store, seed_path): ], ) bin_on = "ts_bin" - seed_pdf = pDataFrame({ordered_on: ts, bin_on: ts, "val": range(1, len(ts) + 1)}) + seed_pdf = DataFrame({ordered_on: ts, bin_on: ts, "val": range(1, len(ts) + 1)}) row_group_offsets = [0, 4, 6] fp_write(seed_path, seed_pdf, row_group_offsets=row_group_offsets, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() @@ -540,7 +547,7 @@ def test_seed_time_grouper_bin_on_as_tuple(store, seed_path): assert rec_res.equals(ref_res) # 1st append of new data. ts2 = DatetimeIndex([date + "12:30", date + "13:00", date + "13:30", date + "14:00"]) - seed_pdf2 = pDataFrame( + seed_pdf2 = DataFrame( {ordered_on: ts2, bin_on: ts2, "val": range(len(ts) + 1, len(ts) + len(ts2) + 1)}, ) fp_write(seed_path, seed_pdf2, file_scheme="hive", append=True) @@ -628,7 +635,7 @@ def test_by_callable_wo_bin_on(store, seed_path): ], ) - seed_pdf = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed_pdf = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) # Forcing dtype of 'seed_pdf' to float. seed_pdf["val"] = seed_pdf["val"].astype("float64") fp_write(seed_path, seed_pdf, row_group_offsets=13, file_scheme="hive") @@ -652,6 +659,8 @@ def test_by_callable_wo_bin_on(store, seed_path): # Check metadata. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] # 1st append of new data. # RG TS VAL ROW BIN LABEL | comments @@ -676,7 +685,7 @@ def test_by_callable_wo_bin_on(store, seed_path): date + "16:40", ], ) - seed_pdf2 = pDataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) + seed_pdf2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) # Forcing dtype of 'seed_pdf' to float. seed_pdf2["val"] = seed_pdf2["val"].astype("float64") seed_pdf2 = pconcat([seed_pdf, seed_pdf2], ignore_index=True) @@ -702,6 +711,8 @@ def test_by_callable_wo_bin_on(store, seed_path): # Check metadata. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts2[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] def test_by_callable_with_bin_on(store, seed_path): @@ -733,7 +744,7 @@ def test_by_callable_with_bin_on(store, seed_path): # 14:20 18 | 'discard_last' True # # Setup streamed aggregation. - def by_1val(on: pDataFrame, buffer: dict): + def by_1val(on: DataFrame, buffer: dict): """ Start a new bin each time a 1 is spot. 
@@ -832,7 +843,7 @@ def by_1val(on: pDataFrame, buffer: dict): val[8] = 1 val[12] = 1 val[16] = 1 - seed_pdf = pDataFrame({ordered_on: ts, bin_on: val}) + seed_pdf = DataFrame({ordered_on: ts, bin_on: val}) # Do not change this '13'. Test a restart right on a bin start. row_group_offsets = [0, 13] fp_write(seed_path, seed_pdf, row_group_offsets=row_group_offsets, file_scheme="hive") @@ -902,7 +913,7 @@ def agg_with_same_bin_labels(seed_pdf): ) val = np.arange(1, len(ts2) + 1) val[3] = 1 - seed_pdf = pconcat([seed_pdf, pDataFrame({ordered_on: ts2, bin_on: val})], ignore_index=True) + seed_pdf = pconcat([seed_pdf, DataFrame({ordered_on: ts2, bin_on: val})], ignore_index=True) fp_write(seed_path, seed_pdf, row_group_offsets=13, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Setup streamed aggregation. @@ -917,6 +928,8 @@ def agg_with_same_bin_labels(seed_pdf): # Check metadata. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts2[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] def test_time_grouper_trim_start(store, seed_path): @@ -938,7 +951,7 @@ def test_time_grouper_trim_start(store, seed_path): date = "2020/01/01 " ts = DatetimeIndex([date + "08:00", date + "08:30", date + "09:00", date + "09:30"]) - seed_pdf = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed_pdf = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) fp_write(seed_path, seed_pdf, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Streamed aggregation. @@ -954,9 +967,11 @@ def test_time_grouper_trim_start(store, seed_path): # Check 'last_seed_index'. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] # 1st append. 2nd stremagg with 'trim_start=False'. ts2 = DatetimeIndex([date + "09:00", date + "09:30", date + "10:00", date + "10:30"]) - seed_pdf2 = pDataFrame({ordered_on: ts2, "val": range(1, len(ts) + 1)}) + seed_pdf2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts) + 1)}) fp_write(seed_path, seed_pdf2, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Streamed aggregation. @@ -993,7 +1008,7 @@ def test_time_grouper_agg_first(store): # Seed data. date = "2020/01/01 " ts = DatetimeIndex([date + "08:00", date + "08:30", date + "09:00", date + "10:00"]) - seed = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) # Streamed aggregation. as_.agg( seed=seed, @@ -1004,7 +1019,7 @@ def test_time_grouper_agg_first(store): assert rec_res.equals(ref_res) # 1st append, starting a new bin. ts2 = DatetimeIndex([date + "10:20", date + "10:40", date + "11:00", date + "11:30"]) - seed2 = pDataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) + seed2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) seed2 = pconcat([seed, seed2]) # Streamed aggregation. as_.agg( @@ -1035,7 +1050,7 @@ def test_single_row(store): # Test results date = "2020/01/01 " ts = DatetimeIndex([date + "08:00"]) - seed = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) # Streamed aggregation: no aggregation, but no error message. 
as_.agg( seed=seed, @@ -1108,7 +1123,7 @@ def test_single_row_within_seed(store, seed_path): ], ) val = np.arange(1, len(ts) + 1) - seed_pdf = pDataFrame({ordered_on: ts, "val": val}) + seed_pdf = DataFrame({ordered_on: ts, "val": val}) fp_write(seed_path, seed_pdf, row_group_offsets=[0, 5, 6], file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Streamed aggregation: no aggregation, but no error message. @@ -1132,7 +1147,7 @@ def test_time_grouper_duplicates_on_wo_bin_on(store): ordered_on = "ts_order" agg = {SUM: ("val", SUM)} - def post(buffer: dict, bin_res: pDataFrame): + def post(buffer: dict, bin_res: DataFrame): """ Remove 'bin_on' column. """ @@ -1161,14 +1176,14 @@ def post(buffer: dict, bin_res: pDataFrame): ts_bin = ts_order + +Timedelta("40T") val = range(1, len(ts_order) + 1) bin_on = "ts_bin" - seed = pDataFrame({ordered_on: ts_order, bin_on: ts_bin, "val": val}) + seed = DataFrame({ordered_on: ts_order, bin_on: ts_bin, "val": val}) # Streamed aggregation. as_.agg( seed=seed, discard_last=True, ) # Test results. - ref_res = pDataFrame({ordered_on: ts_order[:1], SUM: [6]}) + ref_res = DataFrame({ordered_on: ts_order[:1], SUM: [6]}) rec_res = store[key].pdf assert rec_res.equals(ref_res) @@ -1236,7 +1251,7 @@ def test_bin_on_col_sum_agg(store): date + "14:10", # 7 ], ) - seed = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) # Setup streamed aggregation. as_.agg( seed=seed, @@ -1255,6 +1270,8 @@ def test_bin_on_col_sum_agg(store): # Check 'last_seed_index'. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] # 1st append. # Complete seed_df with new data and continue aggregation. # 'discard_last=False' @@ -1272,7 +1289,7 @@ def test_bin_on_col_sum_agg(store): date + "15:10", # 8 ], ) - seed2 = pDataFrame({ordered_on: ts, "val": [1, 2]}) + seed2 = DataFrame({ordered_on: ts, "val": [1, 2]}) seed = pconcat([seed, seed2]) # Setup streamed aggregation. as_.agg( @@ -1286,6 +1303,8 @@ def test_bin_on_col_sum_agg(store): # Check 'last_seed_index'. streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] assert streamagg_md[KEY_RESTART_INDEX] == ts[-1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] def test_time_grouper_agg_first_filters_and_no_filter(store): @@ -1318,14 +1337,14 @@ def test_time_grouper_agg_first_filters_and_no_filter(store): # Seed data. date = "2020/01/01 " ts = DatetimeIndex([date + "08:00", date + "08:30", date + "09:00", date + "10:00"]) - seed = pDataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) + seed = DataFrame({ordered_on: ts, "val": range(1, len(ts) + 1)}) # Streamed aggregation. as_.agg( seed=seed, ) # 1st append, starting a new bin. ts2 = DatetimeIndex([date + "10:20", date + "10:40", date + "11:00", date + "11:30"]) - seed2 = pDataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) + seed2 = DataFrame({ordered_on: ts2, "val": range(1, len(ts2) + 1)}) seed2 = pconcat([seed, seed2]) # Streamed aggregation. as_.agg( @@ -1376,7 +1395,7 @@ def test_different_ordered_on(store): key_ordered_on = "val" seed_ordered_on = "ts" - def post(buffer: dict, bin_res: pDataFrame): + def post(buffer: dict, bin_res: DataFrame): """ Remove some columns before recording. 
""" @@ -1421,7 +1440,7 @@ def post(buffer: dict, bin_res: pDataFrame): date + "14:10", # 7 ], ) - seed = pDataFrame({seed_ordered_on: ts, key_ordered_on: range(1, len(ts) + 1)}) + seed = DataFrame({seed_ordered_on: ts, key_ordered_on: range(1, len(ts) + 1)}) # Setup streamed aggregation. as_.agg( seed=seed, @@ -1457,7 +1476,7 @@ def test_exception_unordered_seed(store, seed_path): rand_ints.sort() ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] ref_idx = 10 - seed = pDataFrame({ordered_on: ts, "val": rand_ints}) + seed = DataFrame({ordered_on: ts, "val": rand_ints}) # Set a 'NaT' in 'ordered_on' column, 2nd chunk for raising an exception. seed.iloc[ref_idx, seed.columns.get_loc(ordered_on)] = pNaT # Streamed aggregation, raising an exception, but 1st chunk should be @@ -1470,4 +1489,7 @@ def test_exception_unordered_seed(store, seed_path): final_write=True, ) # Check 'restart_index' in results. - assert store[key]._oups_metadata[KEY_AGGSTREAM][KEY_RESTART_INDEX] == ts[ref_idx - 1] + streamagg_md = store[key]._oups_metadata[KEY_AGGSTREAM] + assert streamagg_md[KEY_RESTART_INDEX] == ts[ref_idx - 1] + assert not streamagg_md[KEY_PRE_BUFFER] + assert not streamagg_md[KEY_POST_BUFFER] From 1836b59b0574f1734d3f3034789d1852db76908f Mon Sep 17 00:00:00 2001 From: yohplala Date: Sun, 16 Jun 2024 13:25:43 +0200 Subject: [PATCH 4/4] Formatting. --- .../test_aggstream/test_aggstream_advanced.py | 34 +++++++++---------- tests/test_aggstream/test_aggstream_init.py | 4 +-- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/tests/test_aggstream/test_aggstream_advanced.py b/tests/test_aggstream/test_aggstream_advanced.py index fcc43f1..02b167d 100644 --- a/tests/test_aggstream/test_aggstream_advanced.py +++ b/tests/test_aggstream/test_aggstream_advanced.py @@ -22,7 +22,7 @@ import pytest from fastparquet import ParquetFile from fastparquet import write as fp_write -from pandas import DataFrame as pDataFrame +from pandas import DataFrame from pandas import NaT as pNaT from pandas import Series as pSeries from pandas import Timedelta @@ -117,7 +117,7 @@ def test_3_keys_only_bins(store, seed_path): [1] * (N_third - 2) + [2] * (N_third - 4) + [3] * (N - 2 * N_third) + [4] * 6, ) bin_on = "direct_bin" - seed_df = pDataFrame({ordered_on: ts, "val": rand_ints, bin_on: bin_val}) + seed_df = DataFrame({ordered_on: ts, "val": rand_ints, bin_on: bin_val}) fp_write(seed_path, seed_df, row_group_offsets=max_row_group_size, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Streamed aggregation. @@ -169,7 +169,7 @@ def get_ref_results(seed_df): # 1st append of new data. start = seed_df[ordered_on].iloc[-1] ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df2 = pDataFrame({ordered_on: ts, "val": rand_ints + 100, bin_on: bin_val + 10}) + seed_df2 = DataFrame({ordered_on: ts, "val": rand_ints + 100, bin_on: bin_val + 10}) fp_write( seed_path, seed_df2, @@ -198,7 +198,7 @@ def get_ref_results(seed_df): # 2nd append of new data. 
start = seed_df2[ordered_on].iloc[-1] ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df3 = pDataFrame({ordered_on: ts, "val": rand_ints + 400, bin_on: bin_val + 40}) + seed_df3 = DataFrame({ordered_on: ts, "val": rand_ints + 400, bin_on: bin_val + 40}) fp_write( seed_path, seed_df3, @@ -252,7 +252,7 @@ def test_exception_different_indexes_at_restart(store, seed_path): rand_ints = rr.integers(100, size=N) rand_ints.sort() ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df = pDataFrame({ordered_on: ts, "val": rand_ints}) + seed_df = DataFrame({ordered_on: ts, "val": rand_ints}) fp_write(seed_path, seed_df, row_group_offsets=max_row_group_size, file_scheme="hive") seed = ParquetFile(seed_path).iter_row_groups() # Streamed aggregation for 'key1'. @@ -275,7 +275,7 @@ def test_exception_different_indexes_at_restart(store, seed_path): # Extend seed. start = seed_df[ordered_on].iloc[-1] ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] - seed_df2 = pDataFrame({ordered_on: ts, "val": rand_ints + 100}) + seed_df2 = DataFrame({ordered_on: ts, "val": rand_ints + 100}) fp_write( seed_path, seed_df2, @@ -360,7 +360,7 @@ def check(on, buffer): # Seed data. filter_val = np.ones(len(ts), dtype=bool) filter_val[::2] = False - seed = pDataFrame({ordered_on: ts, "val": rand_ints, filter_on: filter_val}) + seed = DataFrame({ordered_on: ts, "val": rand_ints, filter_on: filter_val}) # Streamed aggregation, raising an exception, but 1st chunk should be # written. with pytest.raises(SeedPreException, match="^not possible to have"): @@ -408,7 +408,7 @@ def check(on, buffer): assert streamagg_md_key2[KEY_PRE_BUFFER] == pre_buffer_ref -def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame): +def post(buffer: dict, bin_res: DataFrame, snap_res: DataFrame): """ Aggregate previous and current bin aggregation results. @@ -520,7 +520,7 @@ def post(buffer: dict, bin_res: pDataFrame, snap_res: pDataFrame): return merged_res.dropna(subset=FIRST, ignore_index=True) -def reference_results(seed: pDataFrame, key_conf: dict): +def reference_results(seed: DataFrame, key_conf: dict): """ Get reference results from cumsegagg and post for 2 next test cases. """ @@ -600,7 +600,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] filter_val = np.ones(len(ts), dtype=bool) filter_val[::2] = False - seed_df = pDataFrame({ordered_on: ts, val: rand_ints, filter_on: filter_val}) + seed_df = DataFrame({ordered_on: ts, val: rand_ints, filter_on: filter_val}) # # filter = 'True' # ts val filt row 2T_agg_res @@ -728,7 +728,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): [117, -1, 117, -1], ], ) - key3_res_ref = pDataFrame(key3_data, index=snap_ts).reset_index() + key3_res_ref = DataFrame(key3_data, index=snap_ts).reset_index() key3_res_ref = key3_res_ref.rename( columns={ "index": ordered_on, @@ -762,7 +762,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): # - one at same timestamp than last one. # - one at a new timestamp. This one will not be considered because when # not writing final results, last row in agg res is set aside. - seed_df = pDataFrame( + seed_df = DataFrame( { ordered_on: [ts[-1], ts[-1] + Timedelta(snap_duration)], val: [rand_ints[-1] + 1, rand_ints[-1] + 10], @@ -819,7 +819,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): # --------------# # Last data appending considering a single row in seed with same timestamp # and 'final_write' as last concatenation check with snapshots. 
- seed_df = pDataFrame( + seed_df = DataFrame( { ordered_on: [seed_df.loc[:, ordered_on].iloc[-1]], val: [rand_ints[-1] + 50], @@ -853,7 +853,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): # Data stream 5 # # --------------# # Empty snapshots are generated between 2 row groups in key2. - seed_df = pDataFrame( + seed_df = DataFrame( { ordered_on: [ seed_df.loc[:, ordered_on].iloc[-1], @@ -895,7 +895,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): # Data stream 6 # # --------------# # Several seed chunks where neither bins, nor snaps end. - seed_df = pDataFrame( + seed_df = DataFrame( { ordered_on: [ seed_df.loc[:, ordered_on].iloc[-1], @@ -944,7 +944,7 @@ def test_3_keys_bins_snaps_filters(store, seed_path): rand_ints = np.array([1, 3, 4, 7, 10, 14, 14, 14, 16, 17]) start = Timestamp("2020/01/01 03:00:00") ts = [start] + [start + Timedelta(f"{mn}T") for mn in rand_ints[1:]] - seed = pDataFrame({ordered_on: ts, val: rand_ints, filter_on: [True] * len(ts)}) + seed = DataFrame({ordered_on: ts, val: rand_ints, filter_on: [True] * len(ts)}) seed = [seed.iloc[:4], seed.iloc[4:]] as_.agg( seed=seed, @@ -1038,7 +1038,7 @@ def test_3_keys_bins_snaps_filters_restart(store, seed_path): ts = [start + Timedelta(f"{mn}T") for mn in rand_ints] filter_val = np.ones(len(ts), dtype=bool) filter_val[::2] = False - seed_df = pDataFrame({ordered_on: ts, val: rand_ints, filter_on: filter_val}) + seed_df = DataFrame({ordered_on: ts, val: rand_ints, filter_on: filter_val}) seed_list = [seed_df.iloc[:17], seed_df.iloc[17:31]] as1.agg( seed=seed_list, diff --git a/tests/test_aggstream/test_aggstream_init.py b/tests/test_aggstream/test_aggstream_init.py index 2ab28a2..5e377cb 100644 --- a/tests/test_aggstream/test_aggstream_init.py +++ b/tests/test_aggstream/test_aggstream_init.py @@ -18,7 +18,7 @@ from os import path as os_path import pytest -from pandas import DataFrame as pDataFrame +from pandas import DataFrame from pandas import DatetimeIndex from pandas.core.resample import TimeGrouper @@ -415,7 +415,7 @@ def test_exception_not_key_of_streamagg_results(store): ts = DatetimeIndex([date + "08:00", date + "08:30"]) ordered_on = "ts_order" val = range(1, len(ts) + 1) - seed_pdf = pDataFrame({ordered_on: ts, "val": val}) + seed_pdf = DataFrame({ordered_on: ts, "val": val}) store[key] = seed_pdf # Setup aggregation. bin_by = TimeGrouper(key=ordered_on, freq="1H", closed="left", label="left")