Skip to content

Commit

Permalink
Issue #240 support to reformat the TCDiag linetype output from TC-Pai…
Browse files Browse the repository at this point in the history
…rs output, includes labelling all TCMPR headers
  • Loading branch information
bikegeek committed Mar 26, 2024
1 parent 43ba750 commit 91c7cbe
Showing 1 changed file with 290 additions and 11 deletions.
301 changes: 290 additions & 11 deletions METreformat/write_stat_ascii.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import logging
import time
import pathlib
from typing import List, Set
from typing import List
import numpy as np
import pandas as pd
import yaml
Expand Down Expand Up @@ -115,7 +115,7 @@ def write_stat_ascii(self, stat_data: pd.DataFrame, parms: dict) -> pd.DataFrame
# ----------------------------------
supported_linetypes = [cn.FHO, cn.CNT, cn.VCNT, cn.CTC,
cn.CTS, cn.MCTS, cn.SL1L2, cn.ECNT, cn.PCT,
cn.RHIST]
cn.RHIST, cn.TCDIAG]

# Different formats based on the line types. Most METplotpy plots accept the long format where
# all stats are under the stat_name and stat_value columns and the confidence limits under the
Expand All @@ -125,7 +125,12 @@ def write_stat_ascii(self, stat_data: pd.DataFrame, parms: dict) -> pd.DataFrame
working_df = stat_data.copy(deep=True)
linetype_requested = str( parms['line_type']).upper()
if linetype_requested in supported_linetypes:
working_df = working_df.loc[working_df['line_type'] == linetype_requested]
# If the TCDiag linetype is requested, keep both the TCDiag and TCMPR linetypes.
if linetype_requested == cn.TCDIAG:
working_df = working_df.loc[(working_df['line_type'] == linetype_requested) |
(working_df['line_type'] == cn.TCMPR)]
else:
working_df = working_df.loc[working_df['line_type'] == linetype_requested]
else:
logging.ERROR("Requested line type is currently not supported for reformatting")
raise ValueError("Requested line type ", linetype_requested, " is currently not supported for reformatting")
Expand Down Expand Up @@ -292,6 +297,12 @@ def process_by_stat_linetype(self, linetype: str, stat_data: pd.DataFrame, is_ag
linetype_data: pd.DataFrame = self.process_rhist(stat_data)
else:
linetype_data: pd.DataFrame = self.process_rhist_for_agg(stat_data)

# TCDIAG (from MET TC-Pairs output)
elif linetype == cn.TCDIAG:
# No need to support additional reformatting for agg_stat.
linetype_data: pd.DataFrame = self.process_tcdiag(stat_data)

else:
return None

Expand Down Expand Up @@ -432,7 +443,6 @@ def process_pct(self, stat_data: pd.DataFrame) -> pd.DataFrame:
on_cols = []
i_value = []
working_headers = working_copy_df.columns.to_list()
print(f"working headers: {working_headers}")
remaining_columns = working_headers[cn.NUM_STATIC_PCT_COLS:]
for cur in remaining_columns:
match_thresh = re.match(r'(thresh_)(\d+)', cur)
Expand Down Expand Up @@ -546,9 +556,6 @@ def process_rhist(self, stat_data: pd.DataFrame) -> pd.DataFrame:
# Replace the first two numbered labels (following the LINETYPE column) with the TOTAL and N_RANK labels
working_df.rename(columns={'0':'total', cn.LINE_VAR_COUNTER[cn.RHIST]:'n_rank'}, inplace=True)

# Relabel the remaining numbered column headers
last_column_name = len(working_df.columns) - cn.NUM_STATIC_RHIST_COLS

# Relabel the repeating columns (RANK_1, ..., RANK_n)
# column names are numbered '1','2','3',...,etc. by METdbLoad.
# Give them descriptive labels: rank_1, rank_2, etc.
Expand Down Expand Up @@ -1450,6 +1457,275 @@ def process_ecnt_for_agg(self, stat_data: pd.DataFrame) -> pd.DataFrame:

return merged_dfs

def process_tcdiag(self, stat_data: pd.DataFrame) -> pd.DataFrame:
"""
Reformat the TCMPR and TCDiag linetype data. Reformat the TCMPR linetype into one dataframe,
then the TCDiag linetype into another dataframe. Perform a left join to capture all the data into
a single row for the same model, init time, valid time, fcst time, etc. This results in fewer rows.
To reformat the TCMPR linetype data, label all the unnamed headers (i.e. those with numbers '1', '2', ...,)
with the corresponding name as specified in the MET User's Guide, section 24.2.
To reformat the TCDiag linetype data, collect the DIAG_i values into their own columns:
e.g. if N_DIAG is 4 then:
DIAG_1 = SHR_MAG with VALUE_1 = 1
DIAG_2 = STM_SPD with VALUE_2 = 15
DIAG_3 = TPW with VALUE_3 = 63
DIAG_4 = LAND with VALUE_4 = 307
will look like this (the standard/common columns will precede these columns):
SHR_MAG STM_SPD TPW LAND
1 15 63 307
This will resemble the TCMPR linetype's output file, where every column has a header name/column name.
Arguments:
@param stat_data: The original input data, containing both TCMPR and TCDIAG linetype rows.
Returns:
full_df: the reformatted dataframe with all unlabelled columns under the appropriate header/column name
for the TCMPR linetype. For the TCDIAG linetype, the DIAG_i VALUE_i pairs are consolidated
under the name of the DIAG_i value. The TCMPR and TCDIAG columns are consolidated into
the same rows via an inner join.
"""

begin_tcdiag = time.perf_counter()

# Provide appropriate names for the TCMPR headers (replacing numbered columns i.e. '1', '2',..., etc. with
# the column names specified in the MET User's Guide TC-Pairs section).
tcmpr_df = stat_data.loc[stat_data['line_type'] == cn.TCMPR]
reformatted_tcmpr = self.reformat_tcmpr(tcmpr_df)
reformatted_tcmpr.to_csv("/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/tcmpr_reformatted.txt", sep='\t', index=False)

# Perform reformatting for the TCDiag linetype
# Determine the columns for the line type
linetype: str = cn.TCDIAG

#
# Subset the input dataframe to include only the TCDIAG columns and label the remaining
# "unlabelled" (i.e. labelled with numbers after data is read in by METdbLoad)
# columns/headers.
#
# Do not assume that the input data contains only the TCDIAG lines. Since the TCDIAG linetype
# is available from the MET TC-Pairs tool, it is very likely that TCMPR line type data will also be
# present in the input data file(s).
stat_data.to_csv('/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/all_tcpairs_tcdiag.txt', sep='\t')
all_tcdiag_df = stat_data.loc[stat_data['line_type'] == linetype]
all_tcdiag_df.to_csv('/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/all_tcdiag.txt', sep='\t')

# Subset based on the DIAG_SOURCE, these provide different diaganostic measurements (i.e. columns).
# Join all the subsets into one final dataframe.

# Get the diagnostic sources (DIAG_SOURCE column)
diag_src_col_name = cn.TCDIAG_DIAG_SOURCE_COLNAME
all_diag_sources:np.narray = all_tcdiag_df[diag_src_col_name].unique()
diag_sources:list = sorted(all_diag_sources)

reformatted_dfs = []
subset_df = all_tcdiag_df.copy(deep=True)

# Perform the subsetting by diagnostic source, then invoke the
# method to perform the reformatting.
for diag in diag_sources:
# Subset based on DIAG_SOURCE
ds_df = subset_df.loc[subset_df[diag_src_col_name] == diag]

ds_df_reformatted = self.reformat_tcdiag(ds_df)
reformatted_dfs.append(ds_df_reformatted)

# concat all the diagnostic source dataframes into one
all_tcdiag_reformatted = pd.concat(reformatted_dfs)

# Rename the columns. Replace fcst_lead with LEAD, fcst_init with INIT, fcst_valid with VALID, and convert
# the remaining column header names to all upper case to be compatible
# with METplotpy's TCMPR plotter.
lc_cols = all_tcdiag_reformatted.columns.to_list()
uc_cols = []

for cur_col in lc_cols:
if cur_col == 'fcst_lead':
uc_cur_col = 'LEAD'
uc_cols.append(uc_cur_col)
elif cur_col == 'fcst_init':
uc_cur_col = 'INIT'
uc_cols.append(uc_cur_col)
elif cur_col == 'fcst_valid':
uc_cur_col = 'VALID'
uc_cols.append(uc_cur_col)
else:
uc_cols.append(cur_col.upper())

all_tcdiag_reformatted.columns = uc_cols

# Ensure that the LEAD column is integer type
all_tcdiag_reformatted['LEAD'].astype(int)


# Join the TCMPR and TCDIAG dataframes into one and do some cleaning up of columns
uc_long_header_tcst = [hdr.upper() for hdr in cn.LONG_HEADER_TCST]
common_headers = uc_long_header_tcst[0:len(uc_long_header_tcst) - 1]
full_df = pd.merge(reformatted_tcmpr, all_tcdiag_reformatted, on=common_headers, how='inner')

# Clean up extraneous columns:
# TOTAL_x and TOTAL_y are identical, drop TOTAL_y and rename TOTAL_x to TOTAL
# LINE_TYPE_x is TCMPR, LINE_TYPE_y is TCDIAG, drop LINE_TYPE_x and rename LINE_TYPE_x to LINE_TYPE
cleanup_df = full_df.copy(deep=True)
cleanup_df.drop('TOTAL_y', axis=1, inplace=True)
cleanup_df.drop('LINE_TYPE_x', axis=1, inplace=True)
cleanup_df.rename({'TOTAL_x':'TOTAL', 'LINE_TYPE_y':'LINE_TYPE'}, axis=1, inplace=True)

end_tcdiag = time.perf_counter()
time_to_process_tcdiag = end_tcdiag - begin_tcdiag
logging.info(f"Total time for processing the TCDiag matched pair linetype: {time_to_process_tcdiag} seconds")

return cleanup_df

def reformat_tcdiag(self, tcdiag_df: pd.DataFrame) -> pd.DataFrame:

"""
Takes a TCDiag dataframe and reformats it by
replacing the VALUE_i column with the value of the corresponding DIAG_i
and removing the DIAG_i column.
e.g.
DIAG_1 VALUE_1 DIAG_2 VALUE_2
SHR_MAG 15.0 STM_SPD 63.0
becomes:
SHR_MAG STM_SPD
15.0 63.0
Args:
@param tcdiag_df: A dataframe containing only the TCDIAG linetype.
Returns: a reformatted df where the DIAG_i columns are removed and the VALUE_i columns are named
with the value of the corresponding DIAG_i
"""

begin_reformat = time.perf_counter()
logger.info("Reformat the TCDiag dataframe based on the DIAG_SOURCE ")
n_diag_col_name = cn.LINE_VAR_COUNTER[cn.TCDIAG]
ds_df = tcdiag_df.copy(deep=True)

# Subset the dataframe to contain only the relevant columns
num_repeating_col_labels = cn.LINE_VAR_REPEATS[cn.TCDIAG]

all_n_diags = ds_df[n_diag_col_name]
max_n_diag = int(all_n_diags.max())

# Calculate the total number of columns
num_relevant_columns = max_n_diag * num_repeating_col_labels
total_num_columns = num_relevant_columns + cn.NUM_STATIC_TCDIAG_COLS
idx_last_relevant_col = total_num_columns
relevant_df = ds_df.iloc[0:, 0:idx_last_relevant_col]

# Work on a copy
ds_df = relevant_df.copy(deep=True)

# Get column names for each DIAG_i, VALUE_i pair
start_diag_col_name = str(int(n_diag_col_name) + 1)
start_value_col_name = str(int(start_diag_col_name) + 1)

# Retrieve the DIAG_i value and replace the VALUE_i column name with this value
# i.e. if the DIAG_i value is SHR_MAG, then the corresponding VALUE_i column name will be replaced with
# SHR_MAG
start_diag = start_diag_col_name
start_value = start_value_col_name
num_diags = ds_df[n_diag_col_name].to_list()
num_diag = int(num_diags[0])

# Keep track of the DIAG_i columns to drop
diag_to_drop = []

for i in range(0, num_diag):
diag_names: list = ds_df[start_diag].to_list()
# All the diag names are identical in this column, use the first one in the list
diag_name = diag_names[0]

# Replace the VALUE_i column corresponding to the DIAG_i with the name of the diagnostic
ds_df.rename({start_value: diag_name}, axis='columns', inplace=True)
diag_to_drop.append(start_diag)
next_diag = str(int(start_diag) + 2)
next_value = str(int(start_value) + 2)
start_diag = next_diag
start_value = next_value

# Drop the columns containing the DIAG types
ds_df.drop(diag_to_drop, axis=1, inplace=True)
reformatted = ds_df.copy(deep=True)
reformatted.rename(
{'0': 'total', '1': 'index_pairs', '2': 'diag_source', '3': 'track_source', '4': 'field_source',
'5': 'n_diag'},
axis='columns', inplace=True)

# Replace the shear magnitude column with the common name since different DIAG_SOURCES use different
# 4 letter abbreviations for the same field (e.g. SHRD in SHIPS and SHR_MAG in CIRA RT are the identifiers
# for shear magnitude
reformatted_cols = reformatted.columns.to_list()
if 'SHR_MAG' in reformatted_cols:
reformatted.rename({'SHR_MAG':cn.TCDIAG_COMMON_NAMES['SHR_MAG']}, axis='columns', inplace=True)
elif 'SHRD' in reformatted_cols:
reformatted.rename({'SHRD':cn.TCDIAG_COMMON_NAMES['SHRD']}, axis='columns', inplace=True)
if 'LAND' in reformatted_cols:
reformatted.rename({'LAND': cn.TCDIAG_COMMON_NAMES['LAND']}, axis='columns', inplace=True)
elif 'DTL' in reformatted_cols:
reformatted.rename({'DTL': cn.TCDIAG_COMMON_NAMES['DTL']}, axis='columns', inplace=True)
if 'STM_SPD' in reformatted_cols:
reformatted.rename({'STM_SPD': cn.TCDIAG_COMMON_NAMES['STM_SPD']}, axis='columns', inplace=True)

# Clean up intermediate dataframes
del ds_df
gc.collect



end_reformat = time.perf_counter()
time_to_reformat = end_reformat - begin_reformat
logger.info(f"Finished reformatting TCDiag matched pair output in {time_to_reformat} seconds")

return reformatted


def reformat_tcmpr(self, tcmpr_df:pd.DataFrame)-> pd.DataFrame:
"""
Reformats the TCMPR data by providing explicit header (column) names as specified by the MET User's Guide
section 24.2.
Args:
@param: tcmpr_df:
Returns:
tcmpr_reformatted: A dataframe containing the "reformatted" TCMPR linetype data
"""

begin_reformat = time.perf_counter()
logger.info("Reformatting the TCMPR dataframe...")

# Keep only the TCMPR columns
tcmpr_columns:list = cn.COLUMNS[cn.TCMPR]
uc_tcmpr_columns = [col.upper() for col in tcmpr_columns]
long_header_tcst = cn.LONG_HEADER_TCST
uc_long_header_tcst = [header.upper() for header in long_header_tcst]
all_tcmpr_headers = uc_long_header_tcst + uc_tcmpr_columns

# Keep only the TCMPR relevant columns (extra columns may exist due to TCDIAG rows in the original data)
all_columns:list = tcmpr_df.columns.to_list()
cols_to_drop:list = all_columns[len(all_tcmpr_headers):]
tcmpr_relevant: pd.DataFrame = tcmpr_df.drop(cols_to_drop, axis=1)

# Give appropriate names to all the columns (all upper case and replace numbered columns with actual
# names).
tcmpr_relevant.columns = all_tcmpr_headers

end_reformat = time.perf_counter()
reformat_time = end_reformat - begin_reformat
logger.info("Reformatting the TCMPR dataframe took {reformat_time} seconds")

return tcmpr_relevant


def rename_confidence_level_columns(self, confidence_level_columns: List[str]) -> \
List[str]:
Expand Down Expand Up @@ -1550,7 +1826,6 @@ def main():
xml_loadfile_obj: XmlLoadFile = XmlLoadFile(None)

# Retrieve all the filenames in the data_dir specified in the YAML config file
beg_load = time.perf_counter()
load_files = xml_loadfile_obj.filenames_from_template(parms['input_data_dir'],
{})

Expand All @@ -1559,9 +1834,13 @@ def main():
beg_read_data = time.perf_counter()
rdf_obj.read_data(flags, load_files, line_types)
end_read_data = time.perf_counter()
time_to_read = end_read_data - beg_read_data
logger.info("Time to read input .stat data files using METdbLoad: {time_to_read}" )
file_df = rdf_obj.stat_data
read_data_total = end_read_data - beg_read_data
logger.info("Time to read input .stat data files using METdbLoad: {read_data_total} in seconds" )
if parms['line_type'] == 'TCDIAG':
file_df = rdf_obj.tcst_data
else:
file_df = rdf_obj.stat_data
file_df.to_csv('/Users/minnawin/feature_240_reformat_tcdiag/METdataio/METreformat/output/orig_input.txt', sep='\t', index=False)

# Check if the output file already exists, if so, delete it to avoid
# appending output from subsequent runs into the same file.
Expand Down

0 comments on commit 91c7cbe

Please sign in to comment.