Merge pull request #18 from yohplala/only_metadata
aggstream accepts a user-defined 'post()' function that has a warm-up period. It is now able to record 'post_buffer' in metadata even when there are no actual results / result files yet.
yohplala authored Jul 26, 2024
2 parents ebf7920 + b18b33b commit 010bed4
Showing 3 changed files with 210 additions and 80 deletions.
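For context, here is a minimal sketch of what a 'post()' callable with a warm-up period might look like. The threshold, the 'pending' buffer key and the pass-through accumulation are illustrative assumptions, not part of this PR; only the call signature 'post(buffer=..., bin_res=...)' and the None return during warm-up follow the diff below.

from typing import Optional

from pandas import DataFrame
from pandas import concat

WARMUP_ROWS = 100  # hypothetical warm-up threshold


def post(buffer: dict, bin_res: DataFrame) -> Optional[DataFrame]:
    # Accumulate incoming bin results; 'buffer' is modified in-place and
    # recorded in oups metadata, so a later run can resume the warm-up.
    buffer["pending"] = (
        concat([buffer["pending"], bin_res], ignore_index=True)
        if "pending" in buffer
        else bin_res
    )
    if len(buffer["pending"]) < WARMUP_ROWS:
        # Warm-up period: no result released yet. With this PR, 'post_buffer'
        # and the restart index are still recorded in metadata.
        return None
    # Warm-up over: release the accumulated rows and reset the buffer.
    released = buffer["pending"]
    buffer["pending"] = released.iloc[0:0]
    return released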
147 changes: 85 additions & 62 deletions oups/aggstream/aggstream.py
@@ -15,7 +15,7 @@
from joblib import Parallel
from joblib import delayed
from numpy import ones
from pandas import DataFrame as pDataFrame
from pandas import DataFrame
from pandas import DatetimeIndex
from pandas import Series
from pandas import Timestamp as pTimestamp
@@ -342,11 +342,11 @@ def _reset_agg_buffers(agg_buffers: Optional[dict] = None) -> Optional[dict]:
- n_rows : int, number of rows in main aggregation results (snapshots
if snapshots are requested, or bins otherwise). It is reset here after
writing.
- agg_res : pDataFrame, last aggregation results, to reset to None
- agg_res : DataFrame, last aggregation results, to reset to None
after writing.
- bin_res : pDataFrame, last aggregation results, to reset to None
- bin_res : DataFrame, last aggregation results, to reset to None
after writing.
- agg_res_buffer : List[pDataFrame], list of chunks resulting from
- agg_res_buffer : List[DataFrame], list of chunks resulting from
aggregation (pandas DataFrame), either from bins if only bins
requested, or from snapshots if bins and snapshots requested.
- bin_res_buffer : List[pandas.DataFrame], list of bins resulting from
@@ -396,15 +396,15 @@ def __init__(self, message: str = None):


def _iter_data(
seed: Iterable[pDataFrame],
seed: Iterable[DataFrame],
ordered_on: str,
restart_index: Union[int, float, pTimestamp, None],
pre: Union[Callable, None],
pre_buffer: dict,
filters: Union[dict, None],
trim_start: bool,
discard_last: bool,
) -> Tuple[str, pDataFrame]:
) -> Tuple[str, DataFrame]:
"""
Iterate over provided seed, optionally applying filters sequentially.
@@ -413,7 +413,7 @@ def _iter_data(
Parameters
----------
seed : Iterable[pDataFrame]
seed : Iterable[DataFrame]
Iterable of pandas DataFrame.
ordered_on : str
Name of column with respect to which seed data is in ascending
@@ -465,7 +465,7 @@ def _iter_data(
chunk.
- 'filter_id', str, indicating which set of filters has been
applied for the seed chunk provided.
- 'filtered_chunk', pDataFrame, from the seed Iterable, with
- 'filtered_chunk', DataFrame, from the seed Iterable, with
optionally filters applied.
Notes
@@ -553,8 +553,8 @@ def _iter_data(


def _concat_agg_res(
agg_res_buffers: List[pDataFrame],
agg_res: pDataFrame,
agg_res_buffers: List[DataFrame],
agg_res: DataFrame,
append_last_res: bool,
index_name: str,
):
@@ -563,9 +563,9 @@ def _iter_data(
Parameters
----------
agg_res_buffers : List[pDataFrame]
agg_res_buffers : List[DataFrame]
List of aggregation results to concatenate.
agg_res : pDataFrame
agg_res : DataFrame
Last aggregation results (all rows from last iteration).
append_last_res : bool
If 'agg_res' should be appended to 'agg_res_buffer' and if 'bin_res'
@@ -576,7 +576,7 @@ def _iter_data(
Returns
-------
pDataFrame
DataFrame
List of aggregation results concatenated in a single DataFrame.
"""
@@ -617,11 +617,11 @@ def _post_n_write_agg_chunks(
- n_rows : int, number of rows in main aggregation results (snapshots
if snapshots are requested, or bins otherwise). It is reset here after
writing.
- agg_res : pDataFrame, last aggregation results, to reset to None
- agg_res : DataFrame, last aggregation results, to reset to None
after writing.
- bin_res : pDataFrame, last aggregation results, to reset to None
- bin_res : DataFrame, last aggregation results, to reset to None
after writing.
- agg_res_buffer : List[pDataFrame], list of chunks resulting from
- agg_res_buffer : List[DataFrame], list of chunks resulting from
aggregation (pandas DataFrame), either from bins if only bins
requested, or from snapshots if bins and snapshots requested.
It contains 'agg_res' (last aggregation results), but without last
@@ -685,41 +685,49 @@ def _post_n_write_agg_chunks(
proceeding with preprocessing of individual seed chunk.
"""
if (agg_res := agg_buffers[KEY_AGG_RES]) is None:
# No iteration has been achieved, as no data.
if last_seed_index:
# If 'last_seed_index', at least set it in oups metadata.
# It is possible new seed data has been streamed and taken into
# account, but used for this key, because having been filtered out.
# Also set 'pre_buffer' if not null.
OUPS_METADATA[key] = {
KEY_AGGSTREAM: {KEY_RESTART_INDEX: last_seed_index, KEY_PRE_BUFFER: pre_buffer},
}
write_metadata(pf=store[key].pf, md_key=key)
return
# Concat list of aggregation results.
agg_res = _concat_agg_res(agg_buffers[KEY_AGG_RES_BUFFER], agg_res, append_last_res, index_name)
# Same if needed with 'bin_res_buffer'
bin_res = agg_buffers[KEY_BIN_RES]
if bin_res is not None:
bin_res = _concat_agg_res(
agg_buffers[KEY_BIN_RES_BUFFER],
bin_res,
post_buffer = agg_buffers[KEY_POST_BUFFER]
# When there is no result, 'agg_res' is None.
if isinstance((agg_res := agg_buffers[KEY_AGG_RES]), DataFrame):
# To keep track there has been res in the 1st place.
not_null_res = True
# Concat list of aggregation results.
agg_res = _concat_agg_res(
agg_buffers[KEY_AGG_RES_BUFFER],
agg_res,
append_last_res,
index_name,
)
post_buffer = agg_buffers[KEY_POST_BUFFER]
if post:
# Post processing if any.
# 'post_buffer' has to be modified in-place.
agg_res = (
post(buffer=post_buffer, bin_res=agg_res)
if bin_res is None
else post(buffer=post_buffer, bin_res=bin_res, snap_res=agg_res)
)
# Same if needed with 'bin_res_buffer'
bin_res = agg_buffers[KEY_BIN_RES]
if bin_res is not None:
bin_res = _concat_agg_res(
agg_buffers[KEY_BIN_RES_BUFFER],
bin_res,
append_last_res,
index_name,
)
if post:
# Post processing if any.
# 'post_buffer' has to be modified in-place.
# It is possible 'agg_res' is None, if 'post' needs a minimal
# number of rows before outputting results (warm-up).
agg_res = (
post(buffer=post_buffer, bin_res=agg_res)
if bin_res is None
else post(buffer=post_buffer, bin_res=bin_res, snap_res=agg_res)
)
else:
not_null_res = False
if last_seed_index:
# If 'last_seed_index', set oups metadata.
# Also set 'pre_buffer' if not null.
# It is possible there is no result yet to write for different reasons:
# - new seed data has been streamed and needs to be taken into account,
# but there is no result for this key, because all related seed data
# has been filtered out.
# - or maybe 'post' has a warm-up period and has not released results
# yet.
# But 'last_seed_index' has to be recorded, and so do possibly
# 'pre_buffer', 'segagg_buffer' and 'post_buffer'.
OUPS_METADATA[key] = {
KEY_AGGSTREAM: {
KEY_RESTART_INDEX: last_seed_index,
@@ -728,14 +736,29 @@ def _post_n_write_agg_chunks(
KEY_POST_BUFFER: post_buffer,
},
}
# Record data.
store[key] = write_config, agg_res
# Reset aggregation buffers and counters.
_reset_agg_buffers(agg_buffers)
# When there is no result, 'agg_res' is None.
if isinstance(agg_res, DataFrame):
# Record data (with metadata possibly updated).
store[key] = write_config, agg_res
elif last_seed_index:
# If no result, metadata may still have to be written, as 'last_seed_index'
# is the flag indicating the last 'aggstream' local iteration.
try:
write_metadata(pf=store[key].pf, md_key=key)
except FileNotFoundError:
# In case no Parquet file exists yet, need to initiate one to start
# storing metadata.
store[key] = DataFrame()
if not_null_res:
# If there have been results, they have been processed (either written
# directly or through 'post()'). Time to reset aggregation buffers and
# counters.
_reset_agg_buffers(agg_buffers)
return


def agg_iter(
seed_chunk: pDataFrame,
seed_chunk: DataFrame,
store: ParquetSet,
key: dataclass,
keys_config: dict,
@@ -747,7 +770,7 @@
Parameters
----------
seed_chunk : pDataFrame
seed_chunk : DataFrame
Chunk of seed data.
store : ParquetSet
ParquetSet to which recording aggregation results.
@@ -776,7 +799,7 @@ def agg_iter(
agg_res_buffer = agg_buffers[KEY_AGG_RES_BUFFER]
if agg_res_len > 1:
# Add 'agg_res' to 'agg_res_buffer' ignoring last row.
# It is incimplete, so useless to write it to results while
# It is incomplete, so useless to write it to results while
# aggregation iterations are on-going.
agg_res_buffer.append(agg_res.iloc[:-1])
# Remove last row that is not recorded from total number of rows.
@@ -863,14 +886,14 @@ class AggStream:
intermediate results.
``{key: {'agg_n_rows' : int, number of rows in aggregation results,
for bins (if snapshots not requested) or snapshots.
'agg_res' : None or pDataFrame, last aggregation results,
'agg_res' : None or DataFrame, last aggregation results,
for bins (if snapshots not requested) or snapshots,
'bin_res' : None or pDataFrame, last aggregation results,
'bin_res' : None or DataFrame, last aggregation results,
for bins (if snapshots requested),
'agg_res_buffer' : list of pDataFrame, buffer to keep
'agg_res_buffer' : list of DataFrame, buffer to keep
aggregation results, bins (if snapshots not
requested) or snapshots,
'bin_res_buffer' : list of pDataFrame, buffer to keep bin
'bin_res_buffer' : list of DataFrame, buffer to keep bin
aggregation results (if snapshots requested)
'segagg_buffer' : dict, possibly empty, keeping track of
segmentation and aggregation intermediate
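For orientation, the per-key buffer dictionary described above can be pictured with an initial state such as the one below; key names come from this docstring, the initial values are assumptions, and 'post_buffer' is inferred from the rest of the PR (the diff truncates the docstring here).

agg_buffers = {
    "agg_n_rows": 0,       # number of rows in aggregation results
    "agg_res": None,       # last aggregation results (bins or snapshots)
    "bin_res": None,       # last bin results (if snapshots requested)
    "agg_res_buffer": [],  # buffered aggregation result chunks
    "bin_res_buffer": [],  # buffered bin result chunks
    "segagg_buffer": {},   # segmentation/aggregation intermediate state
    "post_buffer": {},     # 'post()' intermediate state, kept across restarts
}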
@@ -1238,7 +1261,7 @@ def __init__(
# Store attribute.
self.store = store

def _init_agg_cs(self, seed: Iterable[pDataFrame]):
def _init_agg_cs(self, seed: Iterable[DataFrame]):
"""
Initialize ``self.agg_cs``.
@@ -1248,7 +1271,7 @@ def _init_agg_cs(self, seed: Iterable[pDataFrame]):
Parameters
----------
seed : Iterable[pDataFrame]
seed : Iterable[DataFrame]
Seed data, from which getting pandas DataFrame dtypes.
Returns
@@ -1271,7 +1294,7 @@ def _init_agg_cs(self, seed: Iterable[pDataFrame]):

def agg(
self,
seed: Union[pDataFrame, Iterable[pDataFrame]] = None,
seed: Union[DataFrame, Iterable[DataFrame]] = None,
trim_start: Optional[bool] = True,
discard_last: Optional[bool] = True,
final_write: Optional[bool] = True,
@@ -1285,7 +1308,7 @@ def agg(
Parameters
----------
seed : Union[pDataFrame, Iterable[pDataFrame]]
seed : Union[DataFrame, Iterable[DataFrame]]
Seed data over which conducting streamed aggregations.
trim_start : bool, default True
If ``True``, and if aggregated results already exist, then
@@ -1352,7 +1375,7 @@ def agg(
# snapshots results will be easily merged.
# TODO: change default settings:
# discard_last = trim_start = final_write = False
if isinstance(seed, pDataFrame):
if isinstance(seed, DataFrame):
# Make the seed an iterable.
seed = [seed]
# Seed can be an empty list or None.
16 changes: 10 additions & 6 deletions oups/store/writer.py
@@ -117,12 +117,16 @@ def iter_dataframe(
# Case 'duplicates_on' is a single column name, but not
# 'sharp_on'.
duplicates_on = [duplicates_on, sharp_on]
# Define bins to split into row groups.
# Acknowledging this piece of code to be an extract from fastparquet.
n_rows = len(data)
n_parts = (n_rows - 1) // max_row_group_size + 1
row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
starts = list(range(0, n_rows, row_group_size))
if n_rows:
# Define bins to split into row groups.
# Acknowledging this piece of code to be an extract from fastparquet.
n_parts = (n_rows - 1) // max_row_group_size + 1
row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
starts = list(range(0, n_rows, row_group_size))
else:
# If n_rows=0
starts = [0]
if sharp_on:
# Adjust bins so that they do not end in the middle of duplicate values
# in `sharp_on` column.
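As a standalone illustration of the splitting arithmetic above, including the new empty-data branch; the function name and the values in the asserts are only for this sketch.

def row_group_starts(n_rows: int, max_row_group_size: int) -> list:
    # Same arithmetic as in 'iter_dataframe' above, isolated for illustration.
    if n_rows:
        n_parts = (n_rows - 1) // max_row_group_size + 1
        row_group_size = min((n_rows - 1) // n_parts + 1, n_rows)
        return list(range(0, n_rows, row_group_size))
    # Empty data: with n_rows = 0, 'n_parts' evaluates to 0 and the previous
    # code raised ZeroDivisionError; a single start keeps the write path going.
    return [0]


assert row_group_starts(10_500, 5_000) == [0, 3_500, 7_000]
assert row_group_starts(0, 5_000) == [0]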
@@ -424,7 +428,7 @@ def write(
all_cols = data.get_column_names()
if ordered_on not in all_cols:
raise ValueError(f"column '{ordered_on}' does not exist in input data.")
if os_path.isdir(dirpath) and os_listdir(dirpath):
if os_path.isdir(dirpath) and any(file.endswith(".parquet") for file in os_listdir(dirpath)):
# Case updating an existing dataset.
# Identify overlaps in row groups between new data and recorded data.
# Recorded row group start and end indexes.
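The updated condition above treats the target directory as an existing dataset only when it already holds at least one parquet file, so a directory containing only ancillary files no longer takes the update path. Restated as a small helper (hypothetical name, not part of the PR):

from os import listdir as os_listdir
from os import path as os_path


def has_recorded_row_groups(dirpath: str) -> bool:
    # Mirror of the updated check: only actual '.parquet' files count as
    # recorded data; any other file in the directory is ignored.
    return os_path.isdir(dirpath) and any(
        file.endswith(".parquet") for file in os_listdir(dirpath)
    )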
