Test case development for iter_data module.
yohplala committed Nov 10, 2024
1 parent 40cad22 commit 7429172
Showing 2 changed files with 326 additions and 31 deletions.
oups/store/iter_dataframe.py → oups/store/iter_data.py (72 changes: 41 additions & 31 deletions)
@@ -99,7 +99,7 @@ def _get_next_chunk(
     val_at_end = df[ordered_on].iloc[end_idx]
     # Find the leftmost index to not split duplicates.
     end_idx = searchsorted(df[ordered_on].to_numpy(), val_at_end)
-    if end_idx == 0:
+    if end_idx == start_idx:
         # All values in chunk are duplicates.
         # Return chunk of data that will be larger than 'size', but complies
         # with distinct bounds.
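
As an aside, the leftmost-search trick above is easy to see in isolation. A minimal sketch (not part of the commit), assuming a plain numpy array stands in for df[ordered_on]:

    import numpy as np

    ordered = np.array([10, 20, 20, 20, 30])
    # A tentative chunk boundary at index 3 would split the run of 20s.
    val_at_end = ordered[3]
    # With the default side="left", searchsorted returns the index of the
    # first occurrence of the value, so the whole run of duplicates is
    # pushed into the next chunk instead of being split.
    end_idx = np.searchsorted(ordered, val_at_end)
    print(end_idx)  # 1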
@@ -111,10 +111,10 @@ def _iter_pandas_dataframe(
     df: DataFrame,
     max_row_group_size: int,
     ordered_on: str,
-    starter: Optional[DataFrame] = None,
+    start_df: Optional[DataFrame] = None,
     distinct_bounds: bool = False,
     duplicates_on: Optional[Union[str, List[str]]] = None,
-    yield_last: bool = False,
+    yield_remainder: bool = False,
 ) -> Iterable[DataFrame]:
     """
     Split pandas DataFrame into row groups.
@@ -126,14 +126,15 @@ def _iter_pandas_dataframe(
     max_row_group_size : int
         Maximum number of rows per row group.
     ordered_on : str
-        Column name by which data is ordered. Data must be in ascending order. # Added ascending order requirement
-    starter : Optional[DataFrame]
-        Data to start the iteration with. Must be ordered by the same column. # Added ordering requirement
+        Column name by which data is ordered. Data must be in ascending order.
+    start_df : Optional[DataFrame]
+        Data to start the iteration with. Must be ordered by the same column.
     distinct_bounds : bool, default False
         If True, ensures that row group boundaries do not split duplicate rows.
     duplicates_on : Optional[Union[str, List[str]]], default None
-        Column(s) to check for duplicates. If provided, duplicates will be removed keeping last occurrence.
-    yield_last : bool, default False
+        Column(s) to check for duplicates. If provided, duplicates will be
+        removed, keeping the last occurrence.
+    yield_remainder : bool, default False
         If True, yields the last chunk of data even if it is smaller than
         'max_row_group_size'.
@@ -145,33 +146,33 @@ def _iter_pandas_dataframe(
     Returns
     -------
     Optional[DataFrame]
-        Remaining data if yield_last is False and final chunk is smaller than max_row_group_size.
+        Remaining data if yield_remainder is False and final chunk is smaller
+        than max_row_group_size.
     """
-    if starter is None:
-        start_idx = 0
-    else:
-        start_idx = len(starter)
-        df = concat([starter, df])
-        del starter
-    while start_idx < len(df) and len(df) - start_idx > max_row_group_size:
+    start_idx = 0
+    if start_df is not None:
+        df = concat([start_df, df])
+        del start_df
+
+    if duplicates_on:
+        df = df.drop_duplicates(duplicates_on, keep="last", ignore_index=True)
+
+    while len(df) - start_idx >= max_row_group_size:
         chunk, next_idx = _get_next_chunk(
             df,
             start_idx,
             max_row_group_size,
             ordered_on,
             distinct_bounds,
         )
-        if duplicates_on:
-            chunk = chunk.drop_duplicates(duplicates_on, keep="last")
         yield chunk
         start_idx = next_idx

     if start_idx < len(df):
         chunk = df.iloc[start_idx:].copy(deep=True)
-        if yield_last:
-            if duplicates_on:
-                chunk = chunk.drop_duplicates(duplicates_on, keep="last")
+        del df
+        if yield_remainder:
             yield chunk
         else:
             return chunk
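
For orientation, a usage sketch of the rewritten generator (hypothetical, not part of the commit; the keyword names follow the new signature above, and importing the private helper is assumed for illustration only):

    from pandas import DataFrame

    from oups.store.iter_data import _iter_pandas_dataframe  # private helper

    df = DataFrame({"ts": [1, 2, 2, 2, 3, 4, 5], "val": range(7)})
    # With distinct_bounds=True, the run of ts == 2 is kept within a single
    # chunk, even if that chunk ends up larger than max_row_group_size.
    for chunk in _iter_pandas_dataframe(
        df,
        max_row_group_size=3,
        ordered_on="ts",
        distinct_bounds=True,
        yield_remainder=True,
    ):
        print(chunk["ts"].tolist())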
@@ -181,8 +182,9 @@ def _iter_resized_parquet_file(
     pf: ParquetFile,
     max_row_group_size: int,
     ordered_on: str,
-    starter: Optional[DataFrame] = None,
+    start_df: Optional[DataFrame] = None,
     distinct_bounds: bool = False,
+    yield_remainder: bool = False,
 ):
     """
     Yield resized row groups from ParquetFile.
@@ -200,11 +202,14 @@ def _iter_resized_parquet_file(
         Maximum number of rows per row group.
     ordered_on : str
         Column name by which data is ordered. Data must be in ascending order.
-    starter : Optional[DataFrame], default None
+    start_df : Optional[DataFrame], default None
         Data to start the iteration with. Must be ordered by the same column.
     distinct_bounds : bool, default False
         If True, ensures that row group boundaries do not split duplicate rows
         in the ordered_on column.
+    yield_remainder : bool, default False
+        If True, yields the last chunk of data even if it is smaller than
+        max_row_group_size.

     Yields
     ------
@@ -214,8 +219,8 @@ def _iter_resized_parquet_file(
     Returns
     -------
     Optional[DataFrame]
-        Remaining data if the final chunk is smaller than max_row_group_size.
-        This data might need to be merged with subsequent operations.
+        Remaining data if yield_remainder is False and final chunk is smaller
+        than max_row_group_size.

     Notes
     -----
@@ -225,11 +230,11 @@ def _iter_resized_parquet_file(
     """
     start_rg_idx = 0
-    if starter is None:
+    if start_df is None:
         buffer_num_rows = 0
     else:
-        buffer_num_rows = len(starter)
-        remainder = starter
+        buffer_num_rows = len(start_df)
+        remainder = start_df

     for rg_idx, rg in enumerate(pf.row_groups, start=1):
         buffer_num_rows += rg.num_rows
@@ -251,7 +256,10 @@ def _iter_resized_parquet_file(
             start_rg_idx = rg_idx - 1
             buffer_num_rows = len(remainder) if remainder is not None else 0

-    return remainder
+    if yield_remainder:
+        yield remainder
+    else:
+        return remainder


 def iter_merged_pandas_parquet_file(
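
The yield_remainder switch relies on a generator feature worth spelling out: a return inside a generator becomes the value of the caller's yield from expression, which is how the undersized tail travels back when it is not yielded. A minimal standalone sketch, independent of the commit:

    def inner(yield_remainder=False):
        yield "full row group"
        remainder = "undersized tail"
        if yield_remainder:
            yield remainder
        else:
            # Becomes the value of `yield from inner()` in the caller.
            return remainder

    def outer():
        remainder = yield from inner()  # re-yields chunks, then captures the tail
        print("left over:", remainder)

    list(outer())  # prints: left over: undersized tail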
@@ -327,6 +335,7 @@ def iter_merged_pandas_parquet_file(
             max_row_group_size,
             ordered_on,
             distinct_bounds,
+            yield_remainder=False,
         )
     elif (no_overlap_df_last_idx := rg_df_start_idxs[0]) > max_row_group_size:
         # Case there is sufficient data in the pandas DataFrame to start a new row group.
@@ -337,6 +346,7 @@ def iter_merged_pandas_parquet_file(
             ordered_on,
             distinct_bounds,
             duplicates_on,
+            yield_remainder=False,
         )

     # Merge overlapping data.
@@ -373,14 +383,14 @@ def iter_merged_pandas_parquet_file(
     # Handle data after the last overlaps.
     if pf_rg_overlap_end_idx:
         # Case there is parquet file data after the last overlapping row group.
-        remainder = yield from _iter_resized_parquet_file(
+        yield from _iter_resized_parquet_file(
             pf[pf_rg_overlap_end_idx:],
             max_row_group_size,
             ordered_on,
             remainder,
             distinct_bounds,
+            yield_remainder=True,
         )
-        yield remainder
     elif _df_end_idx < len(df):
         yield from _iter_pandas_dataframe(
             df.iloc[_df_end_idx:],
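
Finally, a hypothetical end-to-end sketch of the public iterator. This is not from the commit: the argument order and exact signature of iter_merged_pandas_parquet_file are assumed from the parameters visible in this diff.

    from fastparquet import ParquetFile  # assumed: oups operates on fastparquet files
    from pandas import DataFrame

    from oups.store.iter_data import iter_merged_pandas_parquet_file

    pf = ParquetFile("my_dataset.parquet")  # hypothetical existing ordered dataset
    new_rows = DataFrame({"ts": [5, 6, 7], "val": [0.1, 0.2, 0.3]})

    for row_group in iter_merged_pandas_parquet_file(
        new_rows,
        pf,
        max_row_group_size=2,
        ordered_on="ts",
        distinct_bounds=True,
        duplicates_on="ts",
    ):
        # Each yielded DataFrame is a merged, correctly sized row group,
        # ready to be written back to the store.
        print(len(row_group))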