
Commit

Set of 'TODO's added while they are still approximately clear in mind...
yohplala committed May 1, 2024
1 parent 5360c49 commit b9df2d3
Showing 4 changed files with 32 additions and 6 deletions.
12 changes: 12 additions & 0 deletions oups/aggstream/aggstream.py
@@ -1323,6 +1323,18 @@ def agg(
"""
# TODO: add 'snap_by' parameter to 'agg()' to allow using list of
# timestamps. 'cumsegagg()' is already compatible.
# TODO: add a writing step once aggregation on a seed chunk is done
# (keeping track of '_last_seed_index': as soon as it changes from
# one iteration to the next, trigger the intermediate writing step)
# Aggregation results to keep are listed through an additional
# 'group_res' parameter, in the form:
# {key_to_write_grouped_res_to: [key1, key2, key3],
# ...}
# Motivation is to be able to gather results for different filters and
# 'bin_by' values, post-process them in a single 'post' function, and
# write results in a single file.
# This particularly makes sense if there is a single 'snap_by' value, as
# snapshot results will be easily merged.
if isinstance(seed, pDataFrame):
# Make the seed an iterable.
seed = [seed]
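A minimal sketch of the intermediate writing step and 'group_res' mapping described in the TODO above. The names 'write_grouped', 'maybe_write' and the 'write' callback are illustrative assumptions, not part of the oups API.

from typing import Callable, Dict, List

import pandas as pd


def write_grouped(
    agg_res: Dict[str, pd.DataFrame],
    group_res: Dict[str, List[str]],
    write: Callable[[str, pd.DataFrame], None],
) -> None:
    # Gather results of the grouped keys and write them to a single key.
    for target_key, source_keys in group_res.items():
        gathered = pd.concat(
            [agg_res[key] for key in source_keys if key in agg_res],
            axis=1,
        )
        write(target_key, gathered)


def maybe_write(prev_seed_index, last_seed_index, agg_res, group_res, write):
    # Trigger the intermediate write as soon as '_last_seed_index' changes
    # from one iteration to the next.
    if prev_seed_index is not None and last_seed_index != prev_seed_index:
        write_grouped(agg_res, group_res, write)
    return last_seed_index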
11 changes: 8 additions & 3 deletions oups/aggstream/cumsegagg.py
@@ -379,9 +379,14 @@ def cumsegagg(
# without snapshot in 1st iteration, an empty 'snap_res' gets returned
# nonetheless and that concatenation can be managed with subsequent
# 'snap_res' from next iterations.
# TODO: if requesting snapshots, bin aggregation results are not necessary.
# Consider just outputting labels of bins in snapshot results, without
# aggregation results for bins? (memory savings).
# TODO: make it possible to pass several 'bin_by' (with the same 'snap_by'):
# - in segmentby, it has to produce an array providing, for each 'bin_by',
# when the next bin starts. Modify the existing 'mergesort' function for
# this.
# - in jcumsegagg, change the logic to have aggregation functions without
# 'preserve_res'. Instead, create a companion function that can be called
# when storing results, either reconciling existing results with new
# results (if not a new bin), or restarting the aggregation if it is a
# new bin.
len_data = len(data)
if not len_data:
# 'data' is empty. Simply return.
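A rough sketch, outside the oups code base, of the 'mergesort'-style merge mentioned in the TODO above: per-'bin_by' arrays of bin-start positions are merged into a single ordered sequence, keeping track of which 'bin_by' each position comes from. The function name and its inputs are assumptions.

import numpy as np


def merge_bin_starts(bin_starts_per_bin_by):
    # 'bin_starts_per_bin_by': one sorted array of bin-start row indices per
    # 'bin_by'. Return the merged positions and, for each merged position,
    # the index of the 'bin_by' it originates from.
    positions = np.concatenate(bin_starts_per_bin_by)
    origins = np.concatenate(
        [np.full(len(starts), i) for i, starts in enumerate(bin_starts_per_bin_by)]
    )
    order = np.argsort(positions, kind="mergesort")  # stable sort
    return positions[order], origins[order]


# Example: two 'bin_by' producing bin starts at different row indices.
merged, origin = merge_bin_starts([np.array([0, 4, 9]), np.array([0, 6])])
# merged -> [0, 0, 4, 6, 9], origin -> [0, 1, 0, 1, 0]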
12 changes: 10 additions & 2 deletions oups/aggstream/jcumsegagg.py
@@ -320,8 +320,6 @@ def jcsagg(
# TODO: when creating 'null_bin_indices' and 'null_snap_indices', only
# trim the trailing '-1' if there are fewer null indices than their initial
# size.
# TODO: integrate in 'jcsagg()' a loop over the dtypes, with all input
# arrays for a dtype at the same positions in different input tuples.
bin_start = -1 if preserve_res else 0
chunk_start = 0
bin_res_idx = snap_res_idx = 0
@@ -383,6 +381,16 @@ def jcsagg(
# If no data in current chunk, 'chunk_res' is naturally forwarded
# to next iteration, no need to update it.
if len(chunk) != 0:
# TODO: integrate in 'jcsagg()' a loop over the dtypes, with all
# input arrays and 'chunk_res' for a dtype at the same positions
# in different input tuples. Motivation is that chunks are the
# same size whatever the dtype of the seed data. It would avoid
# restarting 'jcsagg' for different dtypes.
# TODO: is the use of a tuple and 'literal_unroll' really
# necessary? 'aggs' is always a tuple of 3 components here.
# Create a parameter in 'jcsagg' for each component, and then
# use a shared index to iterate through these iterable inputs.
# for agg in aggs:
for agg in literal_unroll(aggs):
agg_func, cols_data, cols_res = agg
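A plain-Python illustration (outside numba) of the alternative raised in the second TODO above: pass the three components of 'aggs' as separate, same-length parameters and walk them with a shared index, instead of unrolling a heterogeneous tuple with 'literal_unroll'. The call signature of the aggregation functions here is a simplifying assumption, not the jcumsegagg one.

import numpy as np


def apply_aggs(agg_funcs, cols_data, cols_res, chunk, chunk_res):
    # Components are same-length tuples; an integer index replaces the
    # 'literal_unroll' iteration over a single heterogeneous tuple.
    for idx in range(len(agg_funcs)):
        agg_funcs[idx](chunk[:, cols_data[idx]], chunk_res, cols_res[idx])


# Toy usage: a single "sum" aggregation writing into slot 0 of 'chunk_res'.
def toy_sum(data, res, col):
    res[col] = data.sum()


chunk = np.arange(6.0).reshape(3, 2)
chunk_res = np.zeros(1)
apply_aggs((toy_sum,), (np.array([0, 1]),), (0,), chunk, chunk_res)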
3 changes: 2 additions & 1 deletion oups/store/writer.py
@@ -90,7 +90,8 @@ def iter_dataframe(
have to share the same value in ``sharp_on`` column.
"""
# TODO: implement 'replicate_groups' (use of 'ordered_on' column).
# TODO: implement 'group_as' (use of 'ordered_on' column) to replicate
# row group 'boundaries' between 2 different data sets.
if max_row_group_size is None:
max_row_group_size = MAX_ROW_GROUP_SIZE
if isinstance(data, vDataFrame):
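A hedged sketch of the 'group_as' idea in the TODO above: reuse the row group boundaries of a reference data set, expressed as values of the 'ordered_on' column at which each row group starts, to split a second data set at the same values. The function and parameter names are assumptions, not the writer API.

import numpy as np
import pandas as pd


def split_like(df: pd.DataFrame, ordered_on: str, boundaries: np.ndarray):
    # 'boundaries': 'ordered_on' values at which row groups start in the
    # reference data set. Yield chunks of 'df' sharing these boundaries.
    starts = df[ordered_on].to_numpy().searchsorted(boundaries, side="left")
    starts = np.unique(np.concatenate(([0], starts, [len(df)])))
    for i, j in zip(starts[:-1], starts[1:]):
        yield df.iloc[i:j]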
