fold patch update and other updates into atm_analysis.py (NOAA-EMC#1313)
RussTreadon-NOAA committed Apr 15, 2023
1 parent 3501d59 commit 8492ccf
Showing 1 changed file with 64 additions and 33 deletions.
97 changes: 64 additions & 33 deletions ush/python/pygfs/task/atm_analysis.py
@@ -86,14 +86,11 @@ def initialize(self: Analysis) -> None:

# stage berror files
# copy static background error files, otherwise it will assume ID matrix
if self.task_config.STATICB_TYPE in ['identity', 'bump', 'gsibec']:
logger.debug(f"Stage files for STATICB_TYPE {self.task_config.STATICB_TYPE}")
FileHandler(self.get_berror_dict(self.task_config)).sync()
else:
raise WorkflowException(f"***ERROR*** STATICB_TYPE = {self.task_config.STATICB_TYPE} is invalid")
logger.debug(f"Stage files for STATICB_TYPE {self.task_config.STATICB_TYPE}")
FileHandler(self.get_berror_dict(self.task_config)).sync()

# stage backgrounds
FileHandler(self.get_bkg_dict(AttrDict(self.task_config, **self.task_config))).sync()
FileHandler(self.get_bkg_dict(AttrDict(self.task_config))).sync()

# generate variational YAML file
logger.debug(f"Generate variational YAML file: {self.task_config.fv3jedi_yaml}")
@@ -156,27 +153,34 @@ def finalize(self: Analysis) -> None:
# get list of diag files to put in tarball
diags = glob.glob(os.path.join(self.task_config.DATA, 'diags', 'diag*nc4'))

logger.info(f"Compressing {len(diags)} diag files to {atmstat}.gz")

# gzip the files first
logger.debug(f"Gzipping {len(diags)} diag files")
for diagfile in diags:
with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out:
f_out.writelines(f_in)

# open tar file for writing
logger.debug(f"Creating tar file {atmstat} with {len(diags)} gzipped diag files")
with tarfile.open(atmstat, "w") as archive:
for diagfile in diags:
diaggzip = f"{diagfile}.gz"
archive.add(diaggzip, arcname=os.path.basename(diaggzip))
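
As an aside, the gzip-then-tar step added in this hunk can be exercised on its own; the sketch below is a minimal standalone version that uses a hypothetical scratch directory rather than the workflow's DATA/ layout.

import glob
import gzip
import os
import tarfile
import tempfile

# Hypothetical scratch layout; the real task derives these paths from task_config.
data_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(data_dir, 'diags'), exist_ok=True)
atmstat = os.path.join(data_dir, 'gdas.t00z.atmstat')

diags = glob.glob(os.path.join(data_dir, 'diags', 'diag*nc4'))

# gzip each diag file next to the original ...
for diagfile in diags:
    with open(diagfile, 'rb') as f_in, gzip.open(f"{diagfile}.gz", 'wb') as f_out:
        f_out.writelines(f_in)

# ... then bundle the gzipped copies into one uncompressed tar archive
with tarfile.open(atmstat, 'w') as archive:
    for diagfile in diags:
        diaggzip = f"{diagfile}.gz"
        archive.add(diaggzip, arcname=os.path.basename(diaggzip))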

# copy full YAML from executable to ROTDIR
src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atmvar.yaml")
dest = os.path.join(self.task_config.COMOUTatmos, f"{self.task_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atmvar.yaml")
logger.info(f"Copying {self.task_config.fv3jedi_yaml} to {self.task_config.COMOUTatmos}")
src = os.path.join(self.task_config.DATA, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmvar.yaml")
dest = os.path.join(self.task_config.COMOUTatmos, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atmvar.yaml")
logger.debug(f"Copying {src} to {dest}")
yaml_copy = {
'mkdir': [self.task_config.COMOUTatmos],
'copy': [[src, dest]]
}
FileHandler(yaml_copy).sync()
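
The yaml_copy dictionary follows the FileHandler convention used throughout this task: a 'mkdir' list of directories to create, followed by a 'copy' list of [source, destination] pairs. A minimal illustration with hypothetical paths (the import path is assumed to match this module's other usage):

import os
from pygw.file_utils import FileHandler  # assumed import path, matching this module

src = '/scratch/DATA/gdas.t00z.atmvar.yaml'                   # hypothetical source
dest = '/com/gdas.20230415/00/atmos/gdas.t00z.atmvar.yaml'    # hypothetical destination

yaml_copy = {
    'mkdir': [os.path.dirname(dest)],   # directories created first
    'copy': [[src, dest]],              # then each [src, dest] pair is copied
}
FileHandler(yaml_copy).sync()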

# copy bias correction files to ROTDIR
logger.info("Copy bias correction files from DATA/ to COM/")
biasdir = os.path.join(self.task_config.DATA, 'bc')
biasls = os.listdir(biasdir)
biaslist = []
@@ -186,12 +190,11 @@ def finalize(self: Analysis) -> None:
biaslist.append([src, dest])

gprefix = f"{self.task_config.GPREFIX}"
gdate = to_YMDH(self.task_config.previous_cycle)
gsuffix = f"{gdate}" + ".txt"
gsuffix = f"{to_YMDH(self.task_config.previous_cycle)}" + ".txt"
aprefix = f"{self.task_config.APREFIX}"
adate = to_YMDH(self.task_config.current_cycle)
asuffix = f"{adate}" + ".txt"
asuffix = f"{to_YMDH(self.task_config.current_cycle)}" + ".txt"

logger.info(f"Copying {gprefix}*{gsuffix} from DATA/ to COM/ as {aprefix}*{asuffix}")
obsdir = os.path.join(self.task_config.DATA, 'obs')
obsls = os.listdir(obsdir)
for ofile in obsls:
@@ -209,6 +212,7 @@ def finalize(self: Analysis) -> None:
FileHandler(bias_copy).sync()

# Create UFS model readable atm increment file from UFS-DA atm increment
logger.info("Create UFS model readable atm increment file from UFS-DA atm increment")
self.jedi2fv3inc()

def clean(self):
@@ -250,6 +254,7 @@ def get_bkg_dict(self, task_config: Dict[str, Any]) -> Dict[str, List[str]]:
for itile in range(1, task_config.ntiles + 1):
basename = template.format(tilenum=itile)
bkglist.append([os.path.join(rst_dir, basename), os.path.join(run_dir, basename)])

for ftype in ['phy_data', 'sfc_data']:
template = f'{to_fv3time(self.task_config.current_cycle)}.{ftype}.tile{{tilenum}}.nc'
for itile in range(1, task_config.ntiles + 1):
@@ -280,17 +285,39 @@ def get_berror_dict(self, config: Dict[str, Any]) -> Dict[str, List[str]]:
berror_dict: Dict
a dictionary containing the list of atm background error files to copy for FileHandler
"""
SUPPORTED_BERROR_STATIC_MAP = {'bump': self.__get_berror_bump_dict, 'gsibec': self.__get_berror_gsibec_dict}
berror_dict = SUPPORTED_BERROR_STATIC_MAP[config.STATICB_TYPE]()
SUPPORTED_BERROR_STATIC_MAP = {'identity': self._get_berror_dict_identity,
'bump': self._get_berror_dict_bump,
'gsibec': self._get_berror_dict_gsibec}

try:
berror_dict = SUPPORTED_BERROR_STATIC_MAP[config.STATICB_TYPE](config)
except KeyError:
raise KeyError(f"{config.STATICB_TYPE} is not a supported background error type.\n" +
f"Currently supported background error types are:\n" +
f'{" | ".join(SUPPORTED_BERROR_STATIC_MAP.keys())}')

return berror_dict
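
The table-driven dispatch above replaces the earlier if/else validation in initialize(): an unsupported STATICB_TYPE now surfaces as a KeyError that lists the valid options. Below is a standalone sketch of the same pattern, with stub handlers standing in for the real static methods:

def _identity(config):
    raise NotImplementedError('Identity background error not yet implemented')

def _bump(config):
    return {'mkdir': [], 'copy': []}    # stub for the real BUMP file list

def _gsibec(config):
    return {'mkdir': [], 'copy': []}    # stub for the real GSIBEC file list

SUPPORTED_BERROR_STATIC_MAP = {'identity': _identity, 'bump': _bump, 'gsibec': _gsibec}

def get_berror_dict(config):
    try:
        return SUPPORTED_BERROR_STATIC_MAP[config['STATICB_TYPE']](config)
    except KeyError:
        raise KeyError(f"{config['STATICB_TYPE']} is not a supported background error type.\n"
                       "Currently supported background error types are:\n"
                       f'{" | ".join(SUPPORTED_BERROR_STATIC_MAP.keys())}')

print(get_berror_dict({'STATICB_TYPE': 'gsibec'}))   # {'mkdir': [], 'copy': []}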

@staticmethod
@logit(logger)
def __get_berror_bump_dict(self: Analysis) -> Dict[str, List[str]]:
def _get_berror_dict_identity(config: Dict[str, Any]) -> Dict[str, List[str]]:
"""Compile a dictionary of atm identity background error files to copy
This method is not implemented yet.
This is a private method and should not be accessed directly.
"""
raise NotImplementedError("Identity background error not yet implemented")

@staticmethod
@logit(logger)
def _get_berror_dict_bump(config: Dict[str, Any]) -> Dict[str, List[str]]:
"""Compile a dictionary of atm bump background error files to copy
This method will construct a dictionary of atm bump background error
files for global atm DA and return said dictionary to the parent
This is a private method and should not be accessed directly.
Parameters
----------
config: Dict
@@ -303,41 +330,45 @@ def __get_berror_bump_dict(self: Analysis) -> Dict[str, List[str]]:
"""
# BUMP atm static-B needs nicas, cor_rh, cor_rv and stddev files.
b_dir = config.BERROR_DATA_DIR
b_datestr = to_fv3time(self.task_config.BERROR_DATE)
b_datestr = to_fv3time(config.BERROR_DATE)
berror_list = []
for ftype in ['cor_rh', 'cor_rv', 'stddev']:
coupler = f'{b_datestr}.{ftype}.coupler.res'
berror_list.append([
os.path.join(b_dir, coupler), os.path.join(self.task_config.DATA, 'berror', coupler)
os.path.join(b_dir, coupler), os.path.join(config.DATA, 'berror', coupler)
])

        template = f'{b_datestr}.{ftype}.fv_tracer.res.tile{{tilenum}}.nc'
for itile in range(1, self.task_config.ntiles + 1):
for itile in range(1, config.ntiles + 1):
tracer = template.format(tilenum=itile)
berror_list.append([
os.path.join(b_dir, tracer), os.path.join(self.task_config.DATA, 'berror', tracer)
os.path.join(b_dir, tracer), os.path.join(config.DATA, 'berror', tracer)
])

nproc = self.task_config.ntiles * self.task_config.layout_x * self.task_config.layout_y
nproc = config.ntiles * config.layout_x * config.layout_y
for nn in range(1, nproc + 1):
berror_list.append([
os.path.join(b_dir, f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc'),
os.path.join(self.task_config.DATA, 'berror', f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc')
os.path.join(config.DATA, 'berror', f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc')
])
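
The NICAS file count is tied to the MPI decomposition: nproc = ntiles * layout_x * layout_y, and each file name encodes the total task count and its own index, both zero-padded to six digits. A quick worked example with hypothetical layout values:

ntiles, layout_x, layout_y = 6, 8, 8        # hypothetical decomposition
nproc = ntiles * layout_x * layout_y        # 384 NICAS files expected

names = [f'nicas_aero_nicas_local_{nproc:06}-{nn:06}.nc' for nn in range(1, nproc + 1)]
print(names[0])     # nicas_aero_nicas_local_000384-000001.nc
print(names[-1])    # nicas_aero_nicas_local_000384-000384.nc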

# create dictionary of background error files to stage
berror_dict = {
'mkdir': [os.path.join(self.task_config.DATA, 'berror')],
'mkdir': [os.path.join(config.DATA, 'berror')],
'copy': berror_list,
}
return berror_dict

@staticmethod
@logit(logger)
def __get_berror_gsibec_dict(self: Analysis) -> Dict[str, List[str]]:
def _get_berror_dict_gsibec(config: Dict[str, Any]) -> Dict[str, List[str]]:
"""Compile a dictionary of atm gsibec background error files to copy
This method will construct a dictionary of atm gsibec background error
files for global atm DA and return said dictionary to the parent
This is a private method and should not be accessed directly.
Parameters
----------
config: Dict
@@ -349,17 +380,17 @@ def __get_berror_gsibec_dict(self: Analysis) -> Dict[str, List[str]]:
a dictionary of atm gsibec background error files to copy for FileHandler
"""
# GSI atm static-B needs namelist and coefficient files.
b_dir = os.path.join(self.task_config.HOMEgfs, 'fix', 'gdas', 'gsibec', self.task_config.CASE_ANL)
b_dir = os.path.join(config.HOMEgfs, 'fix', 'gdas', 'gsibec', config.CASE_ANL)
berror_list = []
for ftype in ['gfs_gsi_global.nml', 'gsi-coeffs-gfs-global.nc4']:
berror_list.append([
os.path.join(b_dir, ftype),
os.path.join(self.task_config.DATA, 'berror', ftype)
os.path.join(config.DATA, 'berror', ftype)
])

# create dictionary of background error files to stage
berror_dict = {
'mkdir': [os.path.join(self.task_config.DATA, 'berror')],
'mkdir': [os.path.join(config.DATA, 'berror')],
'copy': berror_list,
}
return berror_dict
@@ -380,19 +411,18 @@ def jedi2fv3inc(self: Analysis) -> None:
"""
# Select the atm guess file based on the analysis and background resolutions
# Fields from the atm guess are used to compute the delp and delz increments
case_anl = int(self.config.CASE_ANL[1:])
case = int(self.config.CASE[1:])
if case_anl == case:
atmges_fv3 = os.path.join(self.task_config.comin_ges_atm, f"{self.task_config.GPREFIX}atmf006.nc")
else:
atmges_fv3 = os.path.join(self.task_config.comin_ges_atm, f"{self.task_config.GPREFIX}atmf006.ensres.nc")
case_anl = int(self.task_config.CASE_ANL[1:])
case = int(self.task_config.CASE[1:])

file = f"{self.task_config.GPREFIX}" + "atmf006" + f"{'' if case_anl == case else '.ensres'}" + ".nc"
atmges_fv3 = os.path.join(self.task_config.comin_ges_atm, file)
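
The refactor collapses the previous if/else guess-file selection into a single conditional suffix: when the analysis resolution matches the deterministic background resolution the plain atmf006 guess is used, otherwise the ensemble-resolution (.ensres) guess is used. A small illustration with hypothetical values:

GPREFIX = 'gdas.t18z.'                  # hypothetical guess prefix
case_anl, case = 384, 768               # e.g. a C384 analysis against a C768 background

file = f"{GPREFIX}" + "atmf006" + f"{'' if case_anl == case else '.ensres'}" + ".nc"
print(file)                             # gdas.t18z.atmf006.ensres.nc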

# Set the path/name to the input UFS-DA atm increment file (atminc_jedi)
# and the output UFS model atm increment file (atminc_fv3)
cdate = to_fv3time(self.task_config.current_cycle)
cdate_inc = cdate.replace('.', '_')
atminc_jedi = os.path.join(self.task_config.DATA, 'anl', f'atminc.{cdate_inc}z.nc4')
atminc_fv3 = os.path.join(self.task_config.COMOUTatmos, f"{self.task_config.CDUMP}.t{self.runtime_config.cyc:02d}z.atminc.nc")
atminc_fv3 = os.path.join(self.task_config.COMOUTatmos, f"{self.task_config.CDUMP}.t{self.task_config.cyc:02d}z.atminc.nc")

# Reference the python script which does the actual work
incpy = os.path.join(self.task_config.HOMEgfs, 'ush/jediinc2fv3.py')
@@ -402,4 +432,5 @@ def jedi2fv3inc(self: Analysis) -> None:
cmd.add_default_arg(atmges_fv3)
cmd.add_default_arg(atminc_jedi)
cmd.add_default_arg(atminc_fv3)
logger.debug(f"Executing {cmd}")
cmd(output='stdout', error='stderr')
