Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add components for surface snow data #14

Open
wants to merge 10 commits into
base: develop
Choose a base branch
from
58 changes: 0 additions & 58 deletions dump/config/bufr2ioda_script_backend_satwnd_amv_goes.yaml

This file was deleted.

31 changes: 19 additions & 12 deletions dump/mapping/bufr_satwnd_amv_goes.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,27 @@
import time
import numpy as np
import bufr
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from wxflow import Logger

# Initialize Logger
# Get log level from the environment variable, default to 'INFO it not set
log_level = os.getenv('LOG_LEVEL', 'INFO')
logger = Logger('BUFR2IODA_satwnd_amv_goes.py', level=log_level, colored_log=False)


def logging(comm, level, message):
"""
Logs a message to the console or log file, based on the specified logging level.

This function ensures that logging is only performed by the root process (`rank 0`)
in a distributed computing environment. The function maps the logging level to
This function ensures that logging is only performed by the root process (`rank 0`)
in a distributed computing environment. The function maps the logging level to
appropriate logger methods and defaults to the 'INFO' level if an invalid level is provided.

Parameters:
comm: object
The communicator object, typically from a distributed computing framework
The communicator object, typically from a distributed computing framework
(e.g., MPI). It must have a `rank()` method to determine the process rank.
level: str
The logging level as a string. Supported levels are:
Expand All @@ -33,7 +34,7 @@ def logging(comm, level, message):
- 'WARNING'
- 'ERROR'
- 'CRITICAL'
If an invalid level is provided, a warning will be logged, and the level
If an invalid level is provided, a warning will be logged, and the level
will default to 'INFO'.
message: str
The message to be logged.
Expand Down Expand Up @@ -73,6 +74,7 @@ def logging(comm, level, message):
# Call the logging method
log_method(message)


def _make_description(mapping_path, update=False):
description = bufr.encoders.Description(mapping_path)

Expand Down Expand Up @@ -116,6 +118,7 @@ def _make_description(mapping_path, update=False):

return description


def compute_wind_components(wdir, wspd):
"""
Compute the U and V wind components from wind direction and wind speed.
Expand All @@ -130,9 +133,10 @@ def compute_wind_components(wdir, wspd):
wdir_rad = np.radians(wdir) # Convert degrees to radians
u = -wspd * np.sin(wdir_rad)
v = -wspd * np.cos(wdir_rad)

return u.astype(np.float32), v.astype(np.float32)


def _get_obs_type(swcm, chanfreq):
"""
Determine the observation type based on `swcm` and `chanfreq`.
Expand Down Expand Up @@ -164,6 +168,7 @@ def _get_obs_type(swcm, chanfreq):

return obstype


def _make_obs(comm, input_path, mapping_path):

# Get container from mapping file first
Expand All @@ -175,7 +180,7 @@ def _make_obs(comm, input_path, mapping_path):
logging(comm, 'DEBUG', f'category map = {container.get_category_map()}')

# Add new/derived data into container
for cat in container.all_sub_categories():
for cat in container.all_sub_categories():

logging(comm, 'DEBUG', f'category = {cat}')

Expand All @@ -193,7 +198,7 @@ def _make_obs(comm, input_path, mapping_path):
container.add('variables/windNorthward', wob, paths, cat)

else:
# Add new variables: ObsType/windEastward & ObsType/windNorthward
# Add new variables: ObsType/windEastward & ObsType/windNorthward
swcm = container.get('variables/windComputationMethod', cat)
chanfreq = container.get('variables/sensorCentralFrequency', cat)

Expand All @@ -209,7 +214,7 @@ def _make_obs(comm, input_path, mapping_path):
container.add('variables/obstype_uwind', obstype, paths, cat)
container.add('variables/obstype_vwind', obstype, paths, cat)

# Add new variables: ObsValue/windEastward & ObsValue/windNorthward
# Add new variables: ObsValue/windEastward & ObsValue/windNorthward
wdir = container.get('variables/windDirection', cat)
wspd = container.get('variables/windSpeed', cat)

Expand All @@ -231,6 +236,7 @@ def _make_obs(comm, input_path, mapping_path):

return container


def create_obs_group(input_path, mapping_path, category, env):

comm = bufr.mpi.Comm(env["comm_name"])
Expand All @@ -250,7 +256,7 @@ def create_obs_group(input_path, mapping_path, category, env):

container = _make_obs(comm, input_path, mapping_path)

# Gather data from all tasks into all tasks. Each task will have the complete record
# Gather data from all tasks into all tasks. Each task will have the complete record
logging(comm, 'INFO', f'Gather data from all tasks into all tasks')
container.all_gather(comm)

Expand All @@ -269,6 +275,7 @@ def create_obs_group(input_path, mapping_path, category, env):
logging(comm, 'INFO', f'Return the encoded data for {category}')
return data


def create_obs_file(input_path, mapping_path, output_path):

comm = bufr.mpi.Comm("world")
Expand All @@ -279,7 +286,7 @@ def create_obs_file(input_path, mapping_path, output_path):

# Encode the data
if comm.rank() == 0:
netcdfEncoder(description).encode(container, output_path)
netcdfEncoder(description).encode(container, output_path)

logging(comm, 'INFO', f'Return the encoded data')

Expand Down
189 changes: 189 additions & 0 deletions dump/mapping/bufr_sfcsno.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
import sys
import os
import argparse
import time
import numpy as np
import bufr
from pyioda.ioda.Engines.Bufr import Encoder as iodaEncoder
from bufr.encoders.netcdf import Encoder as netcdfEncoder
from wxflow import Logger

# Initialize Logger
# Get log level from the environment variable, default to 'INFO it not set
log_level = os.getenv('LOG_LEVEL', 'INFO')
logger = Logger('BUFR2IODA_sfcsno.py', level=log_level, colored_log=False)


def logging(comm, level, message):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to make a logging module that we can import, so that we do not need to redefine this function in every py mapping file.

We don't have to do this now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will do this in my next PR.

"""
Logs a message to the console or log file, based on the specified logging level.

