diff --git a/oups/store/iter_dataframe.py b/oups/store/iter_data.py
similarity index 89%
rename from oups/store/iter_dataframe.py
rename to oups/store/iter_data.py
index b1fcf04..460c729 100644
--- a/oups/store/iter_dataframe.py
+++ b/oups/store/iter_data.py
@@ -99,7 +99,7 @@ def _get_next_chunk(
         val_at_end = df[ordered_on].iloc[end_idx]
         # Find the leftmost index to not split duplicates.
         end_idx = searchsorted(df[ordered_on].to_numpy(), val_at_end)
-        if end_idx == 0:
+        if end_idx == start_idx:
             # All values in chunk are duplicates.
             # Return chunk of data that will be larger than 'size', but complies
             # with distinct bounds.
@@ -111,10 +111,10 @@ def _iter_pandas_dataframe(
     df: DataFrame,
     max_row_group_size: int,
     ordered_on: str,
-    starter: Optional[DataFrame] = None,
+    start_df: Optional[DataFrame] = None,
     distinct_bounds: bool = False,
     duplicates_on: Optional[Union[str, List[str]]] = None,
-    yield_last: bool = False,
+    yield_remainder: bool = False,
 ) -> Iterable[DataFrame]:
     """
     Split pandas DataFrame into row groups.
@@ -126,14 +126,15 @@
     max_row_group_size : int
         Maximum number of rows per row group.
     ordered_on : str
-        Column name by which data is ordered. Data must be in ascending order.
-    starter : Optional[DataFrame]
-        Data to start the iteration with. Must be ordered by the same column.
+        Column name by which data is ordered. Data must be in ascending order.
+    start_df : Optional[DataFrame]
+        Data to start the iteration with. Must be ordered by the same column.
     distinct_bounds : bool, default False
         If True, ensures that row group boundaries do not split duplicate rows.
     duplicates_on : Optional[Union[str, List[str]]], default None
-        Column(s) to check for duplicates. If provided, duplicates will be removed keeping last occurrence.
-    yield_last : bool, default False
+        Column(s) to check for duplicates. If provided, duplicates will be
+        removed keeping last occurrence.
+    yield_remainder : bool, default False
         If True, yields the last chunk of data even if it is smaller than
         'max_row_group_size'.
@@ -145,16 +146,19 @@
     Returns
     -------
     Optional[DataFrame]
-        Remaining data if yield_last is False and final chunk is smaller than max_row_group_size.
+        Remaining data if yield_remainder is False and final chunk is smaller
+        than max_row_group_size.
""" - if starter is None: - start_idx = 0 - else: - start_idx = len(starter) - df = concat([starter, df]) - del starter - while start_idx < len(df) and len(df) - start_idx > max_row_group_size: + start_idx = 0 + if start_df is not None: + df = concat([start_df, df]) + del start_df + + if duplicates_on: + df = df.drop_duplicates(duplicates_on, keep="last", ignore_index=True) + + while len(df) - start_idx >= max_row_group_size: chunk, next_idx = _get_next_chunk( df, start_idx, @@ -162,16 +166,13 @@ def _iter_pandas_dataframe( ordered_on, distinct_bounds, ) - if duplicates_on: - chunk = chunk.drop_duplicates(duplicates_on, keep="last") yield chunk start_idx = next_idx if start_idx < len(df): chunk = df.iloc[start_idx:].copy(deep=True) - if yield_last: - if duplicates_on: - chunk = chunk.drop_duplicates(duplicates_on, keep="last") + del df + if yield_remainder: yield chunk else: return chunk @@ -181,8 +182,9 @@ def _iter_resized_parquet_file( pf: ParquetFile, max_row_group_size: int, ordered_on: str, - starter: Optional[DataFrame] = None, + start_df: Optional[DataFrame] = None, distinct_bounds: bool = False, + yield_remainder: bool = False, ): """ Yield resized row groups from ParquetFile. @@ -200,11 +202,14 @@ def _iter_resized_parquet_file( Maximum number of rows per row group. ordered_on : str Column name by which data is ordered. Data must be in ascending order. - starter : Optional[DataFrame], default None + start_df : Optional[DataFrame], default None Data to start the iteration with. Must be ordered by the same column. distinct_bounds : bool, default False If True, ensures that row group boundaries do not split duplicate rows in the ordered_on column. + yield_remainder : bool, default False + If True, yields the last chunk of data even if it is smaller than + max_row_group_size. Yields ------ @@ -214,8 +219,8 @@ def _iter_resized_parquet_file( Returns ------- Optional[DataFrame] - Remaining data if the final chunk is smaller than max_row_group_size. - This data might need to be merged with subsequent operations. + Remaining data if yield_remainder is False and final chunk is smaller + than max_row_group_size. Notes ----- @@ -225,11 +230,11 @@ def _iter_resized_parquet_file( """ start_rg_idx = 0 - if starter is None: + if start_df is None: buffer_num_rows = 0 else: - buffer_num_rows = len(starter) - remainder = starter + buffer_num_rows = len(start_df) + remainder = start_df for rg_idx, rg in enumerate(pf.row_groups, start=1): buffer_num_rows += rg.num_rows @@ -251,7 +256,10 @@ def _iter_resized_parquet_file( start_rg_idx = rg_idx - 1 buffer_num_rows = len(remainder) if remainder is not None else 0 - return remainder + if yield_remainder: + yield remainder + else: + return remainder def iter_merged_pandas_parquet_file( @@ -327,6 +335,7 @@ def iter_merged_pandas_parquet_file( max_row_group_size, ordered_on, distinct_bounds, + yield_last=False, ) elif (no_overlap_df_last_idx := rg_df_start_idxs[0]) > max_row_group_size: # Case there is sufficient data in the pandas DataFrame to start a new row group. @@ -337,6 +346,7 @@ def iter_merged_pandas_parquet_file( ordered_on, distinct_bounds, duplicates_on, + yield_last=False, ) # Merge overlapping data. @@ -373,14 +383,14 @@ def iter_merged_pandas_parquet_file( # Handle data after the last overlaps. if pf_rg_overlap_end_idx: # Case there is parquet file data after the last overlapping row group. 
-        remainder = yield from _iter_resized_parquet_file(
+        yield from _iter_resized_parquet_file(
             pf[pf_rg_overlap_end_idx:],
             max_row_group_size,
             ordered_on,
             remainder,
             distinct_bounds,
+            yield_remainder=True,
         )
-        yield remainder
     elif _df_end_idx < len(df):
         yield from _iter_pandas_dataframe(
             df.iloc[_df_end_idx:],
diff --git a/tests/test_store/test_iter_data.py b/tests/test_store/test_iter_data.py
new file mode 100644
index 0000000..10b8cbf
--- /dev/null
+++ b/tests/test_store/test_iter_data.py
@@ -0,0 +1,285 @@
+#!/usr/bin/env python3
+"""
+Test cases for iter_data private functions.
+
+Tests cover chunk extraction, pandas DataFrame iteration, and parquet file iteration
+with various configurations of distinct bounds and duplicate handling.
+
+"""
+
+import pytest
+from fastparquet import ParquetFile
+from fastparquet import write
+from pandas import DataFrame
+from pandas import concat
+from pandas.testing import assert_frame_equal
+
+from oups.store.iter_data import _get_next_chunk
+from oups.store.iter_data import _iter_pandas_dataframe
+from oups.store.iter_data import _iter_resized_parquet_file
+
+
+@pytest.fixture
+def sample_df():
+    """
+    Create a sample DataFrame for testing.
+    """
+    return DataFrame(
+        {
+            "ordered": [1, 2, 2, 2, 3, 4, 4, 5],
+            "values": list("abcdefgh"),
+        },
+    )
+
+
+@pytest.mark.parametrize(
+    "start_idx,size,distinct_bounds,expected_end_idx",
+    [
+        # Case 1: No distinct bounds - simply takes size=3 rows
+        (0, 3, False, 3),
+        # Case 2: Distinct bounds with fewer duplicates than size
+        # Should only take value=1 to avoid splitting value=2
+        (0, 3, True, 1),
+        # Case 3: Distinct bounds with more duplicates than size
+        # Starting at first 2, should take all three 2s
+        (1, 2, True, 4),
+        # Case 4: Starting mid-sequence with distinct bounds
+        # Starting at first 4, should take both 4s
+        (5, 2, True, 7),
+        # Case 5: End of dataframe
+        (6, 3, False, 8),
+        # Case 6: Distinct bounds at end of dataframe
+        (7, 2, True, 8),
+    ],
+)
+def test_get_next_chunk(
+    sample_df,
+    start_idx,
+    size,
+    distinct_bounds,
+    expected_end_idx,
+):
+    """
+    Test _get_next_chunk function with different boundary conditions.
+
+    Parameters
+    ----------
+    sample_df : DataFrame
+        Test DataFrame fixture.
+    start_idx : int
+        Starting index for chunk extraction.
+    size : int
+        Maximum chunk size.
+    distinct_bounds : bool
+        Whether to respect value boundaries.
+    expected_end_idx : int
+        Expected ending index.
+
+    """
+    chunk, end_idx = _get_next_chunk(
+        sample_df,
+        start_idx,
+        size,
+        "ordered",
+        distinct_bounds,
+    )
+    assert_frame_equal(chunk, sample_df.iloc[start_idx:expected_end_idx])
+    assert end_idx == expected_end_idx
+
+
+@pytest.mark.parametrize(
+    "start_df, duplicates_on, yield_remainder",
+    [
+        (None, None, True),  # Basic case, no remainder
+        (DataFrame({"ordered": [0], "values": ["z"]}), None, True),  # With start_df
+        (DataFrame({"ordered": [-1, 0], "values": ["w", "z"]}), "ordered", False),  # With start_df and duplicates
+        (
+            DataFrame({"ordered": [-1, 0, 1], "values": ["w", "z", "u"]}),
+            "ordered",
+            False,
+        ),  # With start_df and duplicates
+        (None, "ordered", True),  # With duplicates
+        (None, None, False),  # Return remainder
+    ],
+)
+def test_iter_pandas_dataframe(
+    sample_df,
+    start_df,
+    duplicates_on,
+    yield_remainder,
+):
+    """
+    Test _iter_pandas_dataframe with various configurations.
+
+    Parameters
+    ----------
+    sample_df : DataFrame
+        Test DataFrame fixture.
+    start_df : DataFrame or None
+        Optional starter DataFrame.
+    duplicates_on : str or None
+        Column to check for duplicates.
+    yield_remainder : bool
+        Whether to yield the last chunk even if it is smaller than the row
+        group size.
+
+    """
+    row_group_size = 3
+    iterator = _iter_pandas_dataframe(
+        sample_df,
+        row_group_size,
+        "ordered",
+        start_df=start_df.copy(deep=True) if start_df is not None else None,
+        distinct_bounds=bool(duplicates_on),
+        duplicates_on=duplicates_on,
+        yield_remainder=yield_remainder,
+    )
+
+    expected = (
+        concat([start_df, sample_df], ignore_index=True) if start_df is not None else sample_df
+    )
+    if duplicates_on:
+        expected = expected.drop_duplicates(duplicates_on, keep="last", ignore_index=True)
+    has_remainder = len(expected) % row_group_size > 0
+
+    # Collect all chunks
+    if yield_remainder:
+        all_chunks = list(iterator)
+        yielded_chunks = all_chunks
+    else:
+
+        def yield_all():
+            remainder = yield from iterator
+            if remainder is not None:
+                yield remainder
+
+        all_chunks = list(yield_all())
+        # Iterate a 2nd time to collect only the yielded chunks.
+        iterator2 = _iter_pandas_dataframe(
+            sample_df,
+            row_group_size,
+            "ordered",
+            start_df=start_df.copy(deep=True) if start_df is not None else None,
+            distinct_bounds=bool(duplicates_on),
+            duplicates_on=duplicates_on,
+            yield_remainder=yield_remainder,
+        )
+        yielded_chunks = list(iterator2)
+
+    complete_chunks = all_chunks[:-1] if has_remainder else all_chunks
+
+    # Verify chunk sizes
+    for chunk in complete_chunks:
+        assert len(chunk) == row_group_size
+
+    # Verify yielded data
+    result = concat(yielded_chunks, ignore_index=True)
+
+    if yield_remainder:
+        assert_frame_equal(result, expected)
+    else:
+        # When not yielding the last chunk, expected data should exclude the remainder.
+        expected_without_remainder = expected.iloc[
+            : (len(expected) // row_group_size) * row_group_size
+        ]
+        assert_frame_equal(result, expected_without_remainder)
+
+
+@pytest.fixture
+def create_parquet_file(tmp_path):
+    """
+    Create a temporary parquet file for testing.
+    """
+
+    def _create_parquet(df):
+        path = f"{tmp_path}/test.parquet"
+        write(path, df, row_group_offsets=[0, 3, 6], file_scheme="hive")
+        return ParquetFile(path)
+
+    return _create_parquet
+
+
+@pytest.mark.parametrize(
+    "start_df,yield_remainder,has_remainder",
+    [
+        (None, True, False),  # Basic case
+        (DataFrame({"ordered": [0], "values": ["z"]}), True, False),  # With start_df
+        (None, False, True),  # Return remainder
+    ],
+)
+def test_iter_resized_parquet_file(
+    sample_df,
+    create_parquet_file,
+    start_df,
+    yield_remainder,
+    has_remainder,
+):
+    """
+    Test _iter_resized_parquet_file with various configurations.
+
+    Parameters
+    ----------
+    sample_df : DataFrame
+        Test DataFrame fixture.
+    create_parquet_file : callable
+        Fixture to create temporary parquet files.
+    start_df : DataFrame or None
+        Optional starter DataFrame.
+    yield_remainder : bool
+        Whether to yield the last chunk.
+    has_remainder : bool
+        Whether a remainder is expected.
+
+    """
+    pf = create_parquet_file(sample_df)
+    row_group_size = 3
+
+    iterator = _iter_resized_parquet_file(
+        pf,
+        row_group_size,
+        "ordered",
+        start_df=start_df,
+        distinct_bounds=False,
+        yield_remainder=yield_remainder,
+    )
+
+    # Collect all chunks, capturing the generator's return value (the
+    # remainder) from StopIteration once the iterator is exhausted.
+    chunks = []
+    returned_remainder = None
+    try:
+        while True:
+            chunks.append(next(iterator))
+    except StopIteration as exc:
+        returned_remainder = exc.value
+
+    # Verify chunk sizes
+    for chunk in chunks[:-1]:  # All but last chunk
+        assert len(chunk) <= row_group_size
+
+    # Verify total data
+    result = concat(chunks, ignore_index=True)
+    expected = (
+        concat([start_df, sample_df], ignore_index=True) if start_df is not None else sample_df
+    )
+
+    if not yield_remainder:
+        # When not yielding the last chunk, expected data should exclude the remainder.
+        expected_without_remainder = expected.iloc[
+            : (len(expected) // row_group_size) * row_group_size
+        ]
+        n_expected_rows = len(expected_without_remainder)
+        expected_remainder = expected.iloc[n_expected_rows:]
+        assert returned_remainder is not None
+        assert_frame_equal(returned_remainder, expected_remainder)
+        assert_frame_equal(result, expected_without_remainder)
+    else:
+        assert returned_remainder is None
+        assert_frame_equal(result, expected)
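
Aside on the yield_remainder=False contract exercised above: in that mode, _iter_pandas_dataframe and _iter_resized_parquet_file return the undersized tail instead of yielding it, so the tail travels as the generator's return value and surfaces as StopIteration.value. The sketch below is illustrative only; _chunks, consume_with_delegation and consume_with_stopiteration are hypothetical stand-ins, not oups code. It mirrors the two retrieval styles used in this diff: yield-from delegation (as iter_merged_pandas_parquet_file does with remainder) and catching StopIteration (as in test_iter_resized_parquet_file).

from typing import Generator, List, Optional, Tuple


def _chunks(values: List[int], size: int) -> Generator[List[int], None, Optional[List[int]]]:
    # Stand-in for a yield_remainder=False iterator: full chunks are
    # yielded, the undersized tail is *returned*, not yielded.
    start = 0
    while len(values) - start >= size:
        yield values[start : start + size]
        start += size
    return values[start:] or None


def consume_with_delegation(values: List[int], size: int) -> Generator[List[int], None, None]:
    # 'yield from' re-yields every chunk and binds the generator's return
    # value, the same way iter_merged_pandas_parquet_file captures 'remainder'.
    remainder = yield from _chunks(values, size)
    if remainder is not None:
        yield remainder


def consume_with_stopiteration(
    values: List[int], size: int
) -> Tuple[List[List[int]], Optional[List[int]]]:
    # Driving the generator manually: the return value is carried by
    # StopIteration.value, the approach used in test_iter_resized_parquet_file.
    gen = _chunks(values, size)
    chunks: List[List[int]] = []
    remainder: Optional[List[int]] = None
    try:
        while True:
            chunks.append(next(gen))
    except StopIteration as exc:
        remainder = exc.value
    return chunks, remainder


if __name__ == "__main__":
    print(list(consume_with_delegation(list(range(8)), 3)))  # [[0, 1, 2], [3, 4, 5], [6, 7]]
    print(consume_with_stopiteration(list(range(8)), 3))     # ([[0, 1, 2], [3, 4, 5]], [6, 7])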