From 91c7cbe439c40bf8769500655732227de563a90a Mon Sep 17 00:00:00 2001 From: bikegeek Date: Tue, 26 Mar 2024 17:54:02 -0600 Subject: [PATCH] Issue #240 support to reformat the TCDiag linetype output from TC-Pairs output, includes labelling all TCMPR headers --- METreformat/write_stat_ascii.py | 301 ++++++++++++++++++++++++++++++-- 1 file changed, 290 insertions(+), 11 deletions(-) diff --git a/METreformat/write_stat_ascii.py b/METreformat/write_stat_ascii.py index 4d41889f..ed09bbae 100644 --- a/METreformat/write_stat_ascii.py +++ b/METreformat/write_stat_ascii.py @@ -23,7 +23,7 @@ import logging import time import pathlib -from typing import List, Set +from typing import List import numpy as np import pandas as pd import yaml @@ -115,7 +115,7 @@ def write_stat_ascii(self, stat_data: pd.DataFrame, parms: dict) -> pd.DataFrame # ---------------------------------- supported_linetypes = [cn.FHO, cn.CNT, cn.VCNT, cn.CTC, cn.CTS, cn.MCTS, cn.SL1L2, cn.ECNT, cn.PCT, - cn.RHIST] + cn.RHIST, cn.TCDIAG] # Different formats based on the line types. Most METplotpy plots accept the long format where # all stats are under the stat_name and stat_value columns and the confidence limits under the @@ -125,7 +125,12 @@ def write_stat_ascii(self, stat_data: pd.DataFrame, parms: dict) -> pd.DataFrame working_df = stat_data.copy(deep=True) linetype_requested = str( parms['line_type']).upper() if linetype_requested in supported_linetypes: - working_df = working_df.loc[working_df['line_type'] == linetype_requested] + # If the TCDiag linetype is requested, keep both the TCDiag and TCMPR linetypes. + if linetype_requested == cn.TCDIAG: + working_df = working_df.loc[(working_df['line_type'] == linetype_requested) | + (working_df['line_type'] == cn.TCMPR)] + else: + working_df = working_df.loc[working_df['line_type'] == linetype_requested] else: logging.ERROR("Requested line type is currently not supported for reformatting") raise ValueError("Requested line type ", linetype_requested, " is currently not supported for reformatting") @@ -292,6 +297,12 @@ def process_by_stat_linetype(self, linetype: str, stat_data: pd.DataFrame, is_ag linetype_data: pd.DataFrame = self.process_rhist(stat_data) else: linetype_data: pd.DataFrame = self.process_rhist_for_agg(stat_data) + + # TCDIAG (from MET TC-Pairs output) + elif linetype == cn.TCDIAG: + # No need to support additional reformatting for agg_stat. + linetype_data: pd.DataFrame = self.process_tcdiag(stat_data) + else: return None @@ -432,7 +443,6 @@ def process_pct(self, stat_data: pd.DataFrame) -> pd.DataFrame: on_cols = [] i_value = [] working_headers = working_copy_df.columns.to_list() - print(f"working headers: {working_headers}") remaining_columns = working_headers[cn.NUM_STATIC_PCT_COLS:] for cur in remaining_columns: match_thresh = re.match(r'(thresh_)(\d+)', cur) @@ -546,9 +556,6 @@ def process_rhist(self, stat_data: pd.DataFrame) -> pd.DataFrame: # Replace the first two numbered labels (following the LINETYPE column) with the TOTAL and N_RANK labels working_df.rename(columns={'0':'total', cn.LINE_VAR_COUNTER[cn.RHIST]:'n_rank'}, inplace=True) - # Relabel the remaining numbered column headers - last_column_name = len(working_df.columns) - cn.NUM_STATIC_RHIST_COLS - # Relabel the repeating columns (RANK_1, ..., RANK_n) # column names are numbered '1','2','3',...,etc. by METdbLoad. # Give them descriptive labels: rank_1, rank_2, etc. @@ -1450,6 +1457,275 @@ def process_ecnt_for_agg(self, stat_data: pd.DataFrame) -> pd.DataFrame: return merged_dfs + def process_tcdiag(self, stat_data: pd.DataFrame) -> pd.DataFrame: + """ + Reformat the TCMPR and TCDiag linetype data. Reformat the TCMPR linetype into one dataframe, + then the TCDiag linetype into another dataframe. Perform a left join to capture all the data into + a single row for the same model, init time, valid time, fcst time, etc. This results in fewer rows. + + To reformat the TCMPR linetype data, label all the unnamed headers (i.e. those with numbers '1', '2', ...,) + with the corresponding name as specified in the MET User's Guide, section 24.2. + + To reformat the TCDiag linetype data, collect the DIAG_i values into their own columns: + e.g. if N_DIAG is 4 then: + DIAG_1 = SHR_MAG with VALUE_1 = 1 + DIAG_2 = STM_SPD with VALUE_2 = 15 + DIAG_3 = TPW with VALUE_3 = 63 + DIAG_4 = LAND with VALUE_4 = 307 + + will look like this (the standard/common columns will precede these columns): + + SHR_MAG STM_SPD TPW LAND + 1 15 63 307 + + This will resemble the TCMPR linetype's output file, where every column has a header name/column name. + + Arguments: + @param stat_data: The original input data, containing both TCMPR and TCDIAG linetype rows. + + Returns: + full_df: the reformatted dataframe with all unlabelled columns under the appropriate header/column name + for the TCMPR linetype. For the TCDIAG linetype, the DIAG_i VALUE_i pairs are consolidated + under the name of the DIAG_i value. The TCMPR and TCDIAG columns are consolidated into + the same rows via an inner join. + + """ + + begin_tcdiag = time.perf_counter() + + # Provide appropriate names for the TCMPR headers (replacing numbered columns i.e. '1', '2',..., etc. with + # the column names specified in the MET User's Guide TC-Pairs section). + tcmpr_df = stat_data.loc[stat_data['line_type'] == cn.TCMPR] + reformatted_tcmpr = self.reformat_tcmpr(tcmpr_df) + reformatted_tcmpr.to_csv("/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/tcmpr_reformatted.txt", sep='\t', index=False) + + # Perform reformatting for the TCDiag linetype + # Determine the columns for the line type + linetype: str = cn.TCDIAG + + # + # Subset the input dataframe to include only the TCDIAG columns and label the remaining + # "unlabelled" (i.e. labelled with numbers after data is read in by METdbLoad) + # columns/headers. + # + # Do not assume that the input data contains only the TCDIAG lines. Since the TCDIAG linetype + # is available from the MET TC-Pairs tool, it is very likely that TCMPR line type data will also be + # present in the input data file(s). + stat_data.to_csv('/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/all_tcpairs_tcdiag.txt', sep='\t') + all_tcdiag_df = stat_data.loc[stat_data['line_type'] == linetype] + all_tcdiag_df.to_csv('/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/all_tcdiag.txt', sep='\t') + + # Subset based on the DIAG_SOURCE, these provide different diaganostic measurements (i.e. columns). + # Join all the subsets into one final dataframe. + + # Get the diagnostic sources (DIAG_SOURCE column) + diag_src_col_name = cn.TCDIAG_DIAG_SOURCE_COLNAME + all_diag_sources:np.narray = all_tcdiag_df[diag_src_col_name].unique() + diag_sources:list = sorted(all_diag_sources) + + reformatted_dfs = [] + subset_df = all_tcdiag_df.copy(deep=True) + + # Perform the subsetting by diagnostic source, then invoke the + # method to perform the reformatting. + for diag in diag_sources: + # Subset based on DIAG_SOURCE + ds_df = subset_df.loc[subset_df[diag_src_col_name] == diag] + + ds_df_reformatted = self.reformat_tcdiag(ds_df) + reformatted_dfs.append(ds_df_reformatted) + + # concat all the diagnostic source dataframes into one + all_tcdiag_reformatted = pd.concat(reformatted_dfs) + + # Rename the columns. Replace fcst_lead with LEAD, fcst_init with INIT, fcst_valid with VALID, and convert + # the remaining column header names to all upper case to be compatible + # with METplotpy's TCMPR plotter. + lc_cols = all_tcdiag_reformatted.columns.to_list() + uc_cols = [] + + for cur_col in lc_cols: + if cur_col == 'fcst_lead': + uc_cur_col = 'LEAD' + uc_cols.append(uc_cur_col) + elif cur_col == 'fcst_init': + uc_cur_col = 'INIT' + uc_cols.append(uc_cur_col) + elif cur_col == 'fcst_valid': + uc_cur_col = 'VALID' + uc_cols.append(uc_cur_col) + else: + uc_cols.append(cur_col.upper()) + + all_tcdiag_reformatted.columns = uc_cols + + # Ensure that the LEAD column is integer type + all_tcdiag_reformatted['LEAD'].astype(int) + + + # Join the TCMPR and TCDIAG dataframes into one and do some cleaning up of columns + uc_long_header_tcst = [hdr.upper() for hdr in cn.LONG_HEADER_TCST] + common_headers = uc_long_header_tcst[0:len(uc_long_header_tcst) - 1] + full_df = pd.merge(reformatted_tcmpr, all_tcdiag_reformatted, on=common_headers, how='inner') + + # Clean up extraneous columns: + # TOTAL_x and TOTAL_y are identical, drop TOTAL_y and rename TOTAL_x to TOTAL + # LINE_TYPE_x is TCMPR, LINE_TYPE_y is TCDIAG, drop LINE_TYPE_x and rename LINE_TYPE_x to LINE_TYPE + cleanup_df = full_df.copy(deep=True) + cleanup_df.drop('TOTAL_y', axis=1, inplace=True) + cleanup_df.drop('LINE_TYPE_x', axis=1, inplace=True) + cleanup_df.rename({'TOTAL_x':'TOTAL', 'LINE_TYPE_y':'LINE_TYPE'}, axis=1, inplace=True) + + end_tcdiag = time.perf_counter() + time_to_process_tcdiag = end_tcdiag - begin_tcdiag + logging.info(f"Total time for processing the TCDiag matched pair linetype: {time_to_process_tcdiag} seconds") + + return cleanup_df + + def reformat_tcdiag(self, tcdiag_df: pd.DataFrame) -> pd.DataFrame: + + """ + Takes a TCDiag dataframe and reformats it by + replacing the VALUE_i column with the value of the corresponding DIAG_i + and removing the DIAG_i column. + + e.g. + DIAG_1 VALUE_1 DIAG_2 VALUE_2 + SHR_MAG 15.0 STM_SPD 63.0 + + becomes: + SHR_MAG STM_SPD + 15.0 63.0 + + + Args: + @param tcdiag_df: A dataframe containing only the TCDIAG linetype. + + Returns: a reformatted df where the DIAG_i columns are removed and the VALUE_i columns are named + with the value of the corresponding DIAG_i + """ + + begin_reformat = time.perf_counter() + logger.info("Reformat the TCDiag dataframe based on the DIAG_SOURCE ") + n_diag_col_name = cn.LINE_VAR_COUNTER[cn.TCDIAG] + ds_df = tcdiag_df.copy(deep=True) + + # Subset the dataframe to contain only the relevant columns + num_repeating_col_labels = cn.LINE_VAR_REPEATS[cn.TCDIAG] + + all_n_diags = ds_df[n_diag_col_name] + max_n_diag = int(all_n_diags.max()) + + # Calculate the total number of columns + num_relevant_columns = max_n_diag * num_repeating_col_labels + total_num_columns = num_relevant_columns + cn.NUM_STATIC_TCDIAG_COLS + idx_last_relevant_col = total_num_columns + relevant_df = ds_df.iloc[0:, 0:idx_last_relevant_col] + + # Work on a copy + ds_df = relevant_df.copy(deep=True) + + # Get column names for each DIAG_i, VALUE_i pair + start_diag_col_name = str(int(n_diag_col_name) + 1) + start_value_col_name = str(int(start_diag_col_name) + 1) + + # Retrieve the DIAG_i value and replace the VALUE_i column name with this value + # i.e. if the DIAG_i value is SHR_MAG, then the corresponding VALUE_i column name will be replaced with + # SHR_MAG + start_diag = start_diag_col_name + start_value = start_value_col_name + num_diags = ds_df[n_diag_col_name].to_list() + num_diag = int(num_diags[0]) + + # Keep track of the DIAG_i columns to drop + diag_to_drop = [] + + for i in range(0, num_diag): + diag_names: list = ds_df[start_diag].to_list() + # All the diag names are identical in this column, use the first one in the list + diag_name = diag_names[0] + + # Replace the VALUE_i column corresponding to the DIAG_i with the name of the diagnostic + ds_df.rename({start_value: diag_name}, axis='columns', inplace=True) + diag_to_drop.append(start_diag) + next_diag = str(int(start_diag) + 2) + next_value = str(int(start_value) + 2) + start_diag = next_diag + start_value = next_value + + # Drop the columns containing the DIAG types + ds_df.drop(diag_to_drop, axis=1, inplace=True) + reformatted = ds_df.copy(deep=True) + reformatted.rename( + {'0': 'total', '1': 'index_pairs', '2': 'diag_source', '3': 'track_source', '4': 'field_source', + '5': 'n_diag'}, + axis='columns', inplace=True) + + # Replace the shear magnitude column with the common name since different DIAG_SOURCES use different + # 4 letter abbreviations for the same field (e.g. SHRD in SHIPS and SHR_MAG in CIRA RT are the identifiers + # for shear magnitude + reformatted_cols = reformatted.columns.to_list() + if 'SHR_MAG' in reformatted_cols: + reformatted.rename({'SHR_MAG':cn.TCDIAG_COMMON_NAMES['SHR_MAG']}, axis='columns', inplace=True) + elif 'SHRD' in reformatted_cols: + reformatted.rename({'SHRD':cn.TCDIAG_COMMON_NAMES['SHRD']}, axis='columns', inplace=True) + if 'LAND' in reformatted_cols: + reformatted.rename({'LAND': cn.TCDIAG_COMMON_NAMES['LAND']}, axis='columns', inplace=True) + elif 'DTL' in reformatted_cols: + reformatted.rename({'DTL': cn.TCDIAG_COMMON_NAMES['DTL']}, axis='columns', inplace=True) + if 'STM_SPD' in reformatted_cols: + reformatted.rename({'STM_SPD': cn.TCDIAG_COMMON_NAMES['STM_SPD']}, axis='columns', inplace=True) + + # Clean up intermediate dataframes + del ds_df + gc.collect + + + + end_reformat = time.perf_counter() + time_to_reformat = end_reformat - begin_reformat + logger.info(f"Finished reformatting TCDiag matched pair output in {time_to_reformat} seconds") + + return reformatted + + + def reformat_tcmpr(self, tcmpr_df:pd.DataFrame)-> pd.DataFrame: + """ + Reformats the TCMPR data by providing explicit header (column) names as specified by the MET User's Guide + section 24.2. + + Args: + @param: tcmpr_df: + + Returns: + tcmpr_reformatted: A dataframe containing the "reformatted" TCMPR linetype data + """ + + begin_reformat = time.perf_counter() + logger.info("Reformatting the TCMPR dataframe...") + + # Keep only the TCMPR columns + tcmpr_columns:list = cn.COLUMNS[cn.TCMPR] + uc_tcmpr_columns = [col.upper() for col in tcmpr_columns] + long_header_tcst = cn.LONG_HEADER_TCST + uc_long_header_tcst = [header.upper() for header in long_header_tcst] + all_tcmpr_headers = uc_long_header_tcst + uc_tcmpr_columns + + # Keep only the TCMPR relevant columns (extra columns may exist due to TCDIAG rows in the original data) + all_columns:list = tcmpr_df.columns.to_list() + cols_to_drop:list = all_columns[len(all_tcmpr_headers):] + tcmpr_relevant: pd.DataFrame = tcmpr_df.drop(cols_to_drop, axis=1) + + # Give appropriate names to all the columns (all upper case and replace numbered columns with actual + # names). + tcmpr_relevant.columns = all_tcmpr_headers + + end_reformat = time.perf_counter() + reformat_time = end_reformat - begin_reformat + logger.info("Reformatting the TCMPR dataframe took {reformat_time} seconds") + + return tcmpr_relevant + def rename_confidence_level_columns(self, confidence_level_columns: List[str]) -> \ List[str]: @@ -1550,7 +1826,6 @@ def main(): xml_loadfile_obj: XmlLoadFile = XmlLoadFile(None) # Retrieve all the filenames in the data_dir specified in the YAML config file - beg_load = time.perf_counter() load_files = xml_loadfile_obj.filenames_from_template(parms['input_data_dir'], {}) @@ -1559,9 +1834,13 @@ def main(): beg_read_data = time.perf_counter() rdf_obj.read_data(flags, load_files, line_types) end_read_data = time.perf_counter() - time_to_read = end_read_data - beg_read_data - logger.info("Time to read input .stat data files using METdbLoad: {time_to_read}" ) - file_df = rdf_obj.stat_data + read_data_total = end_read_data - beg_read_data + logger.info("Time to read input .stat data files using METdbLoad: {read_data_total} in seconds" ) + if parms['line_type'] == 'TCDIAG': + file_df = rdf_obj.tcst_data + else: + file_df = rdf_obj.stat_data + file_df.to_csv('/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/orig_input.txt', sep='\t', index=False) # Check if the output file already exists, if so, delete it to avoid # appending output from subsequent runs into the same file.