Skip to content

Commit

Permalink
Improve function names and variable names across the whole project
Browse files Browse the repository at this point in the history
  • Loading branch information
OLILHR committed Nov 17, 2024
1 parent 029d707 commit 7736fb0
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 76 deletions.
157 changes: 86 additions & 71 deletions ahb_diff/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def parse_formatversions(formatversion: str) -> Tuple[int, int]:
return year, month


def get_available_formatversions() -> list[str]:
def _get_available_formatversions() -> list[str]:
"""
get all available <formatversion> directories in SUBMODULE, sorted from latest to oldest.
"""
Expand All @@ -52,30 +52,41 @@ def get_available_formatversions() -> list[str]:
return formatversion_dirs


def is_formatversion_empty(formatversion: str) -> bool:
def _get_nachrichtenformat_dirs(formatversion_dir: Path) -> list[Path]:
"""
get all <nachrichtenformat> directories that contain a csv subdirectory.
"""
if not formatversion_dir.exists():
logger.warning("❌formatversion directory not found: %s", formatversion_dir)
return []

return [d for d in formatversion_dir.iterdir() if d.is_dir() and (d / "csv").exists() and (d / "csv").is_dir()]


def _is_formatversion_dir_empty(formatversion: str) -> bool:
"""
check if a <formatversion> directory does not contain any <nachrichtenformat> directories.
"""
formatversion_dir = SUBMODULE / formatversion
if not formatversion_dir.exists():
return True

return len(get_nachrichtenformat_dirs(formatversion_dir)) == 0
return len(_get_nachrichtenformat_dirs(formatversion_dir)) == 0


def determine_consecutive_formatversions() -> list[Tuple[str, str]]:
"""
generate pairs of consecutive <formatversion> directories to compare and skip empty directories.
"""
formatversion_list = get_available_formatversions()
formatversion_list = _get_available_formatversions()
consecutive_formatversions = []

for i in range(len(formatversion_list) - 1):
subsequent_formatversion = formatversion_list[i]
previous_formatversion = formatversion_list[i + 1]

