diff --git a/ahb_diff/main.py b/ahb_diff/main.py index c90e53c2..35517124 100644 --- a/ahb_diff/main.py +++ b/ahb_diff/main.py @@ -35,7 +35,7 @@ def parse_formatversions(formatversion: str) -> Tuple[int, int]: return year, month -def get_available_formatversions() -> list[str]: +def _get_available_formatversions() -> list[str]: """ get all available directories in SUBMODULE, sorted from latest to oldest. """ @@ -52,7 +52,18 @@ def get_available_formatversions() -> list[str]: return formatversion_dirs -def is_formatversion_empty(formatversion: str) -> bool: +def _get_nachrichtenformat_dirs(formatversion_dir: Path) -> list[Path]: + """ + get all directories that contain a csv subdirectory. + """ + if not formatversion_dir.exists(): + logger.warning("❌formatversion directory not found: %s", formatversion_dir) + return [] + + return [d for d in formatversion_dir.iterdir() if d.is_dir() and (d / "csv").exists() and (d / "csv").is_dir()] + + +def _is_formatversion_dir_empty(formatversion: str) -> bool: """ check if a directory does not contain any directories. """ @@ -60,14 +71,14 @@ def is_formatversion_empty(formatversion: str) -> bool: if not formatversion_dir.exists(): return True - return len(get_nachrichtenformat_dirs(formatversion_dir)) == 0 + return len(_get_nachrichtenformat_dirs(formatversion_dir)) == 0 def determine_consecutive_formatversions() -> list[Tuple[str, str]]: """ generate pairs of consecutive directories to compare and skip empty directories. """ - formatversion_list = get_available_formatversions() + formatversion_list = _get_available_formatversions() consecutive_formatversions = [] for i in range(len(formatversion_list) - 1): @@ -75,7 +86,7 @@ def determine_consecutive_formatversions() -> list[Tuple[str, str]]: previous_formatversion = formatversion_list[i + 1] # skip if either directory is empty. - if is_formatversion_empty(subsequent_formatversion) or is_formatversion_empty(previous_formatversion): + if _is_formatversion_dir_empty(subsequent_formatversion) or _is_formatversion_dir_empty(previous_formatversion): logger.warning( "⚠️skipping empty consecutive formatversions: %s -> %s", subsequent_formatversion, @@ -88,18 +99,7 @@ def determine_consecutive_formatversions() -> list[Tuple[str, str]]: return consecutive_formatversions -def get_nachrichtenformat_dirs(formatversion_dir: Path) -> list[Path]: - """ - get all directories that contain a csv subdirectory. - """ - if not formatversion_dir.exists(): - logger.warning("❌formatversion directory not found: %s", formatversion_dir) - return [] - - return [d for d in formatversion_dir.iterdir() if d.is_dir() and (d / "csv").exists() and (d / "csv").is_dir()] - - -def get_ahb_files(csv_dir: Path) -> list[Path]: +def _get_pruefid_files(csv_dir: Path) -> list[Path]: """ get all ahb/.csv files in a given directory. """ @@ -109,7 +109,9 @@ def get_ahb_files(csv_dir: Path) -> list[Path]: # pylint:disable=too-many-locals -def get_matching_files(previous_formatversion: str, subsequent_formatversion: str) -> list[tuple[Path, Path, str, str]]: +def get_matching_pruefid_files( + previous_formatversion: str, subsequent_formatversion: str +) -> list[tuple[Path, Path, str, str]]: """ find matching ahb/.csv files across and directories. """ @@ -122,8 +124,8 @@ def get_matching_files(previous_formatversion: str, subsequent_formatversion: st matching_files = [] - previous_nachrichtenformat_dirs = get_nachrichtenformat_dirs(previous_formatversion_dir) - subsequent_nachrichtenformat_dirs = get_nachrichtenformat_dirs(subsequent_formatversion_dir) + previous_nachrichtenformat_dirs = _get_nachrichtenformat_dirs(previous_formatversion_dir) + subsequent_nachrichtenformat_dirs = _get_nachrichtenformat_dirs(subsequent_formatversion_dir) previous_nachrichtenformat_names = {d.name: d for d in previous_nachrichtenformat_dirs} subsequent_nachrichtenformat_names = {d.name: d for d in subsequent_nachrichtenformat_dirs} @@ -136,8 +138,8 @@ def get_matching_files(previous_formatversion: str, subsequent_formatversion: st previous_csv_dir = previous_nachrichtenformat_names[nachrichtentyp] / "csv" subsequent_csv_dir = subsequent_nachrichtenformat_names[nachrichtentyp] / "csv" - previous_files = {f.stem: f for f in get_ahb_files(previous_csv_dir)} - subsequent_files = {f.stem: f for f in get_ahb_files(subsequent_csv_dir)} + previous_files = {f.stem: f for f in _get_pruefid_files(previous_csv_dir)} + subsequent_files = {f.stem: f for f in _get_pruefid_files(subsequent_csv_dir)} common_ahbs = set(previous_files.keys()) & set(subsequent_files.keys()) @@ -147,7 +149,7 @@ def get_matching_files(previous_formatversion: str, subsequent_formatversion: st return matching_files -def get_csv(previous_ahb_path: Path, subsequent_ahb_path: Path) -> tuple[DataFrame, DataFrame]: +def _get_csv_content(previous_ahb_path: Path, subsequent_ahb_path: Path) -> tuple[DataFrame, DataFrame]: """ read csv input files. """ @@ -180,7 +182,7 @@ def _populate_row_values( # pylint: disable=too-many-arguments, too-many-positional-arguments def create_row( previous_df: DataFrame | None = None, - new_df: DataFrame | None = None, + subsequent_df: DataFrame | None = None, i: int | None = None, j: int | None = None, previous_formatversion: str = "", @@ -196,16 +198,16 @@ def create_row( if col != f"Segmentname_{previous_formatversion}": row[f"{col}_{previous_formatversion}"] = "" - if new_df is not None: - for col in new_df.columns: + if subsequent_df is not None: + for col in subsequent_df.columns: if col != f"Segmentname_{subsequent_formatversion}": row[f"{col}_{subsequent_formatversion}"] = "" _populate_row_values(previous_df, row, i, previous_formatversion, is_segmentname=True) - _populate_row_values(new_df, row, j, subsequent_formatversion, is_segmentname=True) + _populate_row_values(subsequent_df, row, j, subsequent_formatversion, is_segmentname=True) _populate_row_values(previous_df, row, i, previous_formatversion, is_segmentname=False) - _populate_row_values(new_df, row, j, subsequent_formatversion, is_segmentname=False) + _populate_row_values(subsequent_df, row, j, subsequent_formatversion, is_segmentname=False) return row @@ -221,71 +223,77 @@ def align_columns( aligns `Segmentname` columns by adding empty cells each time the cell values do not match. """ # add corresponding formatversions as suffixes to columns. - df_old = previous_pruefid.copy() - df_new = subsequent_pruefid.copy() - df_old = df_old.rename(columns={"Segmentname": f"Segmentname_{previous_formatversion}"}) - df_new = df_new.rename(columns={"Segmentname": f"Segmentname_{subsequent_formatversion}"}) + df_of_previous_formatversion = previous_pruefid.copy() + df_of_subsequent_formatversion = subsequent_pruefid.copy() + df_of_previous_formatversion = df_of_previous_formatversion.rename( + columns={"Segmentname": f"Segmentname_{previous_formatversion}"} + ) + df_of_subsequent_formatversion = df_of_subsequent_formatversion.rename( + columns={"Segmentname": f"Segmentname_{subsequent_formatversion}"} + ) # preserve column order. - old_columns = [col for col in previous_pruefid.columns if col != "Segmentname"] - new_columns = [col for col in subsequent_pruefid.columns if col != "Segmentname"] + columns_of_previous_formatversion = [col for col in previous_pruefid.columns if col != "Segmentname"] + columns_of_subsequent_formatversion = [col for col in subsequent_pruefid.columns if col != "Segmentname"] column_order = ( [f"Segmentname_{previous_formatversion}"] - + [f"{col}_{previous_formatversion}" for col in old_columns] + + [f"{col}_{previous_formatversion}" for col in columns_of_previous_formatversion] + ["diff"] + [f"Segmentname_{subsequent_formatversion}"] - + [f"{col}_{subsequent_formatversion}" for col in new_columns] + + [f"{col}_{subsequent_formatversion}" for col in columns_of_subsequent_formatversion] ) - if df_old.empty and df_new.empty: + if df_of_previous_formatversion.empty and df_of_subsequent_formatversion.empty: return pd.DataFrame({col: pd.Series([], dtype="float64") for col in column_order}) - if df_new.empty: + if df_of_subsequent_formatversion.empty: result_rows = [ create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, i=i, previous_formatversion=previous_formatversion, subsequent_formatversion=subsequent_formatversion, ) - for i in range(len(df_old)) + for i in range(len(df_of_previous_formatversion)) ] for row in result_rows: row["diff"] = "REMOVED" result_df = pd.DataFrame(result_rows) return result_df[column_order] - if df_old.empty: + if df_of_previous_formatversion.empty: result_rows = [ create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, j=j, previous_formatversion=previous_formatversion, subsequent_formatversion=subsequent_formatversion, ) - for j in range(len(df_new)) + for j in range(len(df_of_subsequent_formatversion)) ] for row in result_rows: row["diff"] = "NEW" result_df = pd.DataFrame(result_rows) return result_df[column_order] - segments_old = df_old[f"Segmentname_{previous_formatversion}"].tolist() - segments_new = df_new[f"Segmentname_{subsequent_formatversion}"].tolist() + segments_of_previous_formatversion = df_of_previous_formatversion[f"Segmentname_{previous_formatversion}"].tolist() + segments_of_subsequent_formatversion = df_of_subsequent_formatversion[ + f"Segmentname_{subsequent_formatversion}" + ].tolist() result_rows = [] i = 0 j = 0 # iterate through both lists until reaching their ends. - while i < len(segments_old) or j < len(segments_new): - if i >= len(segments_old): + while i < len(segments_of_previous_formatversion) or j < len(segments_of_subsequent_formatversion): + if i >= len(segments_of_previous_formatversion): row = create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, j=j, previous_formatversion=previous_formatversion, subsequent_formatversion=subsequent_formatversion, @@ -293,10 +301,10 @@ def align_columns( row["diff"] = "NEW" result_rows.append(row) j += 1 - elif j >= len(segments_new): + elif j >= len(segments_of_subsequent_formatversion): row = create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, i=i, previous_formatversion=previous_formatversion, subsequent_formatversion=subsequent_formatversion, @@ -304,10 +312,10 @@ def align_columns( row["diff"] = "REMOVED" result_rows.append(row) i += 1 - elif segments_old[i] == segments_new[j]: + elif segments_of_previous_formatversion[i] == segments_of_subsequent_formatversion[j]: row = create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, i=i, j=j, previous_formatversion=previous_formatversion, @@ -320,11 +328,11 @@ def align_columns( else: try: # try to find next matching value. - next_match_new = segments_new[j:].index(segments_old[i]) - for _ in range(next_match_new): + next_match = segments_of_subsequent_formatversion[j:].index(segments_of_previous_formatversion[i]) + for _ in range(next_match): row = create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, j=j, previous_formatversion=previous_formatversion, subsequent_formatversion=subsequent_formatversion, @@ -336,8 +344,8 @@ def align_columns( except ValueError: # no match found: add old value and empty new cell. row = create_row( - previous_df=df_old, - new_df=df_new, + previous_df=df_of_previous_formatversion, + subsequent_df=df_of_subsequent_formatversion, i=i, previous_formatversion=previous_formatversion, subsequent_formatversion=subsequent_formatversion, @@ -460,11 +468,11 @@ def _try_convert_to_number(cell: str) -> int | float | str: logger.info("✅successfully exported XLSX file to: %s", {output_path_xlsx}) -def process_files(previous_formatversion: str, subsequent_formatversion: str) -> None: +def _process_files(previous_formatversion: str, subsequent_formatversion: str) -> None: """ process all matching ahb/.csv files between two directories. """ - matching_files = get_matching_files(previous_formatversion, subsequent_formatversion) + matching_files = get_matching_pruefid_files(previous_formatversion, subsequent_formatversion) if not matching_files: logger.warning("No matching files found to compare") @@ -472,12 +480,19 @@ def process_files(previous_formatversion: str, subsequent_formatversion: str) -> output_base = OUTPUT_DIR / f"{subsequent_formatversion}_{previous_formatversion}" - for old_file, new_file, nachrichtentyp, pruefid in matching_files: + for previous_pruefid, subsequent_pruefid, nachrichtentyp, pruefid in matching_files: logger.info("Processing %s - %s", nachrichtentyp, pruefid) try: - df_old, df_new = get_csv(old_file, new_file) - merged_df = align_columns(df_old, df_new, previous_formatversion, subsequent_formatversion) + df_of_previous_formatversion, df_of_subsequent_formatversion = _get_csv_content( + previous_pruefid, subsequent_pruefid + ) + merged_df = align_columns( + df_of_previous_formatversion, + df_of_subsequent_formatversion, + previous_formatversion, + subsequent_formatversion, + ) output_dir = output_base / nachrichtentyp output_dir.mkdir(parents=True, exist_ok=True) @@ -498,7 +513,7 @@ def process_files(previous_formatversion: str, subsequent_formatversion: str) -> logger.error("❌data processing error for %s/%s: %s", nachrichtentyp, pruefid, str(e)) -def process_submodule() -> None: +def _process_submodule() -> None: """ processes all valid consecutive subdirectories. """ @@ -513,7 +528,7 @@ def process_submodule() -> None: "⌛processing consecutive formatversions: %s -> %s", subsequent_formatversion, previous_formatversion ) try: - process_files(previous_formatversion, subsequent_formatversion) + _process_files(previous_formatversion, subsequent_formatversion) except (OSError, pd.errors.EmptyDataError, ValueError) as e: logger.error( "❌error processing formatversions %s -> %s: %s", @@ -525,4 +540,4 @@ def process_submodule() -> None: if __name__ == "__main__": - process_submodule() + _process_submodule() diff --git a/unittests/test_parse_submodule.py b/unittests/test_parse_submodule.py index b3ae583c..a3f2578b 100644 --- a/unittests/test_parse_submodule.py +++ b/unittests/test_parse_submodule.py @@ -3,7 +3,7 @@ import pytest from _pytest.monkeypatch import MonkeyPatch -from ahb_diff.main import determine_consecutive_formatversions, get_matching_files, parse_formatversions +from ahb_diff.main import determine_consecutive_formatversions, get_matching_pruefid_files, parse_formatversions def test_parse_valid_formatversions() -> None: @@ -35,7 +35,7 @@ def test_parse_invalid_formatversions() -> None: def test_get_matching_files(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: """ - test finding matching files across formatversions. + test find matching files across formatversions. """ monkeypatch.setattr("ahb_diff.main.SUBMODULE", tmp_path) @@ -59,7 +59,7 @@ def test_get_matching_files(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: for file, content in files.items(): (nachrichtenformat_dir / file).write_text(content) - matches = get_matching_files("FV2410", "FV2504") + matches = get_matching_pruefid_files("FV2410", "FV2504") assert len(matches) == 2 assert matches[0][2] == "nachrichtenformat_1" @@ -69,14 +69,14 @@ def test_get_matching_files(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: def test_determine_consecutive_formatversions(tmp_path: Path, monkeypatch: MonkeyPatch) -> None: """ - test determining consecutive formatversions. + test successful determination of consecutive formatversions. """ monkeypatch.setattr("ahb_diff.main.SUBMODULE", tmp_path) submodule: dict[str, dict[str, bool | dict[str, str]]] = { "FV2504": {"nachrichtenformat_1": True}, "FV2410": {"nachrichtenformat_1": True}, - "FV2404": {}, # Empty directory + "FV2404": {}, "FV2310": {"nachrichtenformat_1": True}, }