Copy necessary rapidpy modules to remove incompatible dependencies #19

Merged
merged 2 commits on Jul 10, 2024

Conversation

Contributor

@rileyhales rileyhales commented Jul 9, 2024

Summary by CodeRabbit

  • New Features

    • Added capabilities for handling RAPID Qout outputs and data manipulation.
    • Introduced new class RAPID for managing RAPID program execution.
  • Bug Fixes

    • Updated import statements to ensure correct module loading.
  • Chores

    • Updated project version to 3.0.0.
    • Added requests library to dependencies and commented out RAPIDpy.
  • Improvements

    • Enhanced data handling with new helper functions for CSV and NetCDF files.
    • Improved metadata handling for time series at discrete point feature locations.

@rileyhales rileyhales requested a review from msouff July 9, 2024 22:53
Contributor

coderabbitai bot commented Jul 9, 2024

Walkthrough

The update enhances the geoglows_ecflow environment and functionality, mainly within the copied RAPIDpy module: it adds the requests library, comments out RAPIDpy in the environment file, introduces new functions and classes for handling datasets, updates helper functions, refines the NetCDF conversion process, and adjusts import statements and the project version for better maintainability.

Changes

File/Path - Summary
environment.yml - Added requests library; commented out RAPIDpy.
geoglows_ecflow/.../RAPIDpy/dataset.py - Added function compare_qout_files and class RAPIDDataset with methods for data handling.
geoglows_ecflow/.../RAPIDpy/helper_functions.py - Added helper functions for file handling, logging, and data comparison.
geoglows_ecflow/.../RAPIDpy/postprocess_merge.py - Added methods to ConvertRAPIDOutputToCF for file validation, initialization, and data writing.
geoglows_ecflow/.../RAPIDpy/rapid.py - Introduced methods for parameter update, namelist generation, RAPID execution, and data conversion.
geoglows_ecflow/.../run_rapid_forecast.py - Updated import statement to reflect the specific module structure of RAPIDpy.
pyproject.toml - Updated project version from "2.2.2" to "3.0.0".

Poem

In code we trust, a rabbit's delight,
New functions bloom, making workflows bright.
We fetch and fetch, requests in our stack,
Comment old lines, there's no going back.
With datasets clear and helpers in tow,
The RAPID flow grows, steady and slow. 🐇✨



Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 16

Outside diff range and nitpick comments (6)
geoglows_ecflow/resources/run_rapid_forecast.py (1)

Line range hint 9-215:
Consider breaking down the function into smaller, modular functions.

The rapid_forecast_exec function is large and handles multiple tasks. Breaking it down into smaller functions would improve readability and maintainability.

def setup_directories(mp_execute_directory, subprocess_forecast_log_dir):
    if not os.path.exists(mp_execute_directory):
        os.mkdir(mp_execute_directory)
    if not os.path.exists(subprocess_forecast_log_dir):
        os.mkdir(subprocess_forecast_log_dir)

def read_configuration(workspace, job_id):
    with open(os.path.join(workspace, "rapid_run.json"), "r") as f:
        data = json.load(f)
        job = data.get(job_id, {})
        if not job:
            raise ValueError(f"Job {job_id} not found.")
        return data["date"], job

def prepare_directories(job_id, mp_execute_directory, master_rapid_outflow_file):
    execute_directory = os.path.join(mp_execute_directory, job_id)
    output_base_dir = os.path.dirname(master_rapid_outflow_file)
    if not os.path.exists(execute_directory):
        os.mkdir(execute_directory)
    if not os.path.exists(output_base_dir):
        os.makedirs(output_base_dir)
    return execute_directory, output_base_dir

def rapid_forecast_exec(
    workspace: str,
    job_id: str,
    rapid_executable_location: str,
    mp_execute_directory: str,
    subprocess_forecast_log_dir: str,
) -> None:
    """Runs GEOGloWS RAPID forecast."""
    setup_directories(mp_execute_directory, subprocess_forecast_log_dir)
    date, job = read_configuration(workspace, job_id)
    # master_rapid_outflow_file is assumed to come from the job configuration
    execute_directory, output_base_dir = prepare_directories(
        job_id, mp_execute_directory, master_rapid_outflow_file)
    # Continue with the rest of the function...
geoglows_ecflow/resources/RAPIDpy/dataset.py (5)

24-28: Add parameter types and return type to the function docstring.

The function docstring should specify the types of dataset1_path and dataset2_path parameters and the return type for better clarity.

def compare_qout_files(dataset1_path: str, dataset2_path: str) -> bool:
    """
    This function compares the output of RAPID Qout and tells you where
    they are different.
    """

138-144: Add type hints to the __init__ method parameters.

Type hints improve code readability and help with static analysis.

def __init__(self, filename: str,
             river_id_dimension: str = "",
             river_id_variable: str = "",
             streamflow_variable: str = "",
             datetime_simulation_start: Optional[datetime.datetime] = None,
             simulation_time_step_seconds: Optional[int] = None,
             out_tzinfo: Optional[tzinfo] = None):

416-534: Add type hints to the method parameters and return type.

Type hints improve code readability and help with static analysis.

def get_time_index_range(self,
                         date_search_start: Optional[datetime.datetime] = None,
                         date_search_end: Optional[datetime.datetime] = None,
                         time_index_start: Optional[int] = None,
                         time_index_end: Optional[int] = None,
                         time_index: Optional[int] = None) -> np.ndarray:

590-633: Add type hints to the method parameters and return type.

Type hints improve code readability and help with static analysis.

def get_subset_riverid_index_list(self, river_id_list: List[int]) -> Tuple[np.ndarray, np.ndarray, List[int]]:

826-951: Add type hints to the method parameters and return type.

Type hints improve code readability and help with static analysis.

def write_flows_to_csv(self, path_to_output_file: str,
                       river_index: Optional[int] = None,
                       river_id: Optional[int] = None,
                       date_search_start: Optional[datetime.datetime] = None,
                       date_search_end: Optional[datetime.datetime] = None,
                       daily: bool = False,
                       filter_mode: str = "mean") -> None:
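
For context, a short usage sketch of the method above; the import path assumes the vendored package layout, and the file paths and river ID are placeholders:

from RAPIDpy.dataset import RAPIDDataset  # vendored copy in geoglows_ecflow/resources

with RAPIDDataset('/path/to/Qout.nc') as qout_nc:
    # write daily mean flows for one river ID to CSV
    qout_nc.write_flows_to_csv('/path/to/flows_500.csv',
                               river_id=500,
                               daily=True,
                               filter_mode="mean")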
Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b9a6406 and 8e83221.

Files selected for processing (7)
  • environment.yml (1 hunks)
  • geoglows_ecflow/resources/RAPIDpy/dataset.py (1 hunks)
  • geoglows_ecflow/resources/RAPIDpy/helper_functions.py (1 hunks)
  • geoglows_ecflow/resources/RAPIDpy/postprocess_merge.py (1 hunks)
  • geoglows_ecflow/resources/RAPIDpy/rapid.py (1 hunks)
  • geoglows_ecflow/resources/run_rapid_forecast.py (1 hunks)
  • pyproject.toml (1 hunks)
Files skipped from review due to trivial changes (2)
  • environment.yml
  • pyproject.toml
Additional context used
Ruff
geoglows_ecflow/resources/RAPIDpy/helper_functions.py

34-34: Use context handler for opening files

(SIM115)


129-132: Use contextlib.suppress(OSError) instead of try-except-pass

Replace with contextlib.suppress(OSError)

(SIM105)

geoglows_ecflow/resources/RAPIDpy/postprocess_merge.py

488-488: Loop control variable comid not used within loop body

(B007)

geoglows_ecflow/resources/RAPIDpy/rapid.py

483-486: Use contextlib.suppress(OSError) instead of try-except-pass

Replace with contextlib.suppress(OSError)

(SIM105)


533-536: Use contextlib.suppress(ValueError) instead of try-except-pass

Replace with contextlib.suppress(ValueError)

(SIM105)


755-758: Use contextlib.suppress(OSError) instead of try-except-pass

Replace with contextlib.suppress(OSError)

(SIM105)


896-896: Do not perform function call datetime.datetime.utcnow in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable

(B008)
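
For reference, the usual fix for B008 is a None sentinel resolved inside the function so the timestamp is taken at call time; a minimal sketch with placeholder names:

import datetime

def some_method(simulation_start_datetime=None):
    # placeholder name; evaluate the default on each call, not at import time
    if simulation_start_datetime is None:
        simulation_start_datetime = datetime.datetime.utcnow()
    return simulation_start_datetime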


1154-1157: Use contextlib.suppress(IndexError) instead of try-except-pass

Replace with contextlib.suppress(IndexError)

(SIM105)

geoglows_ecflow/resources/RAPIDpy/dataset.py

271-273: Use a single if statement instead of nested if statements

(SIM102)


271-271: Use key in dict instead of key in dict.keys()

Remove .keys()

(SIM118)


272-273: Use a single if statement instead of nested if statements

(SIM102)


404-404: Do not use bare except

(E722)


587-588: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)
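
The conventional fix for B904 is to chain the original exception with raise ... from err; a short sketch with placeholder names:

def find_river_index(river_id, river_id_list):
    # placeholder helper; chaining keeps the original traceback attached
    try:
        return river_id_list.index(river_id)
    except ValueError as err:
        raise IndexError("River ID {0} not found.".format(river_id)) from err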


755-757: Use a single if statement instead of nested if statements

(SIM102)


756-757: Use a single if statement instead of nested if statements

(SIM102)


818-818: Do not use bare except

(E722)
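
For the bare except flagged here, which appears to be the pandas to_numpy()/as_matrix() fallback quoted later in this review, the fix is to catch the specific exception; a sketch:

try:
    streamflow_array = qout_df.to_numpy().T
except AttributeError:
    # older pandas releases only provide as_matrix()
    streamflow_array = qout_df.as_matrix().T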

Additional comments not posted (30)
geoglows_ecflow/resources/RAPIDpy/helper_functions.py (6)

37-54: LGTM!

The logging function is correctly implemented.


57-71: LGTM!

The function for reading CSV files to a list is correctly implemented.


73-79: LGTM!

The function for reading NetCDF files to a list is correctly implemented.


82-114: LGTM!

The function for comparing CSV files with decimal values is correctly implemented.


117-121: LGTM!

The function for comparing CSV timeseries files is correctly implemented.


135-145: LGTM!

The function for adding latitude and longitude metadata is correctly implemented.
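
As a rough illustration of the kind of CF-style attributes such a helper writes; the variable names 'lat'/'lon' and the attribute values below are assumptions, not taken from the PR:

from netCDF4 import Dataset

def add_latlon_metadata_example(nc_path):
    # annotate latitude/longitude variables so downstream tools recognize them
    with Dataset(nc_path, 'a') as nc:
        nc.variables['lat'].long_name = 'latitude'
        nc.variables['lat'].units = 'degrees_north'
        nc.variables['lat'].axis = 'Y'
        nc.variables['lon'].long_name = 'longitude'
        nc.variables['lon'].units = 'degrees_east'
        nc.variables['lon'].axis = 'X'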

geoglows_ecflow/resources/run_rapid_forecast.py (1)

6-6: LGTM!

The import statement is correctly updated to specifically import the RAPID class from RAPIDpy.rapid.
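
For reference, that comment implies an import along these lines (the exact path depends on how the vendored package resolves on sys.path):

from RAPIDpy.rapid import RAPID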

geoglows_ecflow/resources/RAPIDpy/postprocess_merge.py (7)

128-161: LGTM!

The __init__ method is correctly implemented.


162-197: LGTM!

The _validate_raw_nc method is correctly implemented.


199-268: LGTM!

The _initialize_output method is correctly implemented.


319-395: LGTM!

The _write_comid_lat_lon_z method is correctly implemented.


399-421: LGTM!

The _generate_time_values method is correctly implemented.


423-490: LGTM!

The _copy_streamflow_values method is correctly implemented.

Tools
Ruff

488-488: Loop control variable comid not used within loop body

(B007)


492-539: LGTM!

The convert method is correctly implemented.
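
A hypothetical usage sketch of the converter described above; the constructor arguments shown are illustrative placeholders, not necessarily the class's actual signature:

import datetime

# hypothetical arguments; consult postprocess_merge.py for the real parameters
converter = ConvertRAPIDOutputToCF(
    rapid_output_file='/path/to/Qout.nc',
    start_datetime=datetime.datetime(2024, 7, 1),
    time_step=3600,
    project_name='geoglows_ecflow forecast',
)
converter.convert()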

geoglows_ecflow/resources/RAPIDpy/rapid.py (10)

92-255: Initialization method looks good!

The __init__ method correctly initializes the RAPID class with appropriate parameters and default values.


257-270: Cygwin path conversion method looks good!

The _get_cygwin_path method correctly converts a Windows path to a Cygwin path and handles errors appropriately.


272-286: Cygwin symlink creation method looks good!

The _create_symlink_cygwin method correctly creates a symbolic link using Cygwin's ln.exe and handles errors appropriately.


288-299: Cygwin dos2unix conversion method looks good!

The _dos2unix_cygwin method correctly converts a file to Unix format using Cygwin's dos2unix.exe and handles errors appropriately.


299-344: Parameter update method looks good!

The update_parameters method correctly updates class attributes based on keyword arguments and handles invalid parameters appropriately.


344-431: Reach number data update method looks good!

The update_reach_number_data method correctly updates reach number data based on input files and handles missing files appropriately.


431-471: Simulation runtime update method looks good!

The update_simulation_runtime method correctly updates the total simulation duration based on the m3 file and time step, handling missing parameters appropriately.


557-633: CF compliance method looks good!

The make_output_cf_compliant method correctly converts the RAPID output to be CF compliant, handling existing compliance appropriately.


806-891: Qinit generation method looks good!

The generate_qinit_from_past_qout method correctly generates a qinit file from a RAPID qout file, handling required parameters and data reordering appropriately.


geoglows_ecflow/resources/RAPIDpy/dataset.py (6)

226-227: LGTM!

The __enter__ method is correct.


229-230: LGTM!

The __exit__ method is correct.


232-234: LGTM!

The close method is correct.


236-246: LGTM!

The _is_legacy_time_valid method is correct.


288-292: LGTM!

The raise_time_valid method is correct.


536-555: LGTM!

The get_river_id_array method is correct.

Comment on lines +22 to +34
def open_csv(csv_file, mode='r'):
    """
    Get mode depending on Python version
    Based on: http://stackoverflow.com/questions/29840849/writing-a-csv-file-in-python-that-works-for-both-python-2-7-and-python-3-3-in
    """  # noqa
    if version_info[0] == 2:  # Not named on 2.6
        access = '{0}b'.format(mode)
        kwargs = {}
    else:
        access = '{0}t'.format(mode)
        kwargs = {'newline': ''}

    return open(csv_file, access, **kwargs)

Use context manager for opening files.

To ensure the file is properly closed, make open_csv itself a context manager; returning the handle from inside a with block would hand callers an already-closed file. Existing `with open_csv(...)` call sites keep working:

from contextlib import contextmanager

@contextmanager
def open_csv(csv_file, mode='r'):
    """
    Get mode depending on Python version
    Based on: http://stackoverflow.com/questions/29840849/writing-a-csv-file-in-python-that-works-for-both-python-2-7-and-python-3-3-in
    """  # noqa
    if version_info[0] == 2:  # Not named on 2.6
        access = '{0}b'.format(mode)
        kwargs = {}
    else:
        access = '{0}t'.format(mode)
        kwargs = {'newline': ''}

    with open(csv_file, access, **kwargs) as csv_file_handle:
        yield csv_file_handle
Tools
Ruff

34-34: Use context handler for opening files

(SIM115)

Comment on lines +124 to +132
def remove_files(*args):
    """
    This function removes all files input as arguments
    """
    for arg in args:
        try:
            remove(arg)
        except OSError:
            pass

Use contextlib.suppress for handling OSError.

To make the code cleaner, use contextlib.suppress(OSError) instead of try-except-pass.

from contextlib import suppress

def remove_files(*args):
    """
    This function removes all files input as arguments
    """
    for arg in args:
        with suppress(OSError):
            remove(arg)
Tools
Ruff

129-132: Use contextlib.suppress(OSError) instead of try-except-pass

Replace with contextlib.suppress(OSError)

(SIM105)

Comment on lines +634 to +804
                use_all_processors=True,
                ZS_TauR=24*3600,
                ZS_dtR=15*60,
                ZS_TauM=365*24*3600,
                ZS_dtM=24*3600
            )

            rapid_input = 'C:/cygwin64/home/username/rapid-io/input'
            rapid_output = 'C:/cygwin64/home/username/rapid-io/output'
            rapid_manager.update_parameters(
                rapid_connect_file=path.join(rapid_input, 'rapid_connect.csv'),
                Vlat_file=path.join(rapid_input, 'm3_riv.nc'),
                riv_bas_id_file=path.join(rapid_input, 'riv_bas_id.csv'),
                k_file=path.join(rapid_input, 'k.csv'),
                x_file=path.join(rapid_input, 'x.csv'),
                Qout_file=path.join(rapid_output, 'Qout.nc'),
            )

            rapid_manager.update_reach_number_data()
            rapid_manager.update_simulation_runtime()
            rapid_manager.run()
        """
        if not self._rapid_executable_location:
            log("Missing rapid_executable_location. "
                "Please set before running this function ...",
                "ERROR")

        time_start = datetime.datetime.utcnow()
        temp_rapid_namelist_file = os.path.join(os.getcwd(), "rapid_namelist")

        if not rapid_namelist_file or not os.path.exists(rapid_namelist_file):
            # generate input file if it does not exist
            self.generate_namelist_file(temp_rapid_namelist_file)
            with open(temp_rapid_namelist_file, 'r') as file_:
                log(file_.read(), "INFO")
        else:
            # update existing file
            self.update_namelist_file(rapid_namelist_file,
                                      temp_rapid_namelist_file)

        local_rapid_executable_location = \
            os.path.join(os.path.dirname(temp_rapid_namelist_file),
                         "rapid_exe_symlink")

        def rapid_cleanup(*args):
            """
            Cleans up the rapid files generated by the process
            """
            for arg in args:
                # remove files
                try:
                    os.remove(arg)
                except OSError:
                    pass

        # create link to RAPID if needed
        temp_link_to_rapid = ""
        # pylint: disable=no-member
        if self._rapid_executable_location != \
                local_rapid_executable_location:
            rapid_cleanup(local_rapid_executable_location)
            if os.name == "nt":
                self._create_symlink_cygwin(self._rapid_executable_location,
                                            local_rapid_executable_location)
            else:
                os.symlink(self._rapid_executable_location,
                           local_rapid_executable_location)
            temp_link_to_rapid = local_rapid_executable_location

        # run RAPID
        log("Running RAPID ...",
            "INFO")
        if os.name == "nt":
            local_rapid_executable_location = \
                self._get_cygwin_path(local_rapid_executable_location)

        # htcondor will not allow mpiexec for single processor jobs
        # this was added for that purpose
        run_rapid_command = [local_rapid_executable_location,
                             "-ksp_type", self._ksp_type]

        if self._num_processors > 1:
            run_rapid_command = [self._mpiexec_command,
                                 "-n", str(self._num_processors)] \
                + run_rapid_command

        process = Popen(run_rapid_command,
                        stdout=PIPE, stderr=PIPE, shell=False)
        out, err = process.communicate()
        if err:
            rapid_cleanup(temp_link_to_rapid, temp_rapid_namelist_file)
            raise Exception(err)
        else:
            log('RAPID output:',
                "INFO")
            for line in out.split(b'\n'):
                print(line)
        rapid_cleanup(temp_link_to_rapid, temp_rapid_namelist_file)
        log("Time to run RAPID: %s" % (datetime.datetime.utcnow() - time_start),
            "INFO")

Run method looks good!

The run method correctly generates a namelist file and runs the RAPID program, handling required parameters and symbolic links appropriately.

Consider using contextlib.suppress for handling OSError to simplify the code.

-            try:
-                os.remove(arg)
-            except OSError:
-                pass
+            with contextlib.suppress(OSError):
+                os.remove(arg)
Tools
Ruff

755-758: Use contextlib.suppress(OSError) instead of try-except-pass

Replace with contextlib.suppress(OSError)

(SIM105)

Comment on lines +1009 to +1224

        gage_id_list = []
        for row in reach_id_gage_id_list[1:]:
            station_id = row[1]
            if len(row[1]) == 7:
                station_id = '0' + row[1]
            gage_id_list.append(station_id)

        num_gage_id_list = np.array(gage_id_list, dtype=np.int32)
        log("Querying Server for Data ...",
            "INFO")

        query_params = {
            'format': 'json',
            'sites': ",".join(gage_id_list),
            'startDT': start_datetime.strftime("%Y-%m-%d"),
            'endDT': end_datetime.strftime("%Y-%m-%d"),
            'parameterCd': '00060',  # streamflow
            'statCd': '00003'  # average
        }
        response = get("http://waterservices.usgs.gov/nwis/dv",
                       params=query_params)

        if not response.ok:
            log("USGS query error ...",
                "WARNING")
            return

        requested_data = None
        try:
            requested_data = response.json()['value']['timeSeries']
        except IndexError:
            pass

        if requested_data is not None:
            for time_series in enumerate(requested_data):
                usgs_station_full_name = time_series[1]['name']
                usgs_station_id = usgs_station_full_name.split(":")[1]
                gage_data = []
                for time_step in time_series[1]['values'][0]['value']:
                    local_datetime = parse(time_step['dateTime'])
                    if local_datetime > end_datetime:
                        break

                    if local_datetime >= start_datetime:
                        if not time_step['value']:
                            log("MISSING DATA for USGS Station {0} {1} {2}"
                                .format(usgs_station_id,
                                        local_datetime,
                                        time_step['value']),
                                "WARNING")
                        gage_data.append(
                            float(time_step['value']) / 35.3146667)

                try:
                    # get where streamids associated with USGS station ID
                    streamid_index = \
                        np.where(num_gage_id_list ==
                                 int(float(usgs_station_id)))[0][0] + 1
                except (IndexError, ValueError):
                    log("USGS Station {0} not found in list ..."
                        .format(usgs_station_id),
                        "WARNING")
                    raise

                if len(gage_data) == num_days_needed:
                    gage_data_matrix.append(gage_data)
                    valid_comid_list.append(
                        reach_id_gage_id_list[streamid_index][0])
                else:
                    log("StreamID {0} USGS Station {1} MISSING {2} "
                        "DATA VALUES".format(
                            reach_id_gage_id_list[streamid_index][0],
                            usgs_station_id,
                            num_days_needed - len(gage_data)),
                        "WARNING")

        if gage_data_matrix and valid_comid_list:
            log("Writing Output ...",
                "INFO")
            np_array = np.array(gage_data_matrix).transpose()
            with open_csv(out_streamflow_file, 'w') as gage_data:
                wgd = csvwriter(gage_data)
                for row in np_array:
                    wgd.writerow(row)

            with open_csv(out_stream_id_file, 'w') as comid_data:
                wcd = csvwriter(comid_data)
                for row in valid_comid_list:
                    wcd.writerow([int(float(row))])

            # set parameters for RAPID run
            self.IS_obs_tot = len(valid_comid_list)
            self.obs_tot_id_file = out_stream_id_file
            self.Qobs_file = out_streamflow_file
            self.IS_obs_use = len(valid_comid_list)
            self.obs_use_id_file = out_stream_id_file
        else:
            log("No valid data returned ...",
                "WARNING")

USGS average daily flows generation method looks good!

The generate_usgs_avg_daily_flows_opt method correctly generates daily streamflow and stream ID files based on USGS gage IDs, handling data querying and processing appropriately.

Consider using contextlib.suppress for handling IndexError to simplify the code.

-        try:
-            requested_data = response.json()['value']['timeSeries']
-        except IndexError:
-            pass
+        with contextlib.suppress(IndexError):
+            requested_data = response.json()['value']['timeSeries']
Tools
Ruff

1154-1157: Use contextlib.suppress(IndexError) instead of try-except-pass

Replace with contextlib.suppress(IndexError)

(SIM105)


Comment on lines +635 to +734
    def get_qout(self,
                 river_id_array=None,
                 date_search_start=None,
                 date_search_end=None,
                 time_index_start=None,
                 time_index_end=None,
                 time_index=None,
                 time_index_array=None,
                 daily=False,
                 pd_filter=None,
                 filter_mode="mean",
                 as_dataframe=False):
        """
        This method extracts streamflow data by a single river ID
        or by a river ID array. It has options to extract by date
        or by date index.

        Parameters
        ----------
        river_id_array: :obj:`numpy.array` or list or int, optional
            A single river ID or an array of river IDs.
        date_search_start: :obj:`datetime.datetime`, optional
            This is a datetime object with the date of the minimum date
            for starting.
        date_search_end: :obj:`datetime.datetime`, optional
            This is a datetime object with the date of the maximum date
            for ending.
        time_index_start: int, optional
            This is the index of the start of the time array subset.
            Useful for the old file version.
        time_index_end: int, optional
            This is the index of the end of the time array subset.
            Useful for the old file version.
        time_index: int, optional
            This is the index of time to return in the case that your
            code only wants one index. Used internally.
        time_index_array: list or :obj:`numpy.array`, optional
            This is used to extract the vales only for particular dates.
            This can be from the *get_time_index_range* function.
        daily: bool, optional
            If true, this will convert qout to daily average.
        pd_filter: str, optional
            This is a valid pandas resample frequency filter.
        filter_mode: str, optional
            You can get the daily average "mean" or the maximum "max".
            Default is "mean".
        as_dataframe: bool, optional
            Return as a pandas dataframe object. Default is False.


        Returns
        -------
        qout_array: :obj:`numpy.array`
            This is a 1D or 2D array or a single value depending on your
            input search.


        This example demonstrates how to retrieve the streamflow associated
        with the reach you are interested in::

            from RAPIDpy import RAPIDDataset

            path_to_rapid_qout = '/path/to/Qout.nc'
            river_id = 500
            with RAPIDDataset(path_to_rapid_qout) as qout_nc:
                streamflow_array = qout_nc.get_qout(river_id)

        This example demonstrates how to retrieve the streamflow within a date
        range associated with the reach you are interested in::

            from RAPIDpy import RAPIDDataset

            path_to_rapid_qout = '/path/to/Qout.nc'
            river_id = 500
            with RAPIDDataset(path_to_rapid_qout) as qout_nc:
                streamflow_array = qout_nc.get_qout(
                    river_id,
                    date_search_start=datetime(1985,1,1),
                    date_search_end=datetime(1985,2,4))

        """
        # get indices of where the streamflow data is
        riverid_index_list_subset = None
        if river_id_array is not None:
            if not hasattr(river_id_array, "__len__"):
                river_id_array = [river_id_array]
            riverid_index_list_subset = \
                self.get_subset_riverid_index_list(river_id_array)[0]

        return self.get_qout_index(riverid_index_list_subset,
                                   date_search_start,
                                   date_search_end,
                                   time_index_start,
                                   time_index_end,
                                   time_index,
                                   time_index_array,
                                   daily,
                                   pd_filter,
                                   filter_mode,
                                   as_dataframe)

Simplify nested if statements.

The nested if statements can be simplified for better readability.

if river_id_array is not None:
    if not hasattr(river_id_array, "__len__"):
        river_id_array = [river_id_array]
    riverid_index_list_subset = self.get_subset_riverid_index_list(river_id_array)[0]

return self.get_qout_index(riverid_index_list_subset,
                           date_search_start,
                           date_search_end,
                           time_index_start,
                           time_index_end,
                           time_index,
                           time_index_array,
                           daily,
                           pd_filter,
                           filter_mode,
                           as_dataframe)

Committable suggestion was skipped due to low confidence.

Comment on lines +151 to +167
        self.river_id_dimension = river_id_dimension
        if not river_id_dimension:
            if 'rivid' in self.qout_nc.dimensions:
                self.river_id_dimension = 'rivid'
            elif 'COMID' in self.qout_nc.dimensions:
                self.river_id_dimension = 'COMID'
            elif 'station' in self.qout_nc.dimensions:
                self.river_id_dimension = 'station'
            elif 'DrainLnID' in self.qout_nc.dimensions:
                self.river_id_dimension = 'DrainLnID'
            elif 'FEATUREID' in self.qout_nc.dimensions:
                self.river_id_dimension = 'FEATUREID'
            else:
                raise IndexError('Could not find river ID dimension.')
        elif river_id_dimension not in self.qout_nc.dimensions:
            raise IndexError('Could not find river ID dimension:'
                             ' {0}.'.format(river_id_dimension))

Simplify nested if-elif statements.

The nested if-elif statements can be simplified for better readability.

self.river_id_dimension = next((dim for dim in ['rivid', 'COMID', 'station', 'DrainLnID', 'FEATUREID']
                                if dim in self.qout_nc.dimensions), None)
if not self.river_id_dimension:
    raise IndexError('Could not find river ID dimension.')
elif river_id_dimension and river_id_dimension not in self.qout_nc.dimensions:
    raise IndexError('Could not find river ID dimension: {0}.'.format(river_id_dimension))

Comment on lines +736 to +823
    def get_qout_index(self,
                       river_index_array=None,
                       date_search_start=None,
                       date_search_end=None,
                       time_index_start=None,
                       time_index_end=None,
                       time_index=None,
                       time_index_array=None,
                       daily=False,
                       pd_filter=None,
                       filter_mode="mean",
                       as_dataframe=False):
        """
        This method extracts streamflow data by river index.
        It allows for extracting single or multiple river streamflow arrays
        It has options to extract by date or by date index.

        See: :meth:`RAPIDpy.RAPIDDataset.get_qout`
        """
        if river_index_array is not None:
            if hasattr(river_index_array, "__len__"):
                if len(river_index_array) == 1:
                    river_index_array = river_index_array[0]

        if time_index_array is None:
            time_index_array = self.get_time_index_range(date_search_start,
                                                         date_search_end,
                                                         time_index_start,
                                                         time_index_end,
                                                         time_index)

        qout_variable = self.qout_nc.variables[self.q_var_name]
        qout_dimensions = qout_variable.dimensions
        if qout_dimensions[0].lower() == 'time' and \
                qout_dimensions[1].lower() == self.river_id_dimension.lower():
            if time_index_array is not None and river_index_array is not None:
                streamflow_array = qout_variable[time_index_array,
                                                 river_index_array].transpose()
            elif time_index_array is not None:
                streamflow_array = qout_variable[time_index_array, :] \
                    .transpose()
            elif river_index_array is not None:
                streamflow_array = qout_variable[:, river_index_array] \
                    .transpose()
            else:
                streamflow_array = qout_variable[:].transpose()
        elif qout_dimensions[1].lower() == 'time' and \
                qout_dimensions[0].lower() == self.river_id_dimension.lower():
            if time_index_array is not None and river_index_array is not None:
                streamflow_array = qout_variable[river_index_array,
                                                 time_index_array]
            elif time_index_array is not None:
                streamflow_array = qout_variable[:, time_index_array]
            elif river_index_array is not None:
                streamflow_array = qout_variable[river_index_array, :]
            else:
                streamflow_array = qout_variable[:]
        else:
            raise Exception("Invalid RAPID Qout file dimensions ...")

        if daily:
            pd_filter = "D"

        if pd_filter is not None or as_dataframe:
            time_array = self.get_time_array(return_datetime=True,
                                             time_index_array=time_index_array)
            qout_df = pd.DataFrame(streamflow_array.T, index=time_array)

            if pd_filter is not None:
                qout_df = qout_df.resample(pd_filter)
                if filter_mode == "mean":
                    qout_df = qout_df.mean()
                elif filter_mode == "max":
                    qout_df = qout_df.max()
                else:
                    raise Exception("Invalid filter_mode ...")

            if as_dataframe:
                return qout_df

            try:
                streamflow_array = qout_df.to_numpy().T
            except:
                streamflow_array = qout_df.as_matrix().T

        if streamflow_array.ndim > 0 and streamflow_array.shape[0] == 1:
            streamflow_array = streamflow_array[0]


Simplify nested if statements.

The nested if statements can be simplified for better readability.

if river_index_array is not None and hasattr(river_index_array, "__len__") and len(river_index_array) == 1:
    river_index_array = river_index_array[0]
qout_df = pd.DataFrame(streamflow_array.T, index=time_array)
if pd_filter is not None:
qout_df = qout_df.resample(pd_filter)
if filter_mode == "mean":
qout_df = qout_df.mean()
elif filter_mode == "max":
qout_df = qout_df.max()
else:
raise Exception("Invalid filter_mode ...")
if as_dataframe:
return qout_df
try:
streamflow_array = qout_df.to_numpy().T
except:
streamflow_array = qout_df.as_matrix().T
if streamflow_array.ndim > 0 and streamflow_array.shape[0] == 1:
streamflow_array = streamflow_array[0]
Tools
Ruff

755-757: Use a single if statement instead of nested if statements (SIM102)

756-757: Use a single if statement instead of nested if statements (SIM102)

818-818: Do not use bare except (E722)
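
The bare except flagged at line 818 guards the fallback from the current DataFrame.to_numpy() to the long-removed as_matrix() method. A narrower version is sketched below; it assumes the only expected failure is the missing attribute on whichever pandas release lacks one of the two methods, and the helper name frame_to_array is illustrative rather than part of RAPIDpy.

import pandas as pd


def frame_to_array(qout_df: pd.DataFrame):
    # On old pandas releases to_numpy() does not exist and raises
    # AttributeError, so fall back to the legacy as_matrix() call.
    try:
        return qout_df.to_numpy().T
    except AttributeError:
        return qout_df.as_matrix().T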

Comment on lines +29 to +94
qout_same = False

d1 = RAPIDDataset(dataset1_path)
d2 = RAPIDDataset(dataset2_path)

if len(d1.get_river_id_array()) != len(d2.get_river_id_array()):
log("Length of COMID/rivid input not the same.",
"ERROR")

if not (d1.get_river_id_array() == d2.get_river_id_array()).all():
log("COMID/rivid order is different in each dataset."
" Reordering data for comparison.",
"WARNING")

d2_reordered_river_index_list = []
for rivid in d1.get_river_id_array():
reordered_index = np.where(d2.get_river_id_array() == rivid)[0][0]
d2_reordered_river_index_list.append(reordered_index)
d2_reordered_qout = d2.get_qout_index(d2_reordered_river_index_list)
else:
d2_reordered_qout = d2.get_qout()

# get where the files are different
d1_qout = d1.get_qout()
where_diff = np.where(d1_qout != d2_reordered_qout)
un_where_diff = np.unique(where_diff[0])

# if different, check to see how different
if un_where_diff.any():
decimal_test = 7
while decimal_test > 0:
try:
np.testing.assert_almost_equal(d1_qout,
d2_reordered_qout,
decimal=decimal_test)
log("ALMOST EQUAL to {0} decimal places.".format(decimal_test),
"INFO")
qout_same = True
decimal_test = -1
except AssertionError as ex:
if decimal_test <= 1:
log(ex, "WARNING")
decimal_test -= 1

log("Number of different timeseries: {0}".format(len(un_where_diff)),
"INFO")
log("COMID idexes where different: {0}".format(un_where_diff),
"INFO")
log("COMID idexes where different: {0}".format(un_where_diff),
"INFO")
index = un_where_diff[0]
log("Dataset 1 example. COMID index: "
"{0}".format(d1.get_qout_index(index)),
"INFO")
log("Dataset 2 example. COMID index: "
"{0}".format(d2_reordered_qout[index, :]),
"INFO")

else:
qout_same = True
log("Output Qout data is the same.",
"INFO")

d1.close()
d2.close()
return qout_same

Ensure proper resource handling with context managers.

The function should open the RAPIDDataset instances with context managers so both files are closed even if the comparison fails partway through.

with RAPIDDataset(dataset1_path) as d1, RAPIDDataset(dataset2_path) as d2:
    ...

Committable suggestion was skipped due to low confidence.
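
For illustration, the whole comparison could live inside that with block. The sketch below is simplified: it assumes RAPIDDataset supports the context-manager protocol (it is used that way elsewhere in this module), relies on the module's own RAPIDDataset and log helpers, and omits the rivid reordering and almost-equal fallback of the full function.

import numpy as np


def compare_qout_files_sketch(dataset1_path, dataset2_path):
    # Open both Qout files; __exit__ closes them even if an error occurs.
    with RAPIDDataset(dataset1_path) as d1, \
            RAPIDDataset(dataset2_path) as d2:
        if len(d1.get_river_id_array()) != len(d2.get_river_id_array()):
            log("Length of COMID/rivid input not the same.", "ERROR")
            return False
        # Element-wise check only; the real function also reorders rivids
        # and retries with np.testing.assert_almost_equal.
        return bool(np.array_equal(d1.get_qout(), d2.get_qout()))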

Comment on lines +893 to +1007
Datetime object with date of simulation to go back through the
years and get a running average to generate streamflow
initialization. Default is utcnow.


Example:

.. code:: python

from RAPIDpy.rapid import RAPID

rapid_manager = RAPID(
Qout_file='/output_mississippi-nfie/Qout_2000to2015.nc',
rapid_connect_file='/input_mississippi_nfie/rapid_connect.csv'
)

rapid_manager.generate_seasonal_intitialization(
qinit_file='/input_mississippi_nfie/Qinit_seasonal_avg.csv'
)
"""
if not self.Qout_file or not os.path.exists(self.Qout_file):
log("Missing Qout_file. "
"Please set before running this function ...",
"ERROR")

if not self.rapid_connect_file or not self.rapid_connect_file:
log("Missing rapid_connect file. "
"Please set before running this function ...",
"ERROR")

day_of_year = datetime_start_initialization.timetuple().tm_yday
min_day = day_of_year - 3
max_day = day_of_year + 3

with RAPIDDataset(self.Qout_file) as qout_hist_nc:
if not qout_hist_nc.is_time_variable_valid():
log("File must be CF 1.6 compliant "
"with valid time variable ...",
"ERROR")

log("Generating seasonal average qinit file from qout file ...",
"INFO")

log("Determining dates with streamflows of interest ...",
"INFO")

time_indices = []
for idx, ttt in enumerate(qout_hist_nc.get_time_array()):
var_time = gmtime(ttt)
compare_yday = var_time.tm_yday
# move day back one past because of leap year adds
# a day after feb 29 (day 60)
if isleap(var_time.tm_year) and compare_yday > 60:
compare_yday -= 1
# check if date within range of season
if min_day <= compare_yday < max_day:
time_indices.append(idx)

if not time_indices:
log("No time steps found within range ...",
"ERROR")

log("Extracting data ...",
"INFO")

streamflow_array = \
qout_hist_nc.get_qout(time_index_array=time_indices)

log("Reordering data...",
"INFO")
stream_id_array = np.loadtxt(self.rapid_connect_file,
ndmin=1, delimiter=",",
usecols=(0,), dtype=int)
init_flows_array = np.zeros(stream_id_array.size)
for riv_bas_index, riv_bas_id in enumerate(
qout_hist_nc.get_river_id_array()):
try:
data_index = np.where(stream_id_array == riv_bas_id)[0][0]
init_flows_array[data_index] = \
np.mean(streamflow_array[riv_bas_index])
except IndexError:
log('riv_bas_id {0} not found in connectivity list.'
.format(riv_bas_id),
"WARNING")

log("Writing to file ...",
"INFO")
if qinit_file.endswith(".csv"):
with open_csv(qinit_file, 'w') as qinit_out:
for init_flow in init_flows_array:
qinit_out.write('{}\n'.format(init_flow))
else:
with nc.Dataset(qinit_file, "w", format="NETCDF3_CLASSIC") as qinit_out:
qinit_out.createDimension('Time', 1)
qinit_out.createDimension('rivid', stream_id_array.size)
var_Qout = qinit_out.createVariable('Qout', 'f8', ('Time', 'rivid',))
var_Qout[:] = init_flows_array

log("Initialization Complete!",
"INFO")

Seasonal initialization method looks good!

The generate_seasonal_intitialization method correctly creates a seasonal qinit file from a RAPID qout file, handling required parameters and data reordering appropriately.

Avoid performing function calls in argument defaults. Consider setting datetime_start_initialization within the function.

-            datetime_start_initialization=datetime.datetime.utcnow()
+            datetime_start_initialization=None
):
    if datetime_start_initialization is None:
        datetime_start_initialization = datetime.datetime.utcnow()
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def generate_seasonal_intitialization(
self,
qinit_file,
datetime_start_initialization=datetime.datetime.utcnow()
):
"""This creates a seasonal qinit file from a RAPID qout file. This
requires a simulation Qout file with a longer time period of record and
to be CF compliant. It takes the average of the current date +- 3 days
and goes back as far as possible.
Parameters
----------
qinit_file: str
Path to output qinit_file.
datetime_start_initialization: :obj:`datetime.datetime`, optional
Datetime object with date of simulation to go back through the
years and get a running average to generate streamflow
initialization. Default is utcnow.
Example:
.. code:: python
from RAPIDpy.rapid import RAPID
rapid_manager = RAPID(
Qout_file='/output_mississippi-nfie/Qout_2000to2015.nc',
rapid_connect_file='/input_mississippi_nfie/rapid_connect.csv'
)
rapid_manager.generate_seasonal_intitialization(
qinit_file='/input_mississippi_nfie/Qinit_seasonal_avg.csv'
)
"""
if not self.Qout_file or not os.path.exists(self.Qout_file):
log("Missing Qout_file. "
"Please set before running this function ...",
"ERROR")
if not self.rapid_connect_file or not self.rapid_connect_file:
log("Missing rapid_connect file. "
"Please set before running this function ...",
"ERROR")
day_of_year = datetime_start_initialization.timetuple().tm_yday
min_day = day_of_year - 3
max_day = day_of_year + 3
with RAPIDDataset(self.Qout_file) as qout_hist_nc:
if not qout_hist_nc.is_time_variable_valid():
log("File must be CF 1.6 compliant "
"with valid time variable ...",
"ERROR")
log("Generating seasonal average qinit file from qout file ...",
"INFO")
log("Determining dates with streamflows of interest ...",
"INFO")
time_indices = []
for idx, ttt in enumerate(qout_hist_nc.get_time_array()):
var_time = gmtime(ttt)
compare_yday = var_time.tm_yday
# move day back one past because of leap year adds
# a day after feb 29 (day 60)
if isleap(var_time.tm_year) and compare_yday > 60:
compare_yday -= 1
# check if date within range of season
if min_day <= compare_yday < max_day:
time_indices.append(idx)
if not time_indices:
log("No time steps found within range ...",
"ERROR")
log("Extracting data ...",
"INFO")
streamflow_array = \
qout_hist_nc.get_qout(time_index_array=time_indices)
log("Reordering data...",
"INFO")
stream_id_array = np.loadtxt(self.rapid_connect_file,
ndmin=1, delimiter=",",
usecols=(0,), dtype=int)
init_flows_array = np.zeros(stream_id_array.size)
for riv_bas_index, riv_bas_id in enumerate(
qout_hist_nc.get_river_id_array()):
try:
data_index = np.where(stream_id_array == riv_bas_id)[0][0]
init_flows_array[data_index] = \
np.mean(streamflow_array[riv_bas_index])
except IndexError:
log('riv_bas_id {0} not found in connectivity list.'
.format(riv_bas_id),
"WARNING")
log("Writing to file ...",
"INFO")
if qinit_file.endswith(".csv"):
with open_csv(qinit_file, 'w') as qinit_out:
for init_flow in init_flows_array:
qinit_out.write('{}\n'.format(init_flow))
else:
with nc.Dataset(qinit_file, "w", format="NETCDF3_CLASSIC") as qinit_out:
qinit_out.createDimension('Time', 1)
qinit_out.createDimension('rivid', stream_id_array.size)
var_Qout = qinit_out.createVariable('Qout', 'f8', ('Time', 'rivid',))
var_Qout[:] = init_flows_array
log("Initialization Complete!",
"INFO")
def generate_seasonal_intitialization(
self,
qinit_file,
datetime_start_initialization=None
):
if datetime_start_initialization is None:
datetime_start_initialization = datetime.datetime.utcnow()
"""This creates a seasonal qinit file from a RAPID qout file. This
requires a simulation Qout file with a longer time period of record and
to be CF compliant. It takes the average of the current date +- 3 days
and goes back as far as possible.
Parameters
----------
qinit_file: str
Path to output qinit_file.
datetime_start_initialization: :obj:`datetime.datetime`, optional
Datetime object with date of simulation to go back through the
years and get a running average to generate streamflow
initialization. Default is utcnow.
Example:
.. code:: python
from RAPIDpy.rapid import RAPID
rapid_manager = RAPID(
Qout_file='/output_mississippi-nfie/Qout_2000to2015.nc',
rapid_connect_file='/input_mississippi_nfie/rapid_connect.csv'
)
rapid_manager.generate_seasonal_intitialization(
qinit_file='/input_mississippi_nfie/Qinit_seasonal_avg.csv'
)
"""
if not self.Qout_file or not os.path.exists(self.Qout_file):
log("Missing Qout_file. "
"Please set before running this function ...",
"ERROR")
if not self.rapid_connect_file or not self.rapid_connect_file:
log("Missing rapid_connect file. "
"Please set before running this function ...",
"ERROR")
day_of_year = datetime_start_initialization.timetuple().tm_yday
min_day = day_of_year - 3
max_day = day_of_year + 3
with RAPIDDataset(self.Qout_file) as qout_hist_nc:
if not qout_hist_nc.is_time_variable_valid():
log("File must be CF 1.6 compliant "
"with valid time variable ...",
"ERROR")
log("Generating seasonal average qinit file from qout file ...",
"INFO")
log("Determining dates with streamflows of interest ...",
"INFO")
time_indices = []
for idx, ttt in enumerate(qout_hist_nc.get_time_array()):
var_time = gmtime(ttt)
compare_yday = var_time.tm_yday
# move day back one past because of leap year adds
# a day after feb 29 (day 60)
if isleap(var_time.tm_year) and compare_yday > 60:
compare_yday -= 1
# check if date within range of season
if min_day <= compare_yday < max_day:
time_indices.append(idx)
if not time_indices:
log("No time steps found within range ...",
"ERROR")
log("Extracting data ...",
"INFO")
streamflow_array = \
qout_hist_nc.get_qout(time_index_array=time_indices)
log("Reordering data...",
"INFO")
stream_id_array = np.loadtxt(self.rapid_connect_file,
ndmin=1, delimiter=",",
usecols=(0,), dtype=int)
init_flows_array = np.zeros(stream_id_array.size)
for riv_bas_index, riv_bas_id in enumerate(
qout_hist_nc.get_river_id_array()):
try:
data_index = np.where(stream_id_array == riv_bas_id)[0][0]
init_flows_array[data_index] = \
np.mean(streamflow_array[riv_bas_index])
except IndexError:
log('riv_bas_id {0} not found in connectivity list.'
.format(riv_bas_id),
"WARNING")
log("Writing to file ...",
"INFO")
if qinit_file.endswith(".csv"):
with open_csv(qinit_file, 'w') as qinit_out:
for init_flow in init_flows_array:
qinit_out.write('{}\n'.format(init_flow))
else:
with nc.Dataset(qinit_file, "w", format="NETCDF3_CLASSIC") as qinit_out:
qinit_out.createDimension('Time', 1)
qinit_out.createDimension('rivid', stream_id_array.size)
var_Qout = qinit_out.createVariable('Qout', 'f8', ('Time', 'rivid',))
var_Qout[:] = init_flows_array
log("Initialization Complete!",
"INFO")
Tools
Ruff

896-896: Do not perform function call datetime.datetime.utcnow in argument defaults; instead, perform the call within the function, or read the default from a module-level singleton variable (B008)
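
As background for the B008 rule: Python evaluates default argument expressions once, when the function is defined, so a utcnow() default is frozen at import time rather than re-evaluated per call. A standalone illustration follows; the function names are hypothetical, not part of RAPIDpy.

import datetime
import time


def frozen_default(stamp=datetime.datetime.utcnow()):
    # Evaluated once, when the function object is created.
    return stamp


def fresh_default(stamp=None):
    # Evaluated on every call, which is what the suggestion above does.
    if stamp is None:
        stamp = datetime.datetime.utcnow()
    return stamp


first = frozen_default()
time.sleep(1)
print(frozen_default() == first)  # True: still the import-time value
print(fresh_default() == first)   # False: a fresh timestamp each call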



Comment on lines +294 to +413
If true, it converts the data to a list of datetime objects.
Default is False.
time_index_array: list or :obj:`numpy.array`, optional
This is used to extract the datetime values by index from the main
list. This can be from the *get_time_index_range* function.

Returns
-------
list:
An array of integers representing seconds since Jan 1, 1970 UTC
or datetime objects if *return_datetime* is set to True.

These examples demonstrates how to retrieve or generate a time array
to go along with your RAPID streamflow series.


CF-Compliant Qout File Example:

.. code:: python

from RAPIDpy import RAPIDDataset

path_to_rapid_qout = '/path/to/Qout.nc'
with RAPIDDataset(path_to_rapid_qout) as qout_nc:
#retrieve integer timestamp array
time_array = qout_nc.get_time_array()

#or, to get datetime array
time_datetime = qout_nc.get_time_array(return_datetime=True)


Legacy Qout File Example:

.. code:: python

from RAPIDpy import RAPIDDataset

path_to_rapid_qout = '/path/to/Qout.nc'
with RAPIDDataset(path_to_rapid_qout,
datetime_simulation_start=datetime(1980, 1, 1),
simulation_time_step_seconds=3 * 3600)\
as qout_nc:

#retrieve integer timestamp array
time_array = qout_nc.get_time_array()

#or, to get datetime array
time_datetime = qout_nc.get_time_array(return_datetime=True)

"""
# Original Qout file
if datetime_simulation_start is not None:
self.datetime_simulation_start = datetime_simulation_start
if simulation_time_step_seconds is not None:
self.simulation_time_step_seconds = simulation_time_step_seconds

epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
time_units = "seconds since {0}".format(epoch)

# CF-1.6 compliant file
if self.is_time_variable_valid():
time_array = self.qout_nc.variables['time'][:]
if self.qout_nc.variables['time'].units:
time_units = self.qout_nc.variables['time'].units

# Original Qout file
elif self._is_legacy_time_valid():
initial_time_seconds = ((self.datetime_simulation_start
.replace(tzinfo=utc) - epoch)
.total_seconds() +
self.simulation_time_step_seconds)
final_time_seconds = (initial_time_seconds +
self.size_time *
self.simulation_time_step_seconds)
time_array = np.arange(initial_time_seconds,
final_time_seconds,
self.simulation_time_step_seconds)
else:
raise ValueError("This file does not contain the time"
" variable. To get time array, add"
" datetime_simulation_start and"
" simulation_time_step_seconds")

if time_index_array is not None:
time_array = time_array[time_index_array]

if return_datetime:
try:
# only_use_cftime_datetime=True is default after cftime PR #135
time_array = num2date(time_array, time_units, only_use_cftime_datetimes=False)
except:
time_array = num2date(time_array, time_units)

if self.out_tzinfo is not None:
for i in range(len(time_array)):
# convert time to output timezone
time_array[i] = utc.localize(time_array[i]) \
.astimezone(self.out_tzinfo) \
.replace(tzinfo=None)


Avoid bare except and add type hints.

The bare except statement should be replaced with specific exceptions, and type hints should be added for better clarity.

def get_time_array(self,
                   datetime_simulation_start: Optional[datetime.datetime] = None,
                   simulation_time_step_seconds: Optional[int] = None,
                   return_datetime: bool = False,
                   time_index_array: Optional[List[int]] = None) -> List[Union[int, datetime.datetime]]:
    ...
    try:
        # only_use_cftime_datetime=True is default after cftime PR #135
        time_array = num2date(time_array, time_units, only_use_cftime_datetimes=False)
    except Exception as e:
        time_array = num2date(time_array, time_units)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def get_time_array(self,
datetime_simulation_start=None,
simulation_time_step_seconds=None,
return_datetime=False,
time_index_array=None):
"""
This method extracts or generates an array of time.
The new version of RAPID output has the time array stored.
However, the old version requires the user to know when the
simulation began and the time step of the output.
Parameters
----------
datetime_simulation_start: :obj:`datetime.datetime`, optional
The start datetime of the simulation. Only required if the time
variable is not included in the file.
simulation_time_step_seconds: int, optional
The time step of the file in seconds. Only required if the time
variable is not included in the file.
return_datetime: bool, optional
If true, it converts the data to a list of datetime objects.
Default is False.
time_index_array: list or :obj:`numpy.array`, optional
This is used to extract the datetime values by index from the main
list. This can be from the *get_time_index_range* function.
Returns
-------
list:
An array of integers representing seconds since Jan 1, 1970 UTC
or datetime objects if *return_datetime* is set to True.
These examples demonstrates how to retrieve or generate a time array
to go along with your RAPID streamflow series.
CF-Compliant Qout File Example:
.. code:: python
from RAPIDpy import RAPIDDataset
path_to_rapid_qout = '/path/to/Qout.nc'
with RAPIDDataset(path_to_rapid_qout) as qout_nc:
#retrieve integer timestamp array
time_array = qout_nc.get_time_array()
#or, to get datetime array
time_datetime = qout_nc.get_time_array(return_datetime=True)
Legacy Qout File Example:
.. code:: python
from RAPIDpy import RAPIDDataset
path_to_rapid_qout = '/path/to/Qout.nc'
with RAPIDDataset(path_to_rapid_qout,
datetime_simulation_start=datetime(1980, 1, 1),
simulation_time_step_seconds=3 * 3600)\
as qout_nc:
#retrieve integer timestamp array
time_array = qout_nc.get_time_array()
#or, to get datetime array
time_datetime = qout_nc.get_time_array(return_datetime=True)
"""
# Original Qout file
if datetime_simulation_start is not None:
self.datetime_simulation_start = datetime_simulation_start
if simulation_time_step_seconds is not None:
self.simulation_time_step_seconds = simulation_time_step_seconds
epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
time_units = "seconds since {0}".format(epoch)
# CF-1.6 compliant file
if self.is_time_variable_valid():
time_array = self.qout_nc.variables['time'][:]
if self.qout_nc.variables['time'].units:
time_units = self.qout_nc.variables['time'].units
# Original Qout file
elif self._is_legacy_time_valid():
initial_time_seconds = ((self.datetime_simulation_start
.replace(tzinfo=utc) - epoch)
.total_seconds() +
self.simulation_time_step_seconds)
final_time_seconds = (initial_time_seconds +
self.size_time *
self.simulation_time_step_seconds)
time_array = np.arange(initial_time_seconds,
final_time_seconds,
self.simulation_time_step_seconds)
else:
raise ValueError("This file does not contain the time"
" variable. To get time array, add"
" datetime_simulation_start and"
" simulation_time_step_seconds")
if time_index_array is not None:
time_array = time_array[time_index_array]
if return_datetime:
try:
# only_use_cftime_datetime=True is default after cftime PR #135
time_array = num2date(time_array, time_units, only_use_cftime_datetimes=False)
except:
time_array = num2date(time_array, time_units)
if self.out_tzinfo is not None:
for i in range(len(time_array)):
# convert time to output timezone
time_array[i] = utc.localize(time_array[i]) \
.astimezone(self.out_tzinfo) \
.replace(tzinfo=None)
def get_time_array(self,
datetime_simulation_start: Optional[datetime.datetime] = None,
simulation_time_step_seconds: Optional[int] = None,
return_datetime: bool = False,
time_index_array: Optional[List[int]] = None) -> List[Union[int, datetime.datetime]]:
"""
This method extracts or generates an array of time.
The new version of RAPID output has the time array stored.
However, the old version requires the user to know when the
simulation began and the time step of the output.
Parameters
----------
datetime_simulation_start: :obj:`datetime.datetime`, optional
The start datetime of the simulation. Only required if the time
variable is not included in the file.
simulation_time_step_seconds: int, optional
The time step of the file in seconds. Only required if the time
variable is not included in the file.
return_datetime: bool, optional
If true, it converts the data to a list of datetime objects.
Default is False.
time_index_array: list or :obj:`numpy.array`, optional
This is used to extract the datetime values by index from the main
list. This can be from the *get_time_index_range* function.
Returns
-------
list:
An array of integers representing seconds since Jan 1, 1970 UTC
or datetime objects if *return_datetime* is set to True.
These examples demonstrates how to retrieve or generate a time array
to go along with your RAPID streamflow series.
CF-Compliant Qout File Example:
.. code:: python
from RAPIDpy import RAPIDDataset
path_to_rapid_qout = '/path/to/Qout.nc'
with RAPIDDataset(path_to_rapid_qout) as qout_nc:
#retrieve integer timestamp array
time_array = qout_nc.get_time_array()
#or, to get datetime array
time_datetime = qout_nc.get_time_array(return_datetime=True)
Legacy Qout File Example:
.. code:: python
from RAPIDpy import RAPIDDataset
path_to_rapid_qout = '/path/to/Qout.nc'
with RAPIDDataset(path_to_rapid_qout,
datetime_simulation_start=datetime(1980, 1, 1),
simulation_time_step_seconds=3 * 3600)\
as qout_nc:
#retrieve integer timestamp array
time_array = qout_nc.get_time_array()
#or, to get datetime array
time_datetime = qout_nc.get_time_array(return_datetime=True)
"""
# Original Qout file
if datetime_simulation_start is not None:
self.datetime_simulation_start = datetime_simulation_start
if simulation_time_step_seconds is not None:
self.simulation_time_step_seconds = simulation_time_step_seconds
epoch = datetime.datetime(1970, 1, 1, tzinfo=utc)
time_units = "seconds since {0}".format(epoch)
# CF-1.6 compliant file
if self.is_time_variable_valid():
time_array = self.qout_nc.variables['time'][:]
if self.qout_nc.variables['time'].units:
time_units = self.qout_nc.variables['time'].units
# Original Qout file
elif self._is_legacy_time_valid():
initial_time_seconds = ((self.datetime_simulation_start
.replace(tzinfo=utc) - epoch)
.total_seconds() +
self.simulation_time_step_seconds)
final_time_seconds = (initial_time_seconds +
self.size_time *
self.simulation_time_step_seconds)
time_array = np.arange(initial_time_seconds,
final_time_seconds,
self.simulation_time_step_seconds)
else:
raise ValueError("This file does not contain the time"
" variable. To get time array, add"
" datetime_simulation_start and"
" simulation_time_step_seconds")
if time_index_array is not None:
time_array = time_array[time_index_array]
if return_datetime:
try:
# only_use_cftime_datetime=True is default after cftime PR #135
time_array = num2date(time_array, time_units, only_use_cftime_datetimes=False)
except Exception as e:
time_array = num2date(time_array, time_units)
if self.out_tzinfo is not None:
for i in range(len(time_array)):
# convert time to output timezone
time_array[i] = utc.localize(time_array[i]) \
.astimezone(self.out_tzinfo) \
.replace(tzinfo=None)
Tools
Ruff

404-404: Do not use bare except (E722)
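
If the failure mode is known, the num2date fallback can catch it specifically instead of everything. The sketch below assumes older cftime/netCDF4 releases reject the only_use_cftime_datetimes keyword with a TypeError; the decode_times helper name is illustrative, and the exact exception should be confirmed in the target environment.

from netCDF4 import num2date


def decode_times(time_array, time_units):
    # Newer cftime supports only_use_cftime_datetimes; older releases
    # raise TypeError for the unknown keyword, so retry without it.
    try:
        return num2date(time_array, time_units,
                        only_use_cftime_datetimes=False)
    except TypeError:
        return num2date(time_array, time_units)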

@msouff msouff merged commit f53b78d into main Jul 10, 2024
@rileyhales rileyhales deleted the feature/remove-rapidpy branch July 10, 2024 15:12