# skip if either directory is empty.
if is_formatversion_empty(subsequent_formatversion) or is_formatversion_empty(previous_formatversion):
if _is_formatversion_dir_empty(subsequent_formatversion) or _is_formatversion_dir_empty(previous_formatversion):
logger.warning(
"⚠️skipping empty consecutive formatversions: %s -> %s",
subsequent_formatversion,
Expand All @@ -88,18 +99,7 @@ def determine_consecutive_formatversions() -> list[Tuple[str, str]]:
return consecutive_formatversions


def get_nachrichtenformat_dirs(formatversion_dir: Path) -> list[Path]:
"""
get all <nachrichtenformat> directories that contain a csv subdirectory.
"""
if not formatversion_dir.exists():
logger.warning("❌formatversion directory not found: %s", formatversion_dir)
return []

return [d for d in formatversion_dir.iterdir() if d.is_dir() and (d / "csv").exists() and (d / "csv").is_dir()]


def get_ahb_files(csv_dir: Path) -> list[Path]:
def _get_pruefid_files(csv_dir: Path) -> list[Path]:
"""
get all ahb/<pruefid>.csv files in a given directory.
"""
Expand All @@ -109,7 +109,9 @@ def get_ahb_files(csv_dir: Path) -> list[Path]:


# pylint:disable=too-many-locals
def get_matching_files(previous_formatversion: str, subsequent_formatversion: str) -> list[tuple[Path, Path, str, str]]:
def get_matching_pruefid_files(
previous_formatversion: str, subsequent_formatversion: str
) -> list[tuple[Path, Path, str, str]]:
"""
find matching ahb/<pruefid>.csv files across <formatversion> and <nachrichtenformat> directories.
"""
Expand All @@ -122,8 +124,8 @@ def get_matching_files(previous_formatversion: str, subsequent_formatversion: st

matching_files = []

previous_nachrichtenformat_dirs = get_nachrichtenformat_dirs(previous_formatversion_dir)
subsequent_nachrichtenformat_dirs = get_nachrichtenformat_dirs(subsequent_formatversion_dir)
previous_nachrichtenformat_dirs = _get_nachrichtenformat_dirs(previous_formatversion_dir)
subsequent_nachrichtenformat_dirs = _get_nachrichtenformat_dirs(subsequent_formatversion_dir)

previous_nachrichtenformat_names = {d.name: d for d in previous_nachrichtenformat_dirs}
subsequent_nachrichtenformat_names = {d.name: d for d in subsequent_nachrichtenformat_dirs}
Expand All @@ -136,8 +138,8 @@ def get_matching_files(previous_formatversion: str, subsequent_formatversion: st
previous_csv_dir = previous_nachrichtenformat_names[nachrichtentyp] / "csv"
subsequent_csv_dir = subsequent_nachrichtenformat_names[nachrichtentyp] / "csv"

previous_files = {f.stem: f for f in get_ahb_files(previous_csv_dir)}
subsequent_files = {f.stem: f for f in get_ahb_files(subsequent_csv_dir)}
previous_files = {f.stem: f for f in _get_pruefid_files(previous_csv_dir)}
subsequent_files = {f.stem: f for f in _get_pruefid_files(subsequent_csv_dir)}

common_ahbs = set(previous_files.keys()) & set(subsequent_files.keys())

Expand All @@ -147,7 +149,7 @@ def get_matching_files(previous_formatversion: str, subsequent_formatversion: st
return matching_files


def get_csv(previous_ahb_path: Path, subsequent_ahb_path: Path) -> tuple[DataFrame, DataFrame]:
def _get_csv_content(previous_ahb_path: Path, subsequent_ahb_path: Path) -> tuple[DataFrame, DataFrame]:
"""
read csv input files.
"""
Expand Down Expand Up @@ -180,7 +182,7 @@ def _populate_row_values(
# pylint: disable=too-many-arguments, too-many-positional-arguments
def create_row(
previous_df: DataFrame | None = None,
new_df: DataFrame | None = None,
subsequent_df: DataFrame | None = None,
i: int | None = None,
j: int | None = None,
previous_formatversion: str = "",
Expand All @@ -196,16 +198,16 @@ def create_row(
if col != f"Segmentname_{previous_formatversion}":
row[f"{col}_{previous_formatversion}"] = ""

if new_df is not None:
for col in new_df.columns:
if subsequent_df is not None:
for col in subsequent_df.columns:
if col != f"Segmentname_{subsequent_formatversion}":
row[f"{col}_{subsequent_formatversion}"] = ""

_populate_row_values(previous_df, row, i, previous_formatversion, is_segmentname=True)
_populate_row_values(new_df, row, j, subsequent_formatversion, is_segmentname=True)
_populate_row_values(subsequent_df, row, j, subsequent_formatversion, is_segmentname=True)

_populate_row_values(previous_df, row, i, previous_formatversion, is_segmentname=False)
_populate_row_values(new_df, row, j, subsequent_formatversion, is_segmentname=False)
_populate_row_values(subsequent_df, row, j, subsequent_formatversion, is_segmentname=False)

return row

Expand All @@ -221,93 +223,99 @@ def align_columns(
aligns `Segmentname` columns by adding empty cells each time the cell values do not match.
"""
# add corresponding formatversions as suffixes to columns.
df_old = previous_pruefid.copy()
df_new = subsequent_pruefid.copy()
df_old = df_old.rename(columns={"Segmentname": f"Segmentname_{previous_formatversion}"})
df_new = df_new.rename(columns={"Segmentname": f"Segmentname_{subsequent_formatversion}"})
df_of_previous_formatversion = previous_pruefid.copy()
df_of_subsequent_formatversion = subsequent_pruefid.copy()
df_of_previous_formatversion = df_of_previous_formatversion.rename(
columns={"Segmentname": f"Segmentname_{previous_formatversion}"}
)
df_of_subsequent_formatversion = df_of_subsequent_formatversion.rename(
columns={"Segmentname": f"Segmentname_{subsequent_formatversion}"}
)

# preserve column order.
old_columns = [col for col in previous_pruefid.columns if col != "Segmentname"]
new_columns = [col for col in subsequent_pruefid.columns if col != "Segmentname"]
columns_of_previous_formatversion = [col for col in previous_pruefid.columns if col != "Segmentname"]
columns_of_subsequent_formatversion = [col for col in subsequent_pruefid.columns if col != "Segmentname"]

column_order = (
[f"Segmentname_{previous_formatversion}"]
+ [f"{col}_{previous_formatversion}" for col in old_columns]
+ [f"{col}_{previous_formatversion}" for col in columns_of_previous_formatversion]
+ ["diff"]
+ [f"Segmentname_{subsequent_formatversion}"]
+ [f"{col}_{subsequent_formatversion}" for col in new_columns]
+ [f"{col}_{subsequent_formatversion}" for col in columns_of_subsequent_formatversion]
)

if df_old.empty and df_new.empty:
if df_of_previous_formatversion.empty and df_of_subsequent_formatversion.empty:
return pd.DataFrame({col: pd.Series([], dtype="float64") for col in column_order})

if df_new.empty:
if df_of_subsequent_formatversion.empty:
result_rows = [
create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
i=i,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
)
for i in range(len(df_old))
for i in range(len(df_of_previous_formatversion))
]
for row in result_rows:
row["diff"] = "REMOVED"
result_df = pd.DataFrame(result_rows)
return result_df[column_order]

if df_old.empty:
if df_of_previous_formatversion.empty:
result_rows = [
create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
j=j,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
)
for j in range(len(df_new))
for j in range(len(df_of_subsequent_formatversion))
]
for row in result_rows:
row["diff"] = "NEW"
result_df = pd.DataFrame(result_rows)
return result_df[column_order]

segments_old = df_old[f"Segmentname_{previous_formatversion}"].tolist()
segments_new = df_new[f"Segmentname_{subsequent_formatversion}"].tolist()
segments_of_previous_formatversion = df_of_previous_formatversion[f"Segmentname_{previous_formatversion}"].tolist()
segments_of_subsequent_formatversion = df_of_subsequent_formatversion[
f"Segmentname_{subsequent_formatversion}"
].tolist()
result_rows = []

i = 0
j = 0

# iterate through both lists until reaching their ends.
while i < len(segments_old) or j < len(segments_new):
if i >= len(segments_old):
while i < len(segments_of_previous_formatversion) or j < len(segments_of_subsequent_formatversion):
if i >= len(segments_of_previous_formatversion):
row = create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
j=j,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
)
row["diff"] = "NEW"
result_rows.append(row)
j += 1
elif j >= len(segments_new):
elif j >= len(segments_of_subsequent_formatversion):
row = create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
i=i,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
)
row["diff"] = "REMOVED"
result_rows.append(row)
i += 1
elif segments_old[i] == segments_new[j]:
elif segments_of_previous_formatversion[i] == segments_of_subsequent_formatversion[j]:
row = create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
i=i,
j=j,
previous_formatversion=previous_formatversion,
Expand All @@ -320,11 +328,11 @@ def align_columns(
else:
try:
# try to find next matching value.
next_match_new = segments_new[j:].index(segments_old[i])
for _ in range(next_match_new):
next_match = segments_of_subsequent_formatversion[j:].index(segments_of_previous_formatversion[i])
for _ in range(next_match):
row = create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
j=j,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
Expand All @@ -336,8 +344,8 @@ def align_columns(
except ValueError:
# no match found: add old value and empty new cell.
row = create_row(
previous_df=df_old,
new_df=df_new,
previous_df=df_of_previous_formatversion,
subsequent_df=df_of_subsequent_formatversion,
i=i,
previous_formatversion=previous_formatversion,
subsequent_formatversion=subsequent_formatversion,
Expand Down Expand Up @@ -460,24 +468,31 @@ def _try_convert_to_number(cell: str) -> int | float | str:
logger.info("✅successfully exported XLSX file to: %s", {output_path_xlsx})


def process_files(previous_formatversion: str, subsequent_formatversion: str) -> None:
def _process_files(previous_formatversion: str, subsequent_formatversion: str) -> None:
"""
process all matching ahb/<pruefid>.csv files between two <formatversion> directories.
"""
matching_files = get_matching_files(previous_formatversion, subsequent_formatversion)
matching_files = get_matching_pruefid_files(previous_formatversion, subsequent_formatversion)

if not matching_files:
logger.warning("No matching files found to compare")
return

output_base = OUTPUT_DIR / f"{subsequent_formatversion}_{previous_formatversion}"

for old_file, new_file, nachrichtentyp, pruefid in matching_files:
for previous_pruefid, subsequent_pruefid, nachrichtentyp, pruefid in matching_files:
logger.info("Processing %s - %s", nachrichtentyp, pruefid)

try:
df_old, df_new = get_csv(old_file, new_file)
merged_df = align_columns(df_old, df_new, previous_formatversion, subsequent_formatversion)
df_of_previous_formatversion, df_of_subsequent_formatversion = _get_csv_content(
previous_pruefid, subsequent_pruefid
)
merged_df = align_columns(
df_of_previous_formatversion,
df_of_subsequent_formatversion,
previous_formatversion,
subsequent_formatversion,
)

output_dir = output_base / nachrichtentyp
output_dir.mkdir(parents=True, exist_ok=True)
Expand All @@ -498,7 +513,7 @@ def process_files(previous_formatversion: str, subsequent_formatversion: str) ->
logger.error("❌data processing error for %s/%s: %s", nachrichtentyp, pruefid, str(e))


def process_submodule() -> None:
def _process_submodule() -> None:
"""
processes all valid consecutive <formatversion> subdirectories.
"""
Expand All @@ -513,7 +528,7 @@ def process_submodule() -> None:
"⌛processing consecutive formatversions: %s -> %s", subsequent_formatversion, previous_formatversion
)
try:
process_files(previous_formatversion, subsequent_formatversion)
_process_files(previous_formatversion, subsequent_formatversion)
except (OSError, pd.errors.EmptyDataError, ValueError) as e:
logger.error(
"❌error processing formatversions %s -> %s: %s",
Expand All @@ -525,4 +540,4 @@ def process_submodule() -> None:


if __name__ == "__main__":
process_submodule()
_process_submodule()
Loading

0 comments on commit 7736fb0

Please sign in to comment.