diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b587b0e..f53e9e6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -89,14 +89,13 @@ repos: oups/store/__init__.py:F401 oups/store/writer.py:C901 S403 S301 oups/store/iter_data.py:C901 + oups/store/ordered_merge_info.py:C901 oups/aggstream/__init__.py:F401 oups/aggstream/jcumsegagg.py:C901 oups/aggstream/segmentby.py:C901 oups/aggstream/cumsegagg.py:C901 E203 oups/aggstream/aggstream.py:C901 tests/*.py:D103 - tests/test_store/test_ordered_merge_info.py:F401 F811 - tests/test_store/test_iter_data.py:F401 F811 ] # Pydocstyle diff --git a/oups/store/iter_data.py b/oups/store/iter_data.py index 771a3f2..08a7047 100644 --- a/oups/store/iter_data.py +++ b/oups/store/iter_data.py @@ -12,7 +12,7 @@ from pandas import DataFrame from pandas import concat -from oups.store.ordered_merge_info import OrderedMergeInfo +from oups.store.ordered_merge_info import analyze_chunks_to_merge def _validate_duplicate_on_param( @@ -112,7 +112,7 @@ def _get_next_chunk( def _iter_df( ordered_on: str, - max_row_group_size: int, + row_group_size_target: int, df: Union[DataFrame, List[DataFrame]], distinct_bounds: bool = False, duplicates_on: Optional[Union[str, List[str]]] = None, @@ -125,7 +125,7 @@ def _iter_df( ---------- ordered_on : str Column name by which data is ordered. Data must be in ascending order. - max_row_group_size : int + row_group_size_target : int Maximum number of rows per row group. df : Union[DataFrame, List[DataFrame]] Pandas DataFrame to split. If a list, they are merged and sorted back by @@ -137,20 +137,20 @@ def _iter_df( removed keeping last occurrence. yield_remainder : bool, default False If True, yields the last chunk of data even if it is smaller than - 'max_row_group_size'. + 'row_group_size_target'. Yields ------ DataFrame - Chunks of the DataFrame, each with size <= max_row_group_size, except + Chunks of the DataFrame, each with size <= row_group_size_target, except if distinct_bounds is True and there are more duplicates in the - 'ordered_on' column than max_row_group_size. + 'ordered_on' column than row_group_size_target. Returns ------- Optional[DataFrame] Remaining data if yield_remainder is False and final chunk is smaller - than max_row_group_size. + than row_group_size_target. """ if isinstance(df, list): @@ -161,11 +161,11 @@ def _iter_df( start_idx = 0 df_n_rows = len(df) - while df_n_rows - start_idx >= max_row_group_size: + while df_n_rows - start_idx >= row_group_size_target: chunk, next_idx = _get_next_chunk( df=df, start_idx=start_idx, - size=max_row_group_size, + size=row_group_size_target, distinct_bounds=distinct_bounds, ordered_on=ordered_on, ) @@ -185,7 +185,7 @@ def iter_merged_pf_df( df: DataFrame, pf: ParquetFile, ordered_on: str, - max_row_group_size: int, + row_group_size_target: int, distinct_bounds: Optional[bool] = False, duplicates_on: Optional[Union[str, Iterable[str]]] = None, ): @@ -196,7 +196,7 @@ def iter_merged_pf_df( ---------- ordered_on : str Column name by which data is ordered. Data must be in ascending order. - max_row_group_size : int + row_group_size_target : int Max number of rows per chunk. df : DataFrame In-memory pandas DataFrame to process. Must be ordered by 'ordered_on' @@ -212,7 +212,7 @@ def iter_merged_pf_df( Yields ------ DataFrame - Chunks of merged and ordered data, each with size <= max_row_group_size. + Chunks of merged and ordered data, each with size <= row_group_size_target. 
     Raises
     ------
@@ -247,85 +247,88 @@ def iter_merged_pf_df(
         columns=list(df.columns),
     )
+    # TODO:
+    # correct here!!
+    max_n_irgs = None
+
+    # Identify overlapping row groups.
+    # If intent is to drop duplicates, 'analyze_chunks_to_merge' has to be
+    # applied on a DataFrame without duplicates, so that returned indices stay
+    # consistent (versus a scenario where duplicates are dropped afterwards).
     if duplicates_on:
         df.drop_duplicates(duplicates_on, keep="last", ignore_index=True, inplace=True)
-
-    # Identify overlapping row groups
-    overlap_info = OrderedMergeInfo.analyze(
+    chunk_counter, sort_rgs_after_write = analyze_chunks_to_merge(
         df=df,
         pf=pf,
         ordered_on=ordered_on,
-        max_row_group_size=max_row_group_size,
+        row_group_size_target=row_group_size_target,
+        drop_duplicates=duplicates_on is not None,
+        max_n_irgs=max_n_irgs,
     )
-    # Handle data in 'df' before the loop over row groups in 'pf'.
+    has_pf = 0
+    has_df = 0
+    rg_idx_start = rg_idx = 0
+    df_idx_start = _df_idx_start = 0
+    if isinstance(row_group_size_target, str):
+        next_period_start = df.loc[:, ordered_on].iloc[0].ceil(row_group_size_target)
+    is_mixed_chunk = False
+    chunk_n_rows = 0
     remainder = None
-    df_idx_start = 0
-    if overlap_info.has_df_head:
-        # Case there is sufficient data in the pandas DataFrame to start a new
-        # row group.
-        # If 'duplicates_on' is provided, duplicates have been removed already,
-        # so no need to remove them again.
-        remainder = yield from _iter_df(
-            df=df.iloc[: overlap_info.df_idx_overlap_start],
-            ordered_on=ordered_on,
-            max_row_group_size=max_row_group_size,
-            distinct_bounds=distinct_bounds,
-            duplicates_on=None,
-            yield_remainder=False,
-        )
-        # Correct then 'df_idx_start' to account for the dataframe head already
-        # yielded.
-        df_idx_start = overlap_info.df_idx_overlap_start if overlap_info.has_overlap else len(df)
-
-    # Merge possibly overlapping data (full loop over 'pf' row groups).
-    rg_idx_start = 0  # HERE: overlap_info.rg_idx_overlap_start
-    buffer_num_rows = 0 if remainder is None else len(remainder)
-    for rg_idx_1, (_df_idx_start, df_idx_end) in enumerate(
-        zip(overlap_info.df_idx_rg_starts, overlap_info.df_idx_rg_ends_excl),
-        start=1,
-    ):
-        n_data_rows = df_idx_end - _df_idx_start
-        # HERE: overlap_info.rg_n_rows[rg_idx_1 - 1] + n_data_rows
-        buffer_num_rows += pf.row_groups[rg_idx_1 - 1].num_rows + n_data_rows
-        # HERE: ...or rg_idx_1 == overlap_info.n_rgs_overlap
-        if buffer_num_rows >= max_row_group_size or rg_idx_1 == len(pf):
-            chunk = pf[rg_idx_start:rg_idx_1].to_pandas()
-            if df_idx_start != df_idx_end:
-                # Merge with pandas DataFrame chunk.
-                # DataFrame chunk is added last in concat, to preserve
-                # its values in case of duplicates found with values in
-                # ParquetFile chunk.
-                chunk = [remainder, chunk, df.iloc[df_idx_start:df_idx_end]]
-            elif remainder is not None:
-                chunk = [remainder, chunk]
-            if buffer_num_rows >= max_row_group_size:
-                remainder = yield from _iter_df(
-                    df=chunk,
-                    ordered_on=ordered_on,
-                    max_row_group_size=max_row_group_size,
-                    distinct_bounds=distinct_bounds,
-                    duplicates_on=duplicates_on,
-                    yield_remainder=False,
+
+    for chunk_idx, df_idx_end_excl in enumerate(chunk_counter, start=1 - len(chunk_counter)):
+        # 'chunk_idx' is 0 on the last iteration.
+        if df_idx_end_excl > _df_idx_start:
+            # Possible DataFrame contribution to chunk.
+            chunk_n_rows += df_idx_end_excl - _df_idx_start
+            has_df += 1
+        if is_mixed_chunk:
+            # Add ParquetFile contribution to chunk.
+ chunk_n_rows += pf[rg_idx].num_rows + has_pf += 1 + rg_idx += 1 + + _df_idx_start = df_idx_end_excl + is_mixed_chunk = not is_mixed_chunk + + if ( + chunk_n_rows >= row_group_size_target + if isinstance(row_group_size_target, int) + else (_chunk_last_ts := df.loc[:, ordered_on].iloc[df_idx_end_excl - 1]) + >= next_period_start + ) or not chunk_idx: + if not has_pf: + chunk = ( + [remainder, df.iloc[df_idx_start:df_idx_end_excl]] + if remainder is not None + else df.iloc[df_idx_start:df_idx_end_excl] + ) + elif not has_df: + chunk = ( + [remainder, pf[rg_idx_start:rg_idx].to_pandas()] + if remainder is not None + else pf[rg_idx_start:rg_idx].to_pandas() ) - df_idx_start = df_idx_end - rg_idx_start = rg_idx_1 - buffer_num_rows = 0 if remainder is None else len(remainder) else: - remainder = chunk - - # Handle data after the last overlaps. - if overlap_info.has_df_tail: - # Case there is remaining data in pandas DataFrame. - df_idx_overlap_end_excl = overlap_info.df_idx_overlap_end_excl - yield from _iter_df( - df=[remainder, df.iloc[df_idx_overlap_end_excl:]], - ordered_on=ordered_on, - max_row_group_size=max_row_group_size, - distinct_bounds=distinct_bounds, - duplicates_on=None if remainder is None else duplicates_on, - yield_remainder=True, - ) - elif remainder is not None: - # Case there only is a remainder from previous iterations. - yield remainder + # Both pandas DataFrame and ParquetFile chunks. + # If 'remainder' is None, it does not raise trouble for concat + # step. + chunk = [ + remainder, + pf[rg_idx_start:rg_idx].to_pandas(), + df.iloc[df_idx_start:df_idx_end_excl], + ] + remainder = yield from _iter_df( + df=chunk, + ordered_on=ordered_on, + row_group_size_target=row_group_size_target, + distinct_bounds=distinct_bounds, + duplicates_on=duplicates_on, + yield_remainder=not chunk_idx, + ) + df_idx_start = df_idx_end_excl + if isinstance(row_group_size_target, str): + next_period_start = _chunk_last_ts.ceil(row_group_size_target) + rg_idx_start = rg_idx + chunk_n_rows = 0 if remainder is None else len(remainder) + has_pf = has_df = 0 diff --git a/oups/store/ordered_merge_info.py b/oups/store/ordered_merge_info.py index 1a7f4f0..9a3c591 100644 --- a/oups/store/ordered_merge_info.py +++ b/oups/store/ordered_merge_info.py @@ -5,10 +5,10 @@ @author: yoh """ -from dataclasses import dataclass -from typing import List, Optional, Union +from typing import List, Optional, Tuple, Union from fastparquet import ParquetFile +from numpy import empty from numpy import searchsorted from numpy import vstack from numpy.typing import NDArray @@ -21,7 +21,7 @@ MAX = "max" LEFT = "left" RIGHT = "right" -MAX_ROW_GROUP_SIZE_SCALE_FACTOR = 0.9 +MAX_ROW_GROUP_SIZE_SCALE_FACTOR = 0.8 def _incomplete_row_groups_start( @@ -131,8 +131,6 @@ def _incomplete_row_groups_start( # 'irg_idx_start' is '-1' with respect to its definition. irg_idx_start += 1 n_irgs = n_rgs - irg_idx_start - print("n_irgs") - print(n_irgs) if df_connected_to_set_of_incomplete_rgs and ( last_row_group_boundary_exceeded or n_irgs >= max_n_irgs @@ -142,281 +140,279 @@ def _incomplete_row_groups_start( return irg_idx_start -@dataclass -class OrderedMergeInfo: +def analyze_chunks_to_merge( + df: DataFrame, + pf: ParquetFile, + ordered_on: str, + row_group_size_target: Union[int, str], + drop_duplicates: bool, + max_n_irgs: Optional[int] = None, +) -> Tuple[List[int], bool]: """ - Information about how DataFrame and ParquetFile overlap. + Describe how DataFrame and ParquetFile chunks can be merged. 
+ + Important: because this function returns indices in input DataFrame, if + duplicates are dropped, this function should be applied on a DataFrame + without duplicates. Parameters ---------- - has_df_head : bool - True if DataFrame head is large enough to make a new row group before - ParquetFile overlap. - If 'row_group_size_target' is a str, it is True if data in DataFrame - starts before first period in ParquetFile. - If 'row_group_size_target' is an int, it is True if there are more rows - in DataFrame than 'row_group_size_target' before ParquetFile overlap. - has_overlap : bool - True if DataFrame and ParquetFile have overlapping data. - has_df_tail : bool - True if DataFrame has rows after ParquetFile overlap. - df_idx_merge_start : Optional[int] - Index of first row in DataFrame where merge starts, if any. - df_idx_merge_end_excl : Optional[int] - Index of the row after the last row in DataFrame where merge ends, if - any. - rg_idx_merge_start : Optional[int] - Index in the complete ParquetFile of first row group where merge starts, - if any. - rg_idx_merge_end_excl : Optional[int] - Index in the complete ParquetFile of row group after last row group - where merge ends, if any. - df_idx_tmrg_starts : ndarray - Indices of the rows in DataFrame where each to-merge row group starts. - df_idx_tmrg_ends_excl : ndarray - Indices of the rows in DataFrame after the rows where each to-merge - row group ends. - tmrg_n_rows : ndarray - Number of rows in each to-merge row group. - n_tmrgs : int - Number of to-merge row groups. - sort_rgs_after_rewrite : bool - Flag indicating if row groups have to be sorted after rewrite. + df : DataFrame + Input DataFrame. Must contain the 'ordered_on' column. + pf : ParquetFile + Input ParquetFile. Must contain statistics for the 'ordered_on' + column. + ordered_on : str + Column name by which data is ordered. Must exist in both DataFrame + and ParquetFile. + row_group_size_target : Union[int, str] + Target row group size. + drop_duplicates : bool + Flag impacting how overlap boundaries have to be managed. + More exactly, 'pf' is considered as first data, and 'df' as second + data, coming after. In case of 'pf' leading 'df', if last value in + 'pf' is a duplicate of the first in 'df', then + - if True, at this index, overlap starts + - if False, no overlap at this index + max_n_irgs : Optional[int] + Max allowed number of 'incomplete' row groups. - """ + - ``None`` value induces no coalescing of row groups. If there is + no drop of duplicates, new data is systematically appended. + - A value of ``0`` or ``1`` means that new data should be + systematically merged to the last existing one to 'complete' it + (if it is not 'complete' already). - # Region flags - has_df_head: bool # Enough data before merge region to make a new row group - has_overlap: bool # Whether df and pf have overlapping data - has_df_tail: bool # Data after merge region - # Merge region boundaries - df_idx_merge_start: Optional[int] - df_idx_merge_end_excl: Optional[int] - rg_idx_merge_start: Optional[int] - rg_idx_merge_end_excl: Optional[int] - # To-merge row groups details (tmrg) - df_idx_tmrg_starts: NDArray - df_idx_tmrg_ends_excl: NDArray - tmrg_n_rows: NDArray - n_tmrgs: int - # Flag indicating if row groups have to be sorted after rewrite. 
-    sort_rgs_after_rewrite: bool
-
-    @classmethod
-    def analyze(
-        cls,
-        df: DataFrame,
-        pf: ParquetFile,
-        ordered_on: str,
-        row_group_size_target: Union[int, str],
-        drop_duplicates: bool,
-        max_n_irgs: Optional[int] = None,
-    ) -> "OrderedMergeInfo":
-        """
-        Analyze how DataFrame and ParquetFile data overlap.
-
-        Parameters
-        ----------
-        df : DataFrame
-            Input DataFrame. Must contain the 'ordered_on' column.
-        pf : ParquetFile
-            Input ParquetFile. Must contain statistics for the 'ordered_on'
-            column.
-        ordered_on : str
-            Column name by which data is ordered. Must exist in both DataFrame
-            and ParquetFile.
-        row_group_size_target : Union[int, str]
-            Target row group size.
-        drop_duplicates : bool
-            Flag impacting how overlap boundaries have to be managed.
-            More exactly, 'pf' is considered as first data, and 'df' as second
-            data, coming after. In case of 'pf' leading 'df', if last value in
-            'pf' is a duplicate of the first in 'df', then
-              - if True, at this index, overlap starts
-              - if False, no overlap at this index
-        max_n_irgs : Optional[int]
-            Max allowed number of 'incomplete' row groups.
-
-            - ``None`` value induces no coalescing of row groups. If there is
-              no drop of duplicates, new data is systematically appended.
-            - A value of ``0`` or ``1`` means that new data should be
-              systematically merged to the last existing one to 'complete' it
-              (if it is not 'complete' already).
-
-        Returns
-        -------
-        OrderedMergeInfo
-            Instance containing merge analysis information.
-
-        Notes
-        -----
-        When a row in pf shares the same value in 'ordered_on' column as a row
-        in df, the row in pf is considered as leading the row in df i.e.
-        anterior to it.
-        This has an impact in overlap identification in case of not dropping
-        duplicates.
-
-        When 'max_n_irgs' is specified, the method will analyze trailing
-        incomplete row groups and may adjust the overlap region to include them
-        for coalescing if necessary.
-
-        """
-        df_n_rows = len(df)
-        rg_n_rows = [rg.num_rows for rg in pf.row_groups]
-        n_rgs = len(rg_n_rows)
-        df_min = df.loc[:, ordered_on].iloc[0]
-        # Find overlapping regions in dataframe
-        rg_mins = pf.statistics[MIN][ordered_on]
-        rg_maxs = pf.statistics[MAX][ordered_on]
-        if drop_duplicates:
-            print("drop_duplicates")
-            # Determine overlap start/end indices in row groups
-            df_idx_tmrg_starts = searchsorted(df.loc[:, ordered_on], rg_mins, side=LEFT)
-            df_idx_tmrg_ends_excl = searchsorted(df.loc[:, ordered_on], rg_maxs, side=RIGHT)
-        else:
-            print("no drop_duplicates")
-            df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted(
-                df.loc[:, ordered_on],
-                vstack((rg_mins, rg_maxs)),
-                side=LEFT,
-            )
+    Returns
+    -------
+    Tuple[NDArray[int], bool]
+        Indices in the DataFrame at which each chunk ends (excluded). The
+        first index is the end (excluded) of a chunk made of DataFrame rows
+        only. The array then alternates between chunks mixing DataFrame rows
+        with one row group from the ParquetFile, and chunks made of DataFrame
+        rows only.
- if df_idx_tmrg_ends_excl[-1] and df_idx_tmrg_starts[0] != df_n_rows: - rg_idx_merge_start = df_idx_tmrg_ends_excl.astype(bool).argmax() - rg_idx_merge_end_excl = df_idx_tmrg_ends_excl.argmax() + 1 - elif not df_idx_tmrg_ends_excl[-1]: - # df after last row group in pf - rg_idx_merge_start = rg_idx_merge_end_excl = n_rgs - else: - # df before first row group in pf - rg_idx_merge_start = rg_idx_merge_end_excl = 0 - - sort_rgs_after_rewrite = df_min <= rg_maxs[-1] if drop_duplicates else df_min < rg_maxs[-1] - - print("before trimming rgs lists") - print("df_n_rows") - print(df_n_rows) - print("df_idx_tmrg_starts") - print(df_idx_tmrg_starts) - print("df_idx_tmrg_ends_excl") - print(df_idx_tmrg_ends_excl) - print("rg_idx_merge_start") - print(rg_idx_merge_start) - print("rg_idx_merge_end_excl") - print(rg_idx_merge_end_excl) - - # Assess if trailing incomplete row groups in ParquetFile have to be - # included in the merge. - if max_n_irgs is not None: - irg_idx_start = _incomplete_row_groups_start( - row_group_size_target=row_group_size_target, - df_max=df.loc[:, ordered_on].iloc[-1], - df_n_rows=df_n_rows, - max_n_irgs=max_n_irgs, - n_rgs=n_rgs, - rg_n_rows=rg_n_rows, - rg_mins=rg_mins, - rg_maxs=rg_maxs, - ) - if irg_idx_start is not None: - print("irg_idx_start") - print(irg_idx_start) - # If not None, coalescing of trailing row groups is needed. + Flag indicating if row groups need to be resorted after merge. + + Notes + ----- + When a row in pf shares the same value in 'ordered_on' column as a row + in df, the row in pf is considered as leading the row in df i.e. + anterior to it. + This has an impact in overlap identification in case of not dropping + duplicates. + + When 'max_n_irgs' is specified, the method will analyze trailing + incomplete row groups and may adjust the overlap region to include them + for coalescing if necessary. + + """ + df_n_rows = len(df) + rg_n_rows = [rg.num_rows for rg in pf.row_groups] + n_rgs = len(rg_n_rows) + # df_min = df.loc[:, ordered_on].iloc[0] + # Find overlapping regions in dataframe + rg_mins = pf.statistics[MIN][ordered_on] + rg_maxs = pf.statistics[MAX][ordered_on] + if drop_duplicates: + print("drop_duplicates") + # Determine overlap start/end indices in row groups + df_idx_tmrg_starts = searchsorted(df.loc[:, ordered_on], rg_mins, side=LEFT) + df_idx_tmrg_ends_excl = searchsorted(df.loc[:, ordered_on], rg_maxs, side=RIGHT) + else: + print("no drop_duplicates") + df_idx_tmrg_starts, df_idx_tmrg_ends_excl = searchsorted( + df.loc[:, ordered_on], + vstack((rg_mins, rg_maxs)), + side=LEFT, + ) + + if not df_idx_tmrg_ends_excl[-1]: + # df after last row group in pf. + rg_idx_merge_start = rg_idx_merge_end_excl = n_rgs + elif df_idx_tmrg_starts[0] == df_n_rows: + # df before first row group in pf. + rg_idx_merge_start = rg_idx_merge_end_excl = 0 + else: + rg_idx_merge_start = df_idx_tmrg_ends_excl.astype(bool).argmax() + rg_idx_merge_end_excl = df_idx_tmrg_ends_excl.argmax() + 1 + + print("") + print("before irgs analysis") + print(f"df_n_rows: {df_n_rows}") + print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}") + print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}") + print(f"rg_idx_merge_start: {rg_idx_merge_start}") + print(f"rg_idx_merge_end_excl: {rg_idx_merge_end_excl}") + + # Assess if trailing incomplete row groups in ParquetFile have to be + # included in the merge. 
+ if max_n_irgs is not None: + irg_idx_start = _incomplete_row_groups_start( + row_group_size_target=row_group_size_target, + df_max=df.loc[:, ordered_on].iloc[-1], + df_n_rows=df_n_rows, + max_n_irgs=max_n_irgs, + n_rgs=n_rgs, + rg_n_rows=rg_n_rows, + rg_mins=rg_mins, + rg_maxs=rg_maxs, + ) + print(f"irg_idx_start: {irg_idx_start}") + if irg_idx_start is not None: + # If not None, coalescing of trailing row groups is needed. + if rg_idx_merge_start != n_rgs: + rg_idx_merge_start = min(rg_idx_merge_start, irg_idx_start) + rg_idx_merge_end_excl = n_rgs + else: # Specific case when df is after last row group in pf, then # force integration of incomplete row groups in the merge. - rg_idx_merge_start = ( - min(rg_idx_merge_start, irg_idx_start) - if rg_idx_merge_start != n_rgs - else irg_idx_start - ) - rg_idx_merge_end_excl = n_rgs - sort_rgs_after_rewrite = True - - if rg_idx_merge_start != n_rgs: - # In case DataFrame is leading ParquetFile, but not big enough to make - # a new row group, force the merge to start at its first row. - df_idx_merge_start = df_idx_tmrg_starts[rg_idx_merge_start] - has_df_head = ( - df_idx_merge_start >= row_group_size_target - if isinstance(row_group_size_target, int) - else df_min < Timestamp(rg_mins[rg_idx_merge_start]).floor(row_group_size_target) + rg_idx_merge_start = irg_idx_start + # Force the last indice to encompass all remaining rows in DataFrame. + df_idx_tmrg_ends_excl[rg_idx_merge_end_excl - 1] = df_n_rows + print("") + print("after irgs analysis") + print(f"df_idx_tmrg_starts: {df_idx_tmrg_starts}") + print(f"df_idx_tmrg_ends_excl: {df_idx_tmrg_ends_excl}") + print(f"rg_idx_merge_start: {rg_idx_merge_start}") + print(f"rg_idx_merge_end_excl: {rg_idx_merge_end_excl}") + + # sort_rgs_after_write = True + # sort_rgs_after_write = df_min <= rg_maxs[-1] if drop_duplicates else df_min < rg_maxs[-1] + # print("df_idx_tmrg_starts[rg_idx_merge_start]") + # print(df_idx_tmrg_starts[rg_idx_merge_start]) + if rg_idx_merge_start == n_rgs: + # df after last row group in pf this is a simple append. + # chunk_counter = [df_n_rows] + return [df_n_rows], False + # elif (df_idx_merge_start := df_idx_tmrg_starts[rg_idx_merge_start]): + elif ( + df_idx_tmrg_starts[rg_idx_merge_start] + == df_idx_tmrg_ends_excl[rg_idx_merge_start] + == df_n_rows + ): + # First row group to merge starts after df. + print("") + print("has possible leading df chunk") + print(f"df_idx_tmrg_starts[rg_idx_merge_start]: {df_idx_tmrg_starts[rg_idx_merge_start]}") + # df is leading the first row group to merge in pf. + # In case DataFrame is leading ParquetFile, but is either not big enough + # to make a new row group, or there will be a remainder, then force a + # merge with the first row group starting at the first row of the + # remainder. + if isinstance(row_group_size_target, int): + print( + f"df_idx_tmrg_starts[rg_idx_merge_start] % row_group_size_target: {df_idx_tmrg_starts[rg_idx_merge_start] % row_group_size_target}", + ) + df_idx_remainder_start = ( + df_n_rows - df_idx_tmrg_starts[rg_idx_merge_start] % row_group_size_target ) - if not has_df_head and df_idx_merge_start: - # If amount of rows at start of df is not enough to make a new - # row group, but merge is not starting at the first row, - # force the merge to at least encompass the first row group, - rg_idx_merge_end_excl = max(1, rg_idx_merge_end_excl) - # and force the merge to start at the first row in the DataFrame. 
- df_idx_merge_start = df_idx_tmrg_starts[rg_idx_merge_start] = 0 - - # Trim row group related lists to the overlapping region. - df_idx_tmrg_starts = df_idx_tmrg_starts[rg_idx_merge_start:rg_idx_merge_end_excl] - df_idx_tmrg_ends_excl = df_idx_tmrg_ends_excl[rg_idx_merge_start:rg_idx_merge_end_excl] - tmrg_n_rows = rg_n_rows[rg_idx_merge_start:rg_idx_merge_end_excl] else: - # df after last row group in pf. - has_df_head = False - df_idx_tmrg_starts = df_idx_tmrg_ends_excl = tmrg_n_rows = [] - - # Analyze overlap patterns - # Assume no overlap. - print("after trimming rgs lists") - print("df_idx_tmrg_starts") - print(df_idx_tmrg_starts) - print("df_idx_tmrg_ends_excl") - print(df_idx_tmrg_ends_excl) - # 'df_idx_tmrg_starts' is a numpy array. Need to check its length to check - # if it is empty. bool(array([0])) is indeed False. - if len(df_idx_tmrg_starts): - # and (df_idx_overlap_start := df_idx_org_starts[0]) != df_n_rows - # and (df_idx_overlap_end_excl := df_idx_org_ends_excl[-1]) != 0): - # Overlap. - # df_idx_merge_start = df_idx_tmrg_starts[0] - # has_df_head = ( - # df_idx_merge_start >= row_group_size_target - # if isinstance(row_group_size_target, int) - # else df_min < Timestamp(rg_mins[rg_idx_merge_start]).floor(row_group_size_target) - # ) - # if not has_df_head and df_idx_merge_start: - # If amount of rows at start of df is not enough to make a new - # row group, but merge is not starting at the first row, force - # the merge to start at the 1st row. - # df_idx_merge_start = df_idx_tmrg_starts[0] = 0 - - has_overlap = True - if not (df_idx_merge_end_excl := df_idx_tmrg_ends_excl[-1]): - # Specific case when df is after last row group in pf. - # Then force df_idx_merge_end_excl to include the first row in - # df. - df_idx_merge_end_excl = df_idx_tmrg_ends_excl[-1] = 1 - has_df_tail = df_idx_merge_end_excl < df_n_rows - print("df_idx_merge_start") - print(df_idx_merge_start) - print("df_idx_merge_end_excl") - print(df_idx_merge_end_excl) - + df_idx_remainder_start = searchsorted( + df.loc[:, ordered_on], + Timestamp(rg_mins[rg_idx_merge_start]).floor(row_group_size_target), + side=LEFT, + ) + print(f"df_idx_remainder_start: {df_idx_remainder_start}") + + # has_leading_df_remainder = ( + # df_idx_merge_start % row_group_size_target + # if isinstance(row_group_size_target, int) + # else df.loc[:, ordered_on].iloc[df_idx_merge_start] >= Timestamp(rg_mins[rg_idx_merge_start]).floor(row_group_size_target) + # df_idx_remainder_start != df_n_rows-1 + # ) + if df_idx_remainder_start != df_n_rows: + # To make sure remainder is merged with next following row group, + # force merge to encompass it. + # rg_idx_merge_end_excl = max(rg_idx_merge_start+1, rg_idx_merge_end_excl) + # and force the merge to start at the first row of df remainder. + # df_idx_tmrg_starts[rg_idx_merge_start] = df_idx_remainder_start + return [df_idx_remainder_start, df_n_rows], True else: - # No overlap. - # Any value of df is ok to check its position with respect to pf. 
- df_idx_merge_start = None - df_idx_merge_end_excl = None - has_overlap = False - # has_df_head = df_min < rg_mins[0] - has_df_tail = not has_df_head - rg_idx_merge_start = None - rg_idx_merge_end_excl = None - - return cls( - has_df_head=has_df_head, - has_overlap=has_overlap, - has_df_tail=has_df_tail, - df_idx_merge_start=df_idx_merge_start, - df_idx_merge_end_excl=df_idx_merge_end_excl, - rg_idx_merge_start=rg_idx_merge_start, - rg_idx_merge_end_excl=rg_idx_merge_end_excl, - df_idx_tmrg_starts=df_idx_tmrg_starts, - df_idx_tmrg_ends_excl=df_idx_tmrg_ends_excl, - tmrg_n_rows=tmrg_n_rows, - n_tmrgs=len(tmrg_n_rows), - sort_rgs_after_rewrite=sort_rgs_after_rewrite, - ) + # No df leading remainder. + return [df_n_rows], True + # In case it has a leading df chunk + # In case DataFrame is leading ParquetFile, but not big enough to make + # a new row group, force the merge to start at its first row. + # df_idx_merge_start = df_idx_tmrg_starts[rg_idx_merge_start] + # has_df_head = ( + # df_idx_merge_start >= row_group_size_target + # if isinstance(row_group_size_target, int) + # else df_min < Timestamp(rg_mins[rg_idx_merge_start]).floor(row_group_size_target) + # ) + # if not has_df_head and df_idx_merge_start: + # If amount of rows at start of df is not enough to make a new + # row group, but merge is not starting at the first row, + # force the merge to at least encompass the first row group, + # rg_idx_merge_end_excl = max(1, rg_idx_merge_end_excl) + # and force the merge to start at the first row in the DataFrame. + # df_idx_merge_start = df_idx_tmrg_starts[rg_idx_merge_start] = 0 + + # Trim row group related lists to the overlapping region. + print("trimming step") + print("rg_idx_merge_start") + print(rg_idx_merge_start) + print("rg_idx_merge_end_excl") + print(rg_idx_merge_end_excl) + + df_idx_tmrg_starts = df_idx_tmrg_starts[rg_idx_merge_start:rg_idx_merge_end_excl] + df_idx_tmrg_ends_excl = df_idx_tmrg_ends_excl[rg_idx_merge_start:rg_idx_merge_end_excl] + chunk_counter = empty((2 * df_idx_tmrg_starts.size), dtype=df_idx_tmrg_starts.dtype) + chunk_counter[0::2] = df_idx_tmrg_starts + chunk_counter[1::2] = df_idx_tmrg_ends_excl + + print("df_idx_tmrg_starts") + print(df_idx_tmrg_starts) + print("df_idx_tmrg_ends_excl") + print(df_idx_tmrg_ends_excl) + return chunk_counter, True + + # Analyze overlap patterns + # Assume no overlap. + + # 'df_idx_tmrg_starts' is a numpy array. Need to check its length to check + # if it is empty. bool(array([0])) is indeed False. + + +# if len(df_idx_tmrg_starts): +# and (df_idx_overlap_start := df_idx_org_starts[0]) != df_n_rows +# and (df_idx_overlap_end_excl := df_idx_org_ends_excl[-1]) != 0): +# Overlap. +# df_idx_merge_start = df_idx_tmrg_starts[0] +# has_df_head = ( +# df_idx_merge_start >= row_group_size_target +# if isinstance(row_group_size_target, int) +# else df_min < Timestamp(rg_mins[rg_idx_merge_start]).floor(row_group_size_target) +# ) +# if not has_df_head and df_idx_merge_start: +# If amount of rows at start of df is not enough to make a new +# row group, but merge is not starting at the first row, force +# the merge to start at the 1st row. +# df_idx_merge_start = df_idx_tmrg_starts[0] = 0 + +# has_tmrgs = True +# if not (df_idx_merge_end_excl := df_idx_tmrg_ends_excl[-1]): +# Specific case when df is after last row group in pf. +# Then force df_idx_merge_end_excl to include all df rows. +# At this stage, all row groups will have been loaded into +# memory. 
Delaying the merge with the remaining of df in the +# 'has_df_tail' step not more memory efficient. +# df_idx_merge_end_excl = df_n_rows +# has_df_tail = df_idx_merge_end_excl < df_n_rows +# print("df_idx_merge_start") +# print(df_idx_merge_start) +# print("df_idx_merge_end_excl") +# print(df_idx_merge_end_excl) + +# else: +# No row group to merge with DataFrame. +# Any value of df is ok to check its position with respect to pf. +# df_idx_merge_start = None +# df_idx_merge_end_excl = None +# has_tmrgs = False +# has_df_head = df_min < rg_mins[0] +# has_df_tail = not has_df_head +# rg_idx_merge_start = None +# rg_idx_merge_end_excl = None + +# return chunk_counter, sort_rgs_after_write diff --git a/tests/test_store/conftest.py b/tests/test_store/conftest.py index d43c989..ddea0db 100644 --- a/tests/test_store/conftest.py +++ b/tests/test_store/conftest.py @@ -1,29 +1,32 @@ -import pytest from fastparquet import ParquetFile from fastparquet import write from pandas import DataFrame -@pytest.fixture -def create_parquet_file(tmp_path): +def create_parquet_file(tmp_path: str, df: DataFrame, row_group_offsets: int) -> ParquetFile: """ Create a temporary parquet file for testing. Parameters ---------- - tmp_path : Path + tmp_path : str Temporary directory provided by pytest. + df : DataFrame + Data to write to the parquet file. + row_group_offsets : int + Number of rows per row group. Returns ------- - callable - Function that creates a ParquetFile with specified row group size. + ParquetFile + The created parquet file object. - """ - - def _create_parquet(df: DataFrame, row_group_offsets: int) -> ParquetFile: - path = f"{tmp_path}/test.parquet" - write(path, df, row_group_offsets=row_group_offsets, file_scheme="hive") - return ParquetFile(path) + Notes + ----- + The file is created using the 'hive' file scheme and stored in a directory + named 'test_parquet' within the temporary directory. - return _create_parquet + """ + path = f"{tmp_path}/test_parquet" + write(path, df, row_group_offsets=row_group_offsets, file_scheme="hive") + return ParquetFile(path) diff --git a/tests/test_store/test_ordered_merge_info.py b/tests/test_store/test_ordered_merge_info.py index caeaa8c..647c924 100644 --- a/tests/test_store/test_ordered_merge_info.py +++ b/tests/test_store/test_ordered_merge_info.py @@ -11,7 +11,7 @@ from pandas import Timestamp from pandas import date_range -from oups.store.ordered_merge_info import OrderedMergeInfo +from oups.store.ordered_merge_info import analyze_chunks_to_merge from tests.test_store.conftest import create_parquet_file @@ -26,37 +26,25 @@ # 1/ Adding data at complete tail, testing 'drop_duplicates'. # 'max_n_irgs' is never triggered. ( - # Test 0 (0.a.a) / # Max row group size as int. # df connected to incomplete rgs. # Writing after pf data, no incomplete row groups. 
# rg: 0 1 # pf: [0,1], [2,3] - # df: [3] - "no_drop_duplicates_simple_append", + # df: [3] + "no_drop_duplicates_simple_append_int", [3], [0, 1, 2, 3], [0, 2], # row_group_offsets - 2, # row_group_size_target - False, # drop_duplicates - 2, # max_n_irgs + 2, # row_group_size_target | no irgs to merge with + False, # drop_duplicates | should not merge with preceding rg + 2, # max_n_irgs | no irgs to rewrite { - "has_df_head": False, - "has_overlap": False, - "has_df_tail": True, - "df_idx_merge_start": None, - "df_idx_merge_end_excl": None, - "rg_idx_merge_start": None, - "rg_idx_merge_end_excl": None, - "df_idx_tmrg_starts": [], - "df_idx_tmrg_ends_excl": [], - "tmrg_n_rows": [], - "n_tmrgs": 0, - "sort_rgs_after_rewrite": False, + "chunk_counter": [1], + "sort_rgs_after_write": False, }, ), ( - # Test 1 (0.a.b) / # Max row group size as int. # df connected to incomplete rgs. # Writing at end of pf data, with incomplete row groups at the end @@ -64,61 +52,39 @@ # rg: 0 1 2 # pf: [0,1,2], [6,7,8], [9] # df: [9] - "drop_duplicates_merge_tail", + "drop_duplicates_merge_tail_int", [9], [0, 1, 2, 6, 7, 8, 9], [0, 3, 6], # row_group_offsets - 3, # row_group_size_target - True, # drop_duplicates - 2, # max_n_irgs + 3, # row_group_size_target | should not merge irg + True, # drop_duplicates | should merge with irg + 2, # max_n_irgs | should not rewrite irg { - "has_df_head": False, - "has_overlap": True, - "has_df_tail": False, - "df_idx_merge_start": 0, - "df_idx_merge_end_excl": 1, - "rg_idx_merge_start": 2, - "rg_idx_merge_end_excl": 3, - "df_idx_tmrg_starts": [0], - "df_idx_tmrg_ends_excl": [1], - "tmrg_n_rows": [1], - "n_tmrgs": 1, - "sort_rgs_after_rewrite": True, + "chunk_counter": [0, 1], + "sort_rgs_after_write": True, }, ), ( - # Test 2 (0.b.a) / # Max row group size as freqstr. # df connected to incomplete rgs. # Values not on boundary to check 'floor()'. # Writing after pf data. # rg: 0 1 # pf: [8h10,9h10], [10h10] - # df: [10h10] + # df: [10h10] "no_drop_duplicates_simple_append_timestamp_not_on_boundary", [Timestamp(f"{REF_D}10:10")], date_range(Timestamp(f"{REF_D}08:10"), freq="1h", periods=3), [0, 2], - "2h", # row_group_size_target | not triggered - False, # drop_duplicates - 3, # max_n_irgs | not triggered + "2h", # row_group_size_target | should not merge irg + False, # drop_duplicates | should not merge with preceding rg + 3, # max_n_irgs | should not rewrite irg { - "has_df_head": False, - "has_overlap": False, - "has_df_tail": True, - "df_idx_merge_start": None, - "df_idx_merge_end_excl": None, - "rg_idx_merge_start": None, - "rg_idx_merge_end_excl": None, - "df_idx_tmrg_starts": [], - "df_idx_tmrg_ends_excl": [], - "tmrg_n_rows": [], - "n_tmrgs": 0, - "sort_rgs_after_rewrite": False, + "chunk_counter": [1], + "sort_rgs_after_write": False, }, ), ( - # Test 3 (0.b.b) / # Max row group size as freqstr. # df connected to incomplete rgs. # Values not on boundary to check 'floor()'. 
@@ -130,57 +96,35 @@ [Timestamp(f"{REF_D}10:10")], date_range(Timestamp(f"{REF_D}08:10"), freq="1h", periods=3), [0, 2], - "2h", # row_group_size_target | not triggered - True, # drop_duplicates - 3, # max_n_irgs | not triggered + "2h", # row_group_size_target | should not merge irg + True, # drop_duplicates | should merge with irg + 3, # max_n_irgs | should not rewrite irg { - "has_df_head": False, - "has_overlap": True, - "has_df_tail": False, - "df_idx_merge_start": 0, - "df_idx_merge_end_excl": 1, - "rg_idx_merge_start": 1, - "rg_idx_merge_end_excl": 2, - "df_idx_tmrg_starts": [0], - "df_idx_tmrg_ends_excl": [1], - "tmrg_n_rows": [1], - "n_tmrgs": 1, - "sort_rgs_after_rewrite": True, + "chunk_counter": [0, 1], + "sort_rgs_after_write": True, }, ), ( - # Test 4 (0.b.c) / # Max row group size as freqstr. # df connected to incomplete rgs. # Values on boundary. # Writing after pf data, incomplete row groups. # rg: 0 1 # pf: [8h00,9h00], [10h00] - # df: [10h00] + # df: [10h00] "no_drop_duplicates_simple_append_timestamp_on_boundary", [Timestamp(f"{REF_D}10:00")], date_range(Timestamp(f"{REF_D}08:00"), freq="1h", periods=3), [0, 2], # row_group_offsets - "2h", # row_group_size_target - False, # drop_duplicates - 3, # max_n_irgs + "2h", # row_group_size_target | should not merge irg + False, # drop_duplicates | should not merge with preceding rg + 3, # max_n_irgs | should not rewrite irg { - "has_df_head": False, - "has_overlap": False, - "has_df_tail": True, - "df_idx_merge_start": None, - "df_idx_merge_end_excl": None, - "rg_idx_merge_start": None, - "rg_idx_merge_end_excl": None, - "df_idx_tmrg_starts": [], - "df_idx_tmrg_ends_excl": [], - "tmrg_n_rows": [], - "n_tmrgs": 0, - "sort_rgs_after_rewrite": False, + "chunk_counter": [1], + "sort_rgs_after_write": False, }, ), ( - # Test 5 (0.b.d) / # Max row group size as freqstr. # df connected to incomplete rgs. # Values on boundary. @@ -192,22 +136,12 @@ [Timestamp(f"{REF_D}10:00")], date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=3), [0, 2], # row_group_offsets - "2h", # row_group_size_target - True, # drop_duplicates - 3, # max_n_irgs + "2h", # row_group_size_target | should not merge irg + True, # drop_duplicates | should merge with irg + 3, # max_n_irgs | should not rewrite irg { - "has_df_head": False, - "has_overlap": True, - "has_df_tail": False, - "df_idx_merge_start": 0, - "df_idx_merge_end_excl": 1, - "rg_idx_merge_start": 1, - "rg_idx_merge_end_excl": 2, - "df_idx_tmrg_starts": [0], - "df_idx_tmrg_ends_excl": [1], - "tmrg_n_rows": [1], - "n_tmrgs": 1, - "sort_rgs_after_rewrite": True, + "chunk_counter": [0, 1], + "sort_rgs_after_write": True, }, ), # 2/ Adding data right at the start. @@ -215,8 +149,8 @@ # Test 21 (5.a) / # Max row group size as int. # df at the start of pf data. 
- # rg: 0 1 2 3 - # pf: [2, 6], [7, 8], [9], [10] + # rg: 0 1 2 3 + # pf: [2, 6], [7, 8], [9], [10] # df: [0,1] "drop_duplicates_insert_at_start_new_rg", [0, 1], @@ -226,18 +160,8 @@ True, # drop_duplicates 2, # max_n_irgs | not triggered { - "has_df_head": True, - "has_overlap": False, - "has_df_tail": False, - "df_idx_merge_start": None, - "df_idx_merge_end_excl": None, - "rg_idx_merge_start": None, - "rg_idx_merge_end_excl": None, - "df_idx_tmrg_starts": [], - "df_idx_tmrg_ends_excl": [], - "tmrg_n_rows": [], - "n_tmrgs": 0, - "sort_rgs_after_rewrite": True, + "chunk_counter": [2], + "sort_rgs_after_write": True, }, ), ( @@ -255,18 +179,8 @@ True, # drop_duplicates 2, # max_n_irgs | not triggered { - "has_df_head": False, - "has_overlap": True, - "has_df_tail": False, - "df_idx_merge_start": 0, - "df_idx_merge_end_excl": 1, - "rg_idx_merge_start": 0, - "rg_idx_merge_end_excl": 1, - "df_idx_tmrg_starts": [0], - "df_idx_tmrg_ends_excl": [1], - "tmrg_n_rows": [2], - "n_tmrgs": 1, - "sort_rgs_after_rewrite": True, + "chunk_counter": [0, 1], + "sort_rgs_after_write": True, }, ), ( @@ -274,8 +188,8 @@ # Max row group size as freqstr. # df at the very start. # df is not overlapping with existing row groups. - # rg: 0 1 2 - # pf: [8h00,9h00], [12h00], [13h00] + # rg: 0 1 2 + # pf: [8h00,9h00], [12h00], [13h00] # df: [7h30] "drop_duplicates_insert_at_start_new_rg_timestamp_not_on_boundary", [Timestamp(f"{REF_D}7:30")], @@ -290,18 +204,8 @@ True, # drop_duplicates 2, # max_n_irgs | should rewrite tail { - "has_df_head": True, - "has_overlap": False, - "has_df_tail": False, - "df_idx_merge_start": None, - "df_idx_merge_end_excl": None, - "rg_idx_merge_start": None, - "rg_idx_merge_end_excl": None, - "df_idx_tmrg_starts": [], - "df_idx_tmrg_ends_excl": [], - "tmrg_n_rows": [], - "n_tmrgs": 0, - "sort_rgs_after_rewrite": True, + "chunk_counter": [1], + "sort_rgs_after_write": True, }, ), ( @@ -325,18 +229,8 @@ True, # drop_duplicates 2, # max_n_irgs | should rewrite tail { - "has_df_head": False, - "has_overlap": True, - "has_df_tail": False, - "df_idx_merge_start": 0, - "df_idx_merge_end_excl": 1, - "rg_idx_merge_start": 0, - "rg_idx_merge_end_excl": 1, - "df_idx_tmrg_starts": [0], - "df_idx_tmrg_ends_excl": [1], - "tmrg_n_rows": [2], - "n_tmrgs": 1, - "sort_rgs_after_rewrite": True, + "chunk_counter": [0, 1], + "sort_rgs_after_write": True, }, ), # 3/ Adding data at complete tail, testing 'max_n_irgs'. @@ -348,7 +242,7 @@ # the end of pf data. # rg: 0 1 2 3 # pf: [0,1,2,6], [7,8,9,10], [11], [12] - # df: [12] + # df: [12] "max_n_irgs_not_reached_simple_append_int", [12], [0, 1, 2, 6, 7, 8, 9, 10, 11, 12], @@ -357,18 +251,8 @@ False, # drop_duplicates 3, # max_n_irgs { - "has_df_head": False, - "has_overlap": False, - "has_df_tail": True, - "df_idx_merge_start": None, - "df_idx_merge_end_excl": None, - "rg_idx_merge_start": None, - "rg_idx_merge_end_excl": None, - "df_idx_tmrg_starts": [], - "df_idx_tmrg_ends_excl": [], - "tmrg_n_rows": [], - "n_tmrgs": 0, - "sort_rgs_after_rewrite": False, + "chunk_counter": [1], + "sort_rgs_after_write": False, }, ), ( @@ -379,7 +263,7 @@ # the end of pf data. 
# rg: 0 1 2 3 # pf: [0,1,2,6], [7,8,9,10], [11], [12] - # df: [12] + # df: [12] "max_n_irgs_reached_rewrite_tail_int", [12], [0, 1, 2, 6, 7, 8, 9, 10, 11, 12], @@ -388,161 +272,203 @@ False, # drop_duplicates 2, # max_n_irgs { - "has_df_head": False, - "has_overlap": True, - "has_df_tail": False, - "df_idx_merge_start": 0, - "df_idx_merge_end_excl": 1, - "rg_idx_merge_start": 2, - "rg_idx_merge_end_excl": 4, - "df_idx_tmrg_starts": [0, 0], - "df_idx_tmrg_ends_excl": [0, 1], - "tmrg_n_rows": [1, 1], - "n_tmrgs": 2, - "sort_rgs_after_rewrite": True, + "chunk_counter": [0, 0, 0, 1], + "sort_rgs_after_write": True, }, ), ( # Test 6 (1.b) / - # Max row group size as freqstr | df connected to incomplete rgs. + # Max row group size as freqstr. + # df connected to incomplete rgs. # Values on boundary. # Writing after pf data, incomplete row groups. - # row grps: 0 1 2 + # rg: 0 1 2 # pf: [8h00,9h00], [10h00], [11h00] - # df: [11h00] - "append_tail_max_n_irgs_exceeded_timestamp", - DataFrame({"ordered_on": [Timestamp(f"{REF_D}11:00")]}), - DataFrame( - { - "ordered_on": date_range( - Timestamp(f"{REF_D}8:00"), - freq="1h", - periods=4, - ), - }, - ), - [0, 2, 3], + # df: [11h00] + "max_n_irgs_not_reached_simple_append_timestamp", + [Timestamp(f"{REF_D}11:00")], + date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=4), + [0, 2, 3], # row_group_offsets "2h", # row_group_size_target False, # drop_duplicates 3, # max_n_irgs - (1, None, False), # bool: need to sort rgs after write + { + "chunk_counter": [1], + "sort_rgs_after_write": False, + }, + ), + ( + # Test 6 (1.b) / + # Max row group size as freqstr. + # df connected to incomplete rgs. + # Values on boundary. + # Writing after pf data, incomplete row groups. + # rg: 0 1 2 + # pf: [8h00,9h00], [10h00], [11h00] + # df: [11h00] + "max_n_irgs_reached_rewrite_tail_timestamp", + [Timestamp(f"{REF_D}11:00")], + date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=4), + [0, 2, 3], # row_group_offsets + "2h", # row_group_size_target + False, # drop_duplicates + 2, # max_n_irgs + { + "chunk_counter": [0, 0, 0, 1], + "sort_rgs_after_write": True, + }, ), # 3/ Adding data at complete tail, testing 'max_n_irgs=None'. ( # Test 7 (2.a) / - # Max row group size as int | df connected to incomplete rgs. + # Max row group size as int. + # df connected to incomplete rgs. # Writing at end of pf data, with incomplete row groups at # the end of pf data. - # row grps: 0 1 2 3 + # rg: 0 1 2 3 # pf: [0,1,2,6], [7,8,9,10], [11], [12] - # df: [12] - "append_tail_max_n_irgs_none_int", - DataFrame({"ordered_on": [12]}), - DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 9, 10, 11, 12]}), + # df: [12] + "max_n_irgs_none_simple_append_int", + [12], + [0, 1, 2, 6, 7, 8, 9, 10, 11, 12], [0, 4, 8, 9], 4, # row_group_size_target False, # drop_duplicates None, # max_n_irgs - (None, None, False), # bool: need to sort rgs after write + { + "chunk_counter": [1], + "sort_rgs_after_write": False, + }, ), ( # Test 8 (2.b) / - # Max row group size as freqstr | df connected to incomplete rgs. + # Max row group size as freqstr + # df connected to incomplete rgs. # Values on boundary. # Writing after pf data, incomplete row groups. 
- # row grps: 0 1 2 + # rg: 0 1 2 # pf: [8h00,9h00], [10h00], [11h00] - # df: [11h00] - "append_tail_max_n_irgs_none_timestamp", - DataFrame({"ordered_on": [Timestamp(f"{REF_D}11:00")]}), - DataFrame( - { - "ordered_on": date_range( - Timestamp(f"{REF_D}8:00"), - freq="1h", - periods=4, - ), - }, - ), - [0, 2, 3], + # df: [11h00] + "max_n_irgs_none_simple_append_timestamp", + [Timestamp(f"{REF_D}11:00")], + date_range(Timestamp(f"{REF_D}8:00"), freq="1h", periods=4), + [0, 2, 3], # row_group_offsets "2h", # row_group_size_target False, # drop_duplicates None, # max_n_irgs - (None, None, False), # bool: need to sort rgs after write + { + "chunk_counter": [1], + "sort_rgs_after_write": False, + }, ), # 4/ Adding data just before last incomplete row groups. ( # Test 9 (3.a) / - # Max row group size as int | df connected to incomplete rgs. + # Max row group size as int. + # df connected to incomplete rgs. # Writing at end of pf data, with incomplete row groups at # the end of pf data. # Enough rows to rewrite all tail. - # row grps: 0 1 2 - # pf: [0,1,2], [6,7,8], [10] - # df: [8, 9] - "insert_before_incomplete_rg_rewrite_tail", - DataFrame({"ordered_on": [8, 9]}), - DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 10]}), + # rg: 0 1 2 + # pf: [0,1,2], [6,7,8], [11] + # df: [8, 9, 10] + "insert_before_irgs_simple_append_int", + [8, 9, 10], + [0, 1, 2, 6, 7, 8, 11], [0, 3, 6], - 3, # row_group_size_target | should rewrite tail + 3, # row_group_size_target | should not rewrite tail False, # drop_duplicates 3, # max_n_irgs | should not rewrite tail - (2, None, False), # bool: need to sort rgs after write + { + "chunk_counter": [3], + "sort_rgs_after_write": True, + }, ), ( - # Test 10 (3.b) / - # Max row group size as int | df connected to incomplete rgs. + # Test 9 (3.a) / + # Max row group size as int. + # df connected to incomplete rgs. # Writing at end of pf data, with incomplete row groups at # the end of pf data. - # Tail is rewritten because with df, 'row_group_size_target' is - # reached. - # row grps: 0 1 2 - # pf: [0,1,2], [6,7,8], [10] - # df: [8, 9] - "insert_before_incomplete_rg_max_size_reached", - DataFrame({"ordered_on": [8, 9]}), - DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 10]}), + # Enough rows to rewrite all tail. + # rg: 0 1 2 + # pf: [0,1,2], [6,7,8], [11] + # df: [8, 9] + "insert_before_irgs_rewrite_tail_int", + [8, 9], + [0, 1, 2, 6, 7, 8, 11], [0, 3, 6], 3, # row_group_size_target | should rewrite tail - True, # drop_duplicates + False, # drop_duplicates + 3, # max_n_irgs | should not rewrite tail + { + "chunk_counter": [0, 2], + "sort_rgs_after_write": True, + }, + ), + ( + # Test 10 (3.b) / + # Max row group size as int. + # df connected to incomplete rgs. + # Incomplete row groups at the end of pf data. + # rg: 0 1 2 + # pf: [0,1,2], [6,7,8], [10] + # df: [8, 9] + "insert_before_irgs_drop_duplicates_rewrite_tail_int", + [8, 9], + [0, 1, 2, 6, 7, 8, 10], + [0, 3, 6], + 3, # row_group_size_target | should rewrite tail + True, # drop_duplicates | merge with preceding rg 3, # max_n_irgs | should not rewrite tail - (1, None, False), # bool: need to sort rgs after write + { + # Other acceptable solution: + # [0, 1, 1, 2] + "chunk_counter": [0, 1, 2, 2], + "sort_rgs_after_write": True, + }, ), ( # Test 11 (3.c) / - # Max row group size as int | df connected to incomplete rgs. + # Max row group size as int. + # df connected to incomplete rgs. # Writing at end of pf data, with incomplete row groups at # the end of pf data. 
# Tail is rewritten because with df, 'max_n_irgs' is reached. - # row grps: 0 1 2 - # pf: [0,1,2],[6,7,8], [10] - # df: [8] - "insert_before_incomplete_rg_max_n_irgs_reached", - DataFrame({"ordered_on": [8, 9]}), - DataFrame({"ordered_on": [0, 1, 2, 6, 7, 8, 10]}), + # rg: 0 1 2 + # pf: [0,1,2], [6,7,8], [10] + # df: [8] + "insert_before_irgs_drop_duplicates_rewrite_tail_int", + [8], + [0, 1, 2, 6, 7, 8, 10], [0, 3, 6], 3, # row_group_size_target | should not rewrite tail - True, # drop_duplicates - 2, # max_n_irgs | should rewrite tail - (1, None, False), # bool: need to sort rgs after write + True, # drop_duplicates | merge with preceding rg + 3, # max_n_irgs | should not rewrite tail + { + "chunk_counter": [0, 1], + "sort_rgs_after_write": True, + }, ), ( - # Test 12 (3.d) / - # Max row group size as int | df connected to incomplete rgs. - # Writing at end of pf data, with incomplete row groups at - # the end of pf data. - # Should not rewrite all tail. - # row grps: 0 1 2 - # pf: [0,1,2,3],[6,7,8,8], [10] - # df: [8, 9] - "insert_before_incomplete_rg_no_rewrite", - DataFrame({"ordered_on": [8, 9]}), - DataFrame({"ordered_on": [0, 1, 2, 3, 6, 7, 8, 8, 10]}), + # Max row group size as int. + # df connected to incomplete rgs. + # Incomplete row groups at the end of pf data. + # Write of last row group is triggered + # rg: 0 1 2 + # pf: [0,1,2,3],[6,7,8,8], [10] + # df: [8, 9] + "insert_before_irgs_rewrite_tail_int", + [8, 9], + [0, 1, 2, 3, 6, 7, 8, 8, 10], [0, 4, 8], - 4, # row_group_size_target | should not rewrite tail + 4, # row_group_size_target | should merge with next rg. False, # drop_duplicates 3, # max_n_irgs | should not rewrite tail - (None, None, True), # bool: need to sort rgs after write + { + "chunk_counter": [0, 2], + "sort_rgs_after_write": True, + }, ), ( # Test 13 (3.c) / @@ -716,9 +642,17 @@ 2, # max_n_irgs (None, None, True), ), + # Do "island" cases + # pf: [0, 1] [7, 9] [ 15, 16] + # df1 [4, 5 11, 12] # should have df_head, df_tail, no merge? + # df2 [ 8, 11, 12] # should have merge + df_tail? + # df3 [4, 5 8] # should have df_head + merge? 
+ # df4 [4, 5] # here df not to be merged with following row group + # df5 [4, 5, 6] # here, should be merged + # df6 + same with row_group_size_target as str ], ) -def test_ordered_merge_info( +def test_analyze_chunks_to_merge( test_id, df_data, pf_data, @@ -727,12 +661,12 @@ def test_ordered_merge_info( drop_duplicates, max_n_irgs, expected, - create_parquet_file, + tmp_path, ): df = DataFrame({"ordered": df_data}) pf_data = DataFrame({"ordered": pf_data}) - pf = create_parquet_file(pf_data, row_group_offsets=row_group_offsets) - merge_info = OrderedMergeInfo.analyze( + pf = create_parquet_file(tmp_path, pf_data, row_group_offsets=row_group_offsets) + chunk_counter, sort_rgs_after_write = analyze_chunks_to_merge( df=df, pf=pf, ordered_on="ordered", @@ -741,18 +675,8 @@ def test_ordered_merge_info( max_n_irgs=max_n_irgs, ) - assert merge_info.has_df_head == expected["has_df_head"] - assert merge_info.has_overlap == expected["has_overlap"] - assert merge_info.has_df_tail == expected["has_df_tail"] - assert merge_info.df_idx_merge_start == expected["df_idx_merge_start"] - assert merge_info.df_idx_merge_end_excl == expected["df_idx_merge_end_excl"] - assert merge_info.rg_idx_merge_start == expected["rg_idx_merge_start"] - assert merge_info.rg_idx_merge_end_excl == expected["rg_idx_merge_end_excl"] - assert array_equal(merge_info.df_idx_tmrg_starts, expected["df_idx_tmrg_starts"]) - assert array_equal(merge_info.df_idx_tmrg_ends_excl, expected["df_idx_tmrg_ends_excl"]) - assert merge_info.tmrg_n_rows == expected["tmrg_n_rows"] - assert merge_info.n_tmrgs == expected["n_tmrgs"] - assert merge_info.sort_rgs_after_rewrite == expected["sort_rgs_after_rewrite"] + assert array_equal(chunk_counter, expected["chunk_counter"]) + assert sort_rgs_after_write == expected["sort_rgs_after_write"] """ @@ -763,7 +687,7 @@ def test_ordered_merge_info( { "has_pf_head": True, "has_df_head": False, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": False, "has_df_tail": True, "df_idx_overlap_start": 0, @@ -782,7 +706,7 @@ def test_ordered_merge_info( { "has_pf_head": True, "has_df_head": False, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": False, "has_df_tail": True, "df_idx_overlap_start": 0, @@ -801,7 +725,7 @@ def test_ordered_merge_info( { "has_pf_head": False, "has_df_head": True, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": False, "has_df_tail": False, "df_idx_overlap_start": 3, @@ -820,7 +744,7 @@ def test_ordered_merge_info( { "has_pf_head": False, "has_df_head": True, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": True, "has_df_tail": False, "df_idx_overlap_start": 4, @@ -839,7 +763,7 @@ def test_ordered_merge_info( { "has_pf_head": False, "has_df_head": True, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": True, "has_df_tail": False, "df_idx_overlap_start": 2, @@ -858,7 +782,7 @@ def test_ordered_merge_info( { "has_pf_head": False, "has_df_head": False, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": True, "has_df_tail": False, "df_idx_overlap_start": 0, @@ -877,7 +801,7 @@ def test_ordered_merge_info( { "has_pf_head": False, "has_df_head": False, - "has_overlap": True, + "has_tmrgs": True, "has_pf_tail": False, "has_df_tail": True, "df_idx_overlap_start": 0, @@ -896,7 +820,7 @@ def test_ordered_merge_info( { "has_pf_head": True, "has_df_head": False, - "has_overlap": False, + "has_tmrgs": False, "has_pf_tail": False, "has_df_tail": True, "df_idx_overlap_start": None, @@ -915,7 +839,7 @@ def test_ordered_merge_info( { "has_pf_head": 
False, "has_df_head": True, - "has_overlap": False, + "has_tmrgs": False, "has_pf_tail": True, "has_df_tail": False, "df_idx_overlap_start": None,
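# A minimal sketch of how the alternating 'chunk_counter' indices returned by
# 'analyze_chunks_to_merge' are meant to be consumed, mirroring the loop in
# 'iter_merged_pf_df': even entries close chunks made of DataFrame rows only,
# odd entries close chunks mixing DataFrame rows with one ParquetFile row
# group. The index values below are hypothetical, not taken from the tests.
from numpy import array

chunk_counter = array([2, 5, 5, 7])  # assumed output for a 7-row DataFrame
df_idx_start = 0
rg_idx = 0
is_mixed_chunk = False
for df_idx_end_excl in chunk_counter:
    if is_mixed_chunk:
        # Chunk mixing df rows with one row group of pf.
        print(f"df[{df_idx_start}:{df_idx_end_excl}] merged with row group {rg_idx}")
        rg_idx += 1
    else:
        # Chunk made of df rows only (possibly empty).
        print(f"df[{df_idx_start}:{df_idx_end_excl}] alone")
    df_idx_start = df_idx_end_excl
    is_mixed_chunk = not is_mixed_chunk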