New iter_pf_df logic, seemingly much simpler. Test adjustments on-going.
yohplala committed Nov 27, 2024
1 parent 102b49a · commit ea49098
Showing 5 changed files with 583 additions and 658 deletions.
.pre-commit-config.yaml (3 changes: 1 addition & 2 deletions)

@@ -89,14 +89,13 @@ repos:
             oups/store/__init__.py:F401
             oups/store/writer.py:C901 S403 S301
             oups/store/iter_data.py:C901
-            oups/store/ordered_merge_info.py:C901
             oups/aggstream/__init__.py:F401
             oups/aggstream/jcumsegagg.py:C901
             oups/aggstream/segmentby.py:C901
             oups/aggstream/cumsegagg.py:C901 E203
             oups/aggstream/aggstream.py:C901
             tests/*.py:D103
-            tests/test_store/test_ordered_merge_info.py:F401 F811
+            tests/test_store/test_iter_data.py:F401 F811
         ]
 
   # Pydocstyle
oups/store/iter_data.py (171 changes: 87 additions & 84 deletions)

@@ -12,7 +12,7 @@
 from pandas import DataFrame
 from pandas import concat
 
-from oups.store.ordered_merge_info import OrderedMergeInfo
+from oups.store.ordered_merge_info import analyze_chunks_to_merge
 
 
 def _validate_duplicate_on_param(

@@ -112,7 +112,7 @@ def _get_next_chunk(
 def _iter_df(
     ordered_on: str,
-    max_row_group_size: int,
+    row_group_size_target: int,
     df: Union[DataFrame, List[DataFrame]],
     distinct_bounds: bool = False,
     duplicates_on: Optional[Union[str, List[str]]] = None,
@@ -125,7 +125,7 @@ def _iter_df(
     ----------
     ordered_on : str
         Column name by which data is ordered. Data must be in ascending order.
-    max_row_group_size : int
+    row_group_size_target : int
         Maximum number of rows per row group.
     df : Union[DataFrame, List[DataFrame]]
         Pandas DataFrame to split. If a list, they are merged and sorted back by
@@ -137,20 +137,20 @@ def _iter_df(
         removed keeping last occurrence.
     yield_remainder : bool, default False
         If True, yields the last chunk of data even if it is smaller than
-        'max_row_group_size'.
+        'row_group_size_target'.
 
     Yields
     ------
     DataFrame
-        Chunks of the DataFrame, each with size <= max_row_group_size, except
+        Chunks of the DataFrame, each with size <= row_group_size_target, except
         if distinct_bounds is True and there are more duplicates in the
-        'ordered_on' column than max_row_group_size.
+        'ordered_on' column than row_group_size_target.
 
     Returns
     -------
     Optional[DataFrame]
         Remaining data if yield_remainder is False and final chunk is smaller
-        than max_row_group_size.
+        than row_group_size_target.
 
     """
     if isinstance(df, list):
@@ -161,11 +161,11 @@ def _iter_df(
 
     start_idx = 0
     df_n_rows = len(df)
-    while df_n_rows - start_idx >= max_row_group_size:
+    while df_n_rows - start_idx >= row_group_size_target:
         chunk, next_idx = _get_next_chunk(
             df=df,
             start_idx=start_idx,
-            size=max_row_group_size,
+            size=row_group_size_target,
             distinct_bounds=distinct_bounds,
             ordered_on=ordered_on,
         )
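For context, `_iter_df` slices an ordered DataFrame into chunks of at most `row_group_size_target` rows. A minimal illustrative sketch of that behaviour (the DataFrame and values are invented, not part of the commit; `yield_remainder` is assumed to be accepted as a keyword argument, as the docstring above documents):

    import pandas as pd

    from oups.store.iter_data import _iter_df

    # 10 rows, ordered on "ts", with a target of 4 rows per chunk.
    df = pd.DataFrame({"ts": range(10), "val": range(10)})
    chunks = list(
        _iter_df(
            ordered_on="ts",
            row_group_size_target=4,
            df=df,
            yield_remainder=True,  # also emit the final, smaller chunk
        )
    )
    # Expected chunk sizes with these inputs: 4, 4 and 2 rows.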
@@ -185,7 +185,7 @@ def iter_merged_pf_df(
     df: DataFrame,
     pf: ParquetFile,
     ordered_on: str,
-    max_row_group_size: int,
+    row_group_size_target: int,
     distinct_bounds: Optional[bool] = False,
     duplicates_on: Optional[Union[str, Iterable[str]]] = None,
 ):
@@ -196,7 +196,7 @@ def iter_merged_pf_df(
     ----------
     ordered_on : str
         Column name by which data is ordered. Data must be in ascending order.
-    max_row_group_size : int
+    row_group_size_target : int
         Max number of rows per chunk.
     df : DataFrame
         In-memory pandas DataFrame to process. Must be ordered by 'ordered_on'
@@ -212,7 +212,7 @@ def iter_merged_pf_df(
     Yields
     ------
     DataFrame
-        Chunks of merged and ordered data, each with size <= max_row_group_size.
+        Chunks of merged and ordered data, each with size <= row_group_size_target.
 
     Raises
     ------
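A hedged usage sketch of the public generator with the renamed parameter (the file path and data are invented; `pf` is assumed to be a fastparquet `ParquetFile`, as the type annotation suggests):

    import pandas as pd
    from fastparquet import ParquetFile

    from oups.store.iter_data import iter_merged_pf_df

    pf = ParquetFile("dataset.parquet")  # existing data, assumed ordered on "ts"
    new_df = pd.DataFrame(
        {
            "ts": pd.date_range("2024-11-27", periods=500, freq="1min"),
            "val": range(500),
        }
    )
    for chunk in iter_merged_pf_df(
        df=new_df,
        pf=pf,
        ordered_on="ts",
        row_group_size_target=200,
        duplicates_on="ts",  # keep last occurrence on collisions
    ):
        ...  # e.g. write each merged chunk back as one row group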
@@ -247,85 +247,88 @@ def iter_merged_pf_df(
         columns=list(df.columns),
     )
 
-    # Identify overlapping row groups
-    overlap_info = OrderedMergeInfo.analyze(
+    # TODO:
+    # correct here!!
+    max_n_irgs = None
+
+    # Identify overlapping row groups.
+    # If intent is to drop duplicates, 'analyze_chunks_to_merge' has to be
+    # applied on a DataFrame without duplicates, so that returned indices stay
+    # consistent (versus a scenario duplicates are dropped afterwards).
+    if duplicates_on:
+        df.drop_duplicates(duplicates_on, keep="last", ignore_index=True, inplace=True)
+
+    chunk_counter, sort_rgs_after_write = analyze_chunks_to_merge(
         df=df,
         pf=pf,
         ordered_on=ordered_on,
-        max_row_group_size=max_row_group_size,
+        row_group_size_target=row_group_size_target,
         drop_duplicates=duplicates_on is not None,
+        max_n_irgs=max_n_irgs,
     )
 
-    # Handle data in 'df' before the loop over row groups in 'pf'.
+    has_pf = 0
+    has_df = 0
+    rg_idx_start = rg_idx = 0
+    df_idx_start = _df_idx_start = 0
+    if isinstance(row_group_size_target, str):
+        next_period_start = df.loc[:, ordered_on].iloc[0].ceil(row_group_size_target)
+    is_mixed_chunk = False
+    chunk_n_rows = 0
     remainder = None
-    df_idx_start = 0
-    if overlap_info.has_df_head:
-        # Case there is sufficient data in the pandas DataFrame to start a new
-        # row group.
-        # If 'duplicates_on' is provided, duplicates have been removed already,
-        # so no need to remove them again.
-        remainder = yield from _iter_df(
-            df=df.iloc[: overlap_info.df_idx_overlap_start],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            distinct_bounds=distinct_bounds,
-            duplicates_on=None,
-            yield_remainder=False,
-        )
-        # Correct then 'df_idx_start' to account for the dataframe head already
-        # yielded.
-        df_idx_start = overlap_info.df_idx_overlap_start if overlap_info.has_overlap else len(df)
 
-    # Merge possibly overlapping data (full loop over 'pf' row groups).
-    rg_idx_start = 0  # HERE: overlap_info.rg_idx_overlap_start
-    buffer_num_rows = 0 if remainder is None else len(remainder)
-    for rg_idx_1, (_df_idx_start, df_idx_end) in enumerate(
-        zip(overlap_info.df_idx_rg_starts, overlap_info.df_idx_rg_ends_excl),
-        start=1,
-    ):
-        n_data_rows = df_idx_end - _df_idx_start
-        # HERE: overlap_info.rg_n_rows[rg_idx_1 - 1] + n_data_rows
-        buffer_num_rows += pf.row_groups[rg_idx_1 - 1].num_rows + n_data_rows
-        # HERE: ...or rg_idx_1 == overlap_info.n_rgs_overlap
-        if buffer_num_rows >= max_row_group_size or rg_idx_1 == len(pf):
-            chunk = pf[rg_idx_start:rg_idx_1].to_pandas()
-            if df_idx_start != df_idx_end:
-                # Merge with pandas DataFrame chunk.
-                # DataFrame chunk is added last in concat, to preserve
-                # its values in case of duplicates found with values in
-                # ParquetFile chunk.
-                chunk = [remainder, chunk, df.iloc[df_idx_start:df_idx_end]]
-            elif remainder is not None:
-                chunk = [remainder, chunk]
-            if buffer_num_rows >= max_row_group_size:
-                remainder = yield from _iter_df(
-                    df=chunk,
-                    ordered_on=ordered_on,
-                    max_row_group_size=max_row_group_size,
-                    distinct_bounds=distinct_bounds,
-                    duplicates_on=duplicates_on,
-                    yield_remainder=False,
-                )
-                df_idx_start = df_idx_end
-                rg_idx_start = rg_idx_1
-                buffer_num_rows = 0 if remainder is None else len(remainder)
-            else:
-                remainder = chunk
-
-    # Handle data after the last overlaps.
-    if overlap_info.has_df_tail:
-        # Case there is remaining data in pandas DataFrame.
-        df_idx_overlap_end_excl = overlap_info.df_idx_overlap_end_excl
-        yield from _iter_df(
-            df=[remainder, df.iloc[df_idx_overlap_end_excl:]],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            distinct_bounds=distinct_bounds,
-            duplicates_on=None if remainder is None else duplicates_on,
-            yield_remainder=True,
-        )
-    elif remainder is not None:
-        # Case there only is a remainder from previous iterations.
-        yield remainder
+    for chunk_idx, df_idx_end_excl in enumerate(chunk_counter, start=-len(chunk_counter)):
+        # 'chunk_idx' will be 0 when loop is over.
+        if df_idx_end_excl > _df_idx_start:
+            # Possible DataFrame contribution to chunk.
+            chunk_n_rows += df_idx_end_excl - 1 - _df_idx_start
+            has_df += 1
+        if is_mixed_chunk:
+            # Add ParquetFile contribution to chunk.
+            chunk_n_rows += pf[rg_idx].num_rows
+            has_pf += 1
+            rg_idx += 1
+
+        _df_idx_start = df_idx_end_excl
+        is_mixed_chunk = not is_mixed_chunk
+
+        if (
+            chunk_n_rows >= row_group_size_target
+            if isinstance(row_group_size_target, int)
+            else (_chunk_last_ts := df.loc[:, ordered_on].iloc[df_idx_end_excl - 1])
+            >= next_period_start
+        ) or not chunk_idx:
+            if not has_pf:
+                chunk = (
+                    [remainder, df.iloc[df_idx_start:df_idx_end_excl]]
+                    if remainder is not None
+                    else df.iloc[df_idx_start:df_idx_end_excl]
+                )
+            elif not has_df:
+                chunk = (
+                    [remainder, pf[rg_idx_start:rg_idx].to_pandas()]
+                    if remainder is not None
+                    else pf[rg_idx_start:rg_idx].to_pandas()
+                )
+            else:
+                # Both pandas DataFrame and ParquetFile chunks.
+                # If 'remainder' is None, it does not raise trouble for concat
+                # step.
+                chunk = [
+                    remainder,
+                    pf[rg_idx_start:rg_idx].to_pandas(),
+                    df.iloc[df_idx_start:df_idx_end_excl],
+                ]
+            remainder = yield from _iter_df(
+                df=chunk,
+                ordered_on=ordered_on,
+                row_group_size_target=row_group_size_target,
+                distinct_bounds=distinct_bounds,
+                duplicates_on=duplicates_on,
+                yield_remainder=not chunk_idx,
+            )
+            df_idx_start = df_idx_end_excl
+            if isinstance(row_group_size_target, str):
+                next_period_start = _chunk_last_ts.ceil(row_group_size_target)
+            rg_idx_start = rg_idx
+            chunk_n_rows = 0 if remainder is None else len(remainder)
+            has_pf = has_df = 0
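To make the control flow above easier to follow: the loop appears to consume `chunk_counter` as a sequence of cumulative, end-exclusive DataFrame indices, with `is_mixed_chunk` toggling so that every other entry also consumes one ParquetFile row group. A standalone sketch of that alternation follows (the counter and row-group sizes are invented; `analyze_chunks_to_merge` is not shown in this commit, so its exact output format is an assumption):

    # Invented inputs: cumulative end-exclusive df indices, one per chunk.
    chunk_counter = [3, 3, 7, 10]
    rg_n_rows = [5, 5]  # invented sizes of the overlapping pf row groups

    _df_idx_start = 0
    rg_idx = 0
    is_mixed_chunk = False
    for df_idx_end_excl in chunk_counter:
        sources = []
        if df_idx_end_excl > _df_idx_start:
            # DataFrame rows contribute to this chunk.
            sources.append(f"df[{_df_idx_start}:{df_idx_end_excl}]")
        if is_mixed_chunk:
            # Every other chunk also consumes one ParquetFile row group.
            sources.append(f"pf row group {rg_idx} ({rg_n_rows[rg_idx]} rows)")
            rg_idx += 1
        print("chunk sources:", " + ".join(sources) if sources else "none")
        _df_idx_start = df_idx_end_excl
        is_mixed_chunk = not is_mixed_chunk

With a string `row_group_size_target`, chunk boundaries are time-based rather than row-count-based: `pandas.Timestamp.ceil` rounds up to the next period boundary, e.g. `pd.Timestamp("2024-11-27 10:17").ceil("1h")` gives `2024-11-27 11:00`, and a chunk is flushed once its last timestamp reaches `next_period_start`.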