This function ensures that logging is only performed by the root process (`rank 0`)
in a distributed computing environment. The function maps the logging level to
appropriate logger methods and defaults to the 'INFO' level if an invalid level is provided.

Parameters:
comm: object
The communicator object, typically from a distributed computing framework
(e.g., MPI). It must have a `rank()` method to determine the process rank.
level: str
The logging level as a string. Supported levels are:
- 'DEBUG'
- 'INFO'
- 'WARNING'
- 'ERROR'
- 'CRITICAL'
If an invalid level is provided, a warning will be logged, and the level
will default to 'INFO'.
message: str
The message to be logged.

Behavior:
- Logs messages only on the root process (`comm.rank() == 0`).
- Maps the provided logging level to a method of the logger object.
- Defaults to 'INFO' and logs a warning if an invalid logging level is given.
- Supports standard logging levels for granular control over log verbosity.

Example:
>>> logging(comm, 'DEBUG', 'This is a debug message.')
>>> logging(comm, 'ERROR', 'An error occurred!')

Notes:
- Ensure that a global `logger` object is configured before using this function.
- The `comm` object should conform to MPI-like conventions (e.g., `rank()` method).
"""

if comm.rank() == 0:
# Define a dictionary to map levels to logger methods
log_methods = {
'DEBUG': logger.debug,
'INFO': logger.info,
'WARNING': logger.warning,
'ERROR': logger.error,
'CRITICAL': logger.critical,
}

# Get the appropriate logging method, default to 'INFO'
log_method = log_methods.get(level.upper(), logger.info)

if log_method == logger.info and level.upper() not in log_methods:
# Log a warning if the level is invalid
logger.warning(f'log level = {level}: not a valid level --> set to INFO')

# Call the logging method
log_method(message)


def _mask_container(container, mask):

new_container = bufr.DataContainer()
for var_name in container.list():
var = container.get(var_name)
paths = container.get_paths(var_name)
new_container.add(var_name, var[mask], paths)

return new_container


def _make_description(mapping_path, update=False):

description = bufr.encoders.Description(mapping_path)

return description


def _make_obs(comm, input_path, mapping_path):
"""
Create the ioda snow depth observations:
- reads state of ground (sogr) and snow depth (snod)
- applys sogr conditions to the missing snod values
- removes the filled/missing snow values and creates the masked container

Parameters
----------
comm: object
The communicator object (e.g., MPI)
input_path: str
The input bufr file
mapping_path: str
The input bufr2ioda mapping file
"""

# Get container from mapping file first
logging(comm, 'INFO', 'Get container from bufr')
container = bufr.Parser(input_path, mapping_path).parse(comm)

logging(comm, 'DEBUG', f'container list (original): {container.list()}')

# Add new/derived data into container
sogr = container.get('variables/groundState')
snod = container.get('variables/totalSnowDepth')
snod[(sogr <= 11.0) & snod.mask] = 0.0
snod[(sogr == 15.0) & snod.mask] = 0.0
container.replace('variables/totalSnowDepth', snod)
snod_upd = container.get('variables/totalSnowDepth')

masked_container = _mask_container(container, (~snod.mask))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of making those snod values equal 0, did you mean to mask them out instead? In this case you can modify the snod.mask directly (the True value means to mask out for numpy masked arrays).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@rmclaren I took the sfcsno Python code from GDASApp PR #1368 and modified it to following our Python code template. I did not change how the derived variable was done. For sfcsno converter, the data is read out from BUFR (10K+ data) and then applies data filtering to remove missing data. The output file ends up with much less data (~3K).


return masked_container


def create_obs_group(input_path, mapping_path, env):

comm = bufr.mpi.Comm(env["comm_name"])

description = _make_description(mapping_path, update=False)
container = _make_obs(comm, input_path, mapping_path)

# Gather data from all tasks into all tasks. Each task will have the complete record
logging(comm, 'INFO', f'Gather data from all tasks into all tasks')
container.all_gather(comm)

# Encode the data
logging(comm, 'INFO', f'Encode data')
data = next(iter(iodaEncoder(mapping_path).encode(container).values()))

logging(comm, 'INFO', f'Return the encoded data')

return data


def create_obs_file(input_path, mapping_path, output_path):

comm = bufr.mpi.Comm("world")
container = _make_obs(comm, input_path, mapping_path)
container.gather(comm)

description = _make_description(mapping_path, update=False)

# Encode the data
if comm.rank() == 0:
netcdfEncoder(description).encode(container, output_path)

logging(comm, 'INFO', f'Return the encoded data')


if __name__ == '__main__':

start_time = time.time()

bufr.mpi.App(sys.argv)
comm = bufr.mpi.Comm("world")

# Required input arguments as positional arguments
parser = argparse.ArgumentParser(description="Convert BUFR to NetCDF using a mapping file.")
parser.add_argument('input', type=str, help='Input BUFR file')
parser.add_argument('mapping', type=str, help='BUFR2IODA Mapping File')
parser.add_argument('output', type=str, help='Output NetCDF file')

args = parser.parse_args()
mapping = args.mapping
infile = args.input
output = args.output

create_obs_file(infile, mapping, output)

end_time = time.time()
running_time = end_time - start_time
logging(comm, 'INFO', f'Total running time: {running_time}')
Loading