Merge pull request #21 from geoglows/update_initialization
removed RAPIDpy from initialization
msouff authored Jul 12, 2024
2 parents c6f67cd + 97566fd commit 523b105
Showing 2 changed files with 65 additions and 248 deletions.
311 changes: 64 additions & 247 deletions geoglows_ecflow/resources/compute_init_flows.py
@@ -1,253 +1,8 @@
import argparse
import json
import os
from glob import glob

import netCDF4 as nc
import numpy as np
from .RAPIDpy.dataset import RAPIDDataset
from .RAPIDpy.helper_functions import csv_to_list

from geoglows_ecflow.resources.helper_functions import (
create_logger,
find_current_rapid_output,
)


class StreamSegment(object):
def __init__(
self,
stream_id,
down_id,
up_id_array,
init_flow=0,
station=None,
station_flow=None,
station_distance=None,
natural_flow=None,
):
self.stream_id = stream_id
self.down_id = down_id # downstream segment id
self.up_id_array = (
up_id_array # array of stream ids for upstream segments
)
self.init_flow = init_flow
self.station = station
self.station_flow = station_flow
self.station_distance = (
station_distance # number of stream segments to station
)
self.natural_flow = natural_flow


class StreamNetworkInitializer(object):
def __init__(self, connectivity_file, gage_ids_natur_flow_file=None):
# files
self.connectivity_file = connectivity_file
self.gage_ids_natur_flow_file = gage_ids_natur_flow_file
# variables
self.stream_segments = []
self.outlet_id_list = []
self.stream_index_with_usgs_station = []
self.stream_id_array = None

# generate the network
self._generate_network_from_connectivity()

# add gage id and natur flow to network
if gage_ids_natur_flow_file is not None:
if (
os.path.exists(gage_ids_natur_flow_file)
and gage_ids_natur_flow_file
):
self._add_gage_ids_natur_flow_to_network()

def compute_init_flows_from_past_forecast(
self, forecasted_streamflow_files
):
"""
Compute initial flows from the past ECMWF forecast ensemble
"""
if forecasted_streamflow_files:
# get list of COMIDS
print("Computing initial flows from the past ECMWF forecast ...")
with RAPIDDataset(forecasted_streamflow_files[0]) as qout_nc:
(
comid_index_list,
reordered_comid_list,
ignored_comid_list,
) = qout_nc.get_subset_riverid_index_list(self.stream_id_array)
print("Extracting data ...")
# use an array of NaNs (not zeros) so missing forecasts drop out of the mean
reach_prediction_array = np.full(
(
len(self.stream_id_array),
len(forecasted_streamflow_files),
1,
),
np.nan,
)
# get information from datasets
for file_index, forecasted_streamflow_file in enumerate(
forecasted_streamflow_files
):
try:
ensemble_index = int(
os.path.basename(forecasted_streamflow_file)
.split(".")[0]
.split("_")[-1]
)
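# member 52 is excluded from the initialization mean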
if ensemble_index == 52:
continue
try:
# Get hydrograph data from ECMWF Ensemble
with RAPIDDataset(forecasted_streamflow_file) as predicted_qout_nc:
data_values_2d_array = (
predicted_qout_nc
.get_qout_index(comid_index_list, time_index=8)
)
except Exception:
print("Invalid ECMWF forecast file {0}".format(forecasted_streamflow_file))
continue
# organize the data
for comid_index, _ in enumerate(reordered_comid_list):
reach_prediction_array[comid_index][file_index] = data_values_2d_array[comid_index]
except Exception as e:
print(e)

print("Analyzing data ...")
for index in range(len(self.stream_segments)):
try:
# get where comids are in netcdf file
data_index = np.where(reordered_comid_list == self.stream_segments[index].stream_id)[0][0]
self.stream_segments[index].init_flow = np.nanmean(reach_prediction_array[data_index])
except Exception:
# stream id not found in the forecast; default to zero initial flow
self.stream_segments[index].init_flow = 0
continue

print("Initialization Complete!")

def write_init_flow_file(self, out_file, log=create_logger(__name__)):
"""
Write initial flow file
"""
print("Writing to initial flow file: {0}".format(out_file))
if out_file.endswith(".csv"):
with open(out_file, "w") as init_flow_file:
for stream_index, stream_segment in enumerate(self.stream_segments):
if stream_segment.station_flow is not None:
init_flow_file.write("{}\n".format(stream_segment.station_flow))
else:
init_flow_file.write("{}\n".format(stream_segment.init_flow))
else:
init_flows_array = np.zeros(len(self.stream_segments))
for stream_index, stream_segment in enumerate(self.stream_segments):
try:
if stream_segment.station_flow is not None:
init_flows_array[stream_index] = stream_segment.station_flow
else:
init_flows_array[stream_index] = stream_segment.init_flow
except IndexError:
log.warning("Stream index not found.")
with nc.Dataset(out_file, "w", format="NETCDF3_CLASSIC") as init_flow_file:
init_flow_file.createDimension("Time", 1)
init_flow_file.createDimension("rivid", len(self.stream_segments))
var_Qout = init_flow_file.createVariable("Qout", "f8", ("Time", "rivid"))
var_Qout[:] = init_flows_array

def _generate_network_from_connectivity(self):
"""
Generate river network from connectivity file
"""
print("Generating river network from connectivity file ...")
connectivity_table = csv_to_list(self.connectivity_file)
self.stream_id_array = np.array([row[0] for row in connectivity_table], dtype=np.int64)
# add each stream segment to network
for connectivity_info in connectivity_table:
stream_id = int(connectivity_info[0])
downstream_id = int(connectivity_info[1])
# add outlet to list of outlets if downstream id is zero
if downstream_id == 0:
self.outlet_id_list.append(stream_id)

self.stream_segments.append(
StreamSegment(
stream_id=stream_id,
down_id=downstream_id,
up_id_array=connectivity_info[3: 3 + int(connectivity_info[2])], # upstream ids follow the count in column 2
)
)


def _cleanup_past_qinit(input_directory):
"""
Removes past qinit files.
:param input_directory: path to the RAPID input directory to clean
"""
past_init_flow_files = glob(os.path.join(input_directory, "Qinit_*"))
for past_init_flow_file in past_init_flow_files:
try:
os.remove(past_init_flow_file)
except Exception:
pass


def compute_init_rapid_flows(
prediction_files: list, input_directory: str, date: str
) -> None:
"""Gets mean of all 52 ensembles for the next day and prints to netcdf (nc)
format as initial flow Qinit_file (BS_opt_Qinit). The
assumptions are that Qinit_file is ordered the same way as
rapid_connect_file if subset of list, add zero where there is no flow.
Args:
prediction_files (list): List of paths to RAPID output files for a
specific VPU for each ensemble.
input_directory (str): Path to rapid input directory for specific VPU.
date (str): Date of the forecast in YYYYMMDDHH format.
"""
# remove old init files for this basin
_cleanup_past_qinit(input_directory)
init_file_location = os.path.join(input_directory, "Qinit_%s.nc" % date)
# check to see if exists and only perform operation once
if prediction_files:
sni = StreamNetworkInitializer(
connectivity_file=os.path.join(
input_directory, "rapid_connect.csv"
)
)
sni.compute_init_flows_from_past_forecast(prediction_files)
sni.write_init_flow_file(init_file_location)
else:
print("No current forecasts found. Skipping ...")


def compute_all_rapid_init_flows(workspace: str, vpu: str) -> None:
"""Computes initial flows for all basins in the vpu.
Args:
workspace (str): Path to rapid_run.json base directory.
vpu (str): VPU identifier.
"""
with open(os.path.join(workspace, "rapid_run.json"), "r") as f:
data = json.load(f)
rapid_input = data["input_dir"]
rapid_output = data["output_dir"]
date = data["date"]

# Initialize flows for next run
rapid_vpu_input_dir = os.path.join(rapid_input, vpu)

basin_files = find_current_rapid_output(rapid_output, vpu)

try:
compute_init_rapid_flows(basin_files, rapid_vpu_input_dir, date)
except Exception as ex:
print(ex)
import xarray as xr


if __name__ == "__main__":
@@ -267,4 +22,66 @@ def compute_all_rapid_init_flows(workspace: str, vpu: str) -> None:
workspace = args.workspace[0]
vpu = args.vpu[0]

compute_all_rapid_init_flows(workspace, vpu)
ymd = os.getenv('YMD', None)
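# the workflow is expected to export YMD (the forecast date) in the environment; if unset, the filename becomes Qinit_None.nc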
output_file = os.path.join(workspace, 'input', vpu, f'Qinit_{ymd}.nc')

with xr.open_dataset(os.path.join(workspace, "output", f"nces_avg_{vpu}.nc")) as average_flows:
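# timestep index 7 of the ensemble-average forecast supplies the next run's initial flows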
qinit_values = average_flows.Qout[7, :]
river_ids = average_flows.rivid[:]
init_date = average_flows.time[7].dt.strftime('%Y-%m-%d %X').item()

with nc.Dataset(output_file, "w", format="NETCDF3_CLASSIC") as qinit_nc:
# create dimensions
qinit_nc.createDimension("time", 1)
qinit_nc.createDimension("rivid", river_ids.shape[0])

qout_var = qinit_nc.createVariable("Qout", "f8", ("time", "rivid"))
qout_var[:] = qinit_values
qout_var.long_name = "instantaneous river water discharge downstream of each river reach"
qout_var.units = "m3 s-1"
qout_var.coordinates = "lon lat"
qout_var.grid_mapping = "crs"
qout_var.cell_methods = "time: point"

# rivid
rivid_var = qinit_nc.createVariable("rivid", "i4", ("rivid",))
rivid_var[:] = river_ids
rivid_var.long_name = "unique identifier for each river reach"
rivid_var.units = "1"
rivid_var.cf_role = "timeseries_id"

# time
time_var = qinit_nc.createVariable("time", "i4", ("time",))
time_var[:] = 0
time_var.long_name = "time"
time_var.standard_name = "time"
time_var.units = f'seconds since {init_date}' # Must be seconds
time_var.axis = "T"
time_var.calendar = "gregorian"

# longitude
lon_var = qinit_nc.createVariable("lon", "f8", ("rivid",))
lon_var[:] = 0
lon_var.long_name = "longitude of a point related to each river reach"
lon_var.standard_name = "longitude"
lon_var.units = "degrees_east"
lon_var.axis = "X"

# latitude
lat_var = qinit_nc.createVariable("lat", "f8", ("rivid",))
lat_var[:] = 0
lat_var.long_name = "latitude of a point related to each river reach"
lat_var.standard_name = "latitude"
lat_var.units = "degrees_north"
lat_var.axis = "Y"

# crs
crs_var = qinit_nc.createVariable("crs", "i4")
crs_var.grid_mapping_name = "latitude_longitude"
crs_var.epsg_code = "EPSG:4326" # WGS 84
crs_var.semi_major_axis = 6378137.0
crs_var.inverse_flattening = 298.257223563

# add global attributes
qinit_nc.Conventions = "CF-1.6"
qinit_nc.featureType = "timeSeries"
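
A quick way to sanity-check the Qinit file written above (a minimal sketch, not part of this commit; the path and VPU id are hypothetical):

import xarray as xr

# decode_times=False keeps the raw "seconds since ..." units attribute visible
with xr.open_dataset("workspace/input/123/Qinit_20240712.nc", decode_times=False) as qinit:
    assert qinit["Qout"].dims == ("time", "rivid")
    assert qinit.sizes["time"] == 1  # a single initialization timestep
    print(qinit["Qout"].isel(time=0).values[:5])  # first few initial flows, m3 s-1
    print(qinit["time"].attrs["units"])  # "seconds since <forecast start>"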
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "geoglows_ecflow"
version = "3.0.0"
version = "3.0.2"
description = "ECFLOW RAPID workflow for GEOGloWS"
authors = [
{ name = "Michael Souffront", email = "msouffront@aquaveo.com" },
