Handle timesteps (#11)
rileyhales authored Nov 14, 2023
1 parent ce6a394 commit 5bc3eb3
Showing 5 changed files with 153 additions and 136 deletions.
2 changes: 1 addition & 1 deletion basininflow/__init__.py
@@ -1,4 +1,4 @@
from basininflow.inflow import create_inflow_file

__version__ = '0.5.0'
__version__ = '0.10.0'
__all__ = ['create_inflow_file', ]
27 changes: 20 additions & 7 deletions basininflow/cli.py
@@ -10,17 +10,23 @@ def main():
parser = argparse.ArgumentParser(description='Create inflow file for LSM files and input directory.')

parser.add_argument('--lsmdata', type=str,
help='Path to directory of LSM NC files, a single NC file, or a glob pattern for NC files')
help='Path to directory of LSM NC files, a single NC file, or a glob pattern for NC files',
required=True)
parser.add_argument('--inputdir', type=str,
help='Input directory path for a specific VPU which contains weight tables')
help='Input directory path for a specific VPU which contains weight tables',
required=True)
parser.add_argument('--inflowdir', type=str,
help='Inflow directory path for a specific VPU where m3 runoff inflows NC files are saved')
help='Inflow directory path for a specific VPU where m3 runoff inflows NC files are saved',
required=True)
parser.add_argument('--timestep', type=int, default=3,
help='Desired time step in hours. Default is 3 hours')
help='Desired time step in hours. Default is 3 hours',
required=False)
parser.add_argument('--cumulative', action='store_true', default=False,
help='A boolean flag to mark if the runoff is cumulative. Inflows should be incremental')
help='A boolean flag to mark if the runoff is cumulative. Inflows should be incremental',
required=False)
parser.add_argument('--file_label', type=str, default=None,
help='A string to include in the inflow file name. Default is None')
help='A string to include in the inflow file name. Default is None',
required=False)

args = parser.parse_args()

@@ -47,5 +53,12 @@ def gen(parser, args):
return

# Create the inflow file for each LSM file
create_inflow_file(lsm_data, input_dir, inflow_dir, timestep=timestep, cumulative=cumulative)
create_inflow_file(
lsm_data,
input_dir,
inflow_dir,
timestep=timestep,
cumulative=cumulative,
file_label=file_label
)
return
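With the three path arguments now marked required, argparse rejects a missing path at parse time instead of passing None downstream. A minimal sketch of driving the updated CLI programmatically (all paths below are hypothetical placeholders, not files from this repository):

```python
# Hypothetical invocation of the updated CLI; paths are placeholders.
import sys
from basininflow.cli import main

sys.argv = [
    'basininflow',
    '--lsmdata', '/data/era5/*.nc',   # directory, single file, or glob pattern
    '--inputdir', './vpu_123',        # must contain a weight*.csv table
    '--inflowdir', './inflows',
    '--timestep', '3',                # hours; optional, defaults to 3
]
main()  # argparse now errors out early if any of the three paths is missing
```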
243 changes: 121 additions & 122 deletions basininflow/inflow.py
@@ -77,146 +77,145 @@ def create_inflow_file(lsm_data: str,
vpu_name = os.path.basename(input_dir)

# open all the ncs and select only the area within the weight table
logging.info('Opening LSM files multi-file dataset')
if os.path.isdir(lsm_data):
if type(lsm_data) == list:
... # this is correct, a list of files is allowed
elif os.path.isdir(lsm_data):
lsm_data = os.path.join(lsm_data, '*.nc*')
elif os.path.isfile(lsm_data):
... # this is correct, a single file is allowed
elif '*' in lsm_data:
... # this is correct, xarray will interpret the glob string independently
elif not os.path.exists(lsm_data) and '*' not in lsm_data:
raise FileNotFoundError(f'{lsm_data} does not exist and is not a glob pattern')
lsm_dataset = xr.open_mfdataset(lsm_data)

# Select the variable names
runoff_variable = [x for x in ['ro', 'RO', 'runoff', 'RUNOFF'] if x in lsm_dataset.variables][0]
lon_variable = [x for x in ['lon', 'longitude', 'LONGITUDE', 'LON'] if x in lsm_dataset.variables][0]
lat_variable = [x for x in ['lat', 'latitude', 'LATITUDE', 'LAT'] if x in lsm_dataset.variables][0]

# Check that the input table dimensions match the dataset dimensions
# This gets us the shape, while ignoring the time dimension
variable_dims = lsm_dataset[runoff_variable].dims
dataset_shape = [lsm_dataset[runoff_variable].shape[variable_dims.index(lat_variable)],
lsm_dataset[runoff_variable].shape[variable_dims.index(lon_variable)]]

if weight_table is None:
# find a file in the input_dir which contains f"weight*{dataset_shape[0]}x{dataset_shape[1]}.csv"
weight_table = glob.glob(os.path.join(input_dir, f'weight*{dataset_shape[0]}x{dataset_shape[1]}.csv'))
if not len(weight_table):
raise FileNotFoundError(f'Could not find a weight table in {input_dir} with shape {dataset_shape}')
weight_table = weight_table[0]
logging.info(f'Using weight table: {weight_table}')
# check that the grid dimensions are found in the weight table filename
matches = re.findall(r'(\d+)x(\d+)', weight_table)[0]
if len(matches) != 2:
raise ValueError(f"Could not validate the grid shape and weight table. Grid shape not found in filename")
if not (len(matches) == 2 and all(int(item) in dataset_shape for item in matches)):
raise ValueError(f"{weight_table} dimensions don't match the input dataset shape: {dataset_shape}")

# load in weight table and get some information
logging.info(f'Using weight table: {weight_table}')
logging.info(f'Using comid_lat_lon_z: {comid_lat_lon_z}')
weight_df = pd.read_csv(weight_table)
comid_df = pd.read_csv(comid_lat_lon_z)

sorted_rivid_array = comid_df.iloc[:, 0].to_numpy()

min_lon = weight_df['lon'].min()
max_lon = weight_df['lon'].max()
min_lat = weight_df['lat'].min()
max_lat = weight_df['lat'].max()

# for readability, select certain cols from the weight table
n_wt_rows = weight_df.shape[0]
stream_ids = weight_df.iloc[:, 0].to_numpy()
lat_indices = weight_df['lat_index'].values # - min_lat_idx
lon_indices = weight_df['lon_index'].values # - min_lon_idx

ds = (
lsm_dataset
[runoff_variable]
)

# Get approximate sizes of arrays and check if we have enough memory
out_array_size = ds['time'].shape[0] * sorted_rivid_array.shape[0]
in_array_size = ds['time'].shape[0] * n_wt_rows
if ds.ndim == 4:
in_array_size *= 2
total_size = out_array_size + in_array_size
_memory_check(total_size)

# Get conversion factor
conversion_factor = 1
units = ds.attrs.get('units', False)
if not units:
logging.warning("No units attribute found. Assuming meters")
elif ds.attrs['units'] == 'm':

logging.info(f'Opening LSM files {lsm_data[0] if type(lsm_data) == list else lsm_data}')
with xr.open_mfdataset(lsm_data) as ds:
# Select the variable names
runoff_variable = [x for x in ['ro', 'RO', 'runoff', 'RUNOFF'] if x in ds.variables][0]
lon_variable = [x for x in ['lon', 'longitude', 'LONGITUDE', 'LON'] if x in ds.variables][0]
lat_variable = [x for x in ['lat', 'latitude', 'LATITUDE', 'LAT'] if x in ds.variables][0]

# Check that the input table dimensions match the dataset dimensions
# This gets us the shape, while ignoring the time dimension
variable_dims = ds[runoff_variable].dims
dataset_shape = [ds[runoff_variable].shape[variable_dims.index(lat_variable)],
ds[runoff_variable].shape[variable_dims.index(lon_variable)]]

if weight_table is None:
# find a file in the input_dir which contains f"weight*{dataset_shape[0]}x{dataset_shape[1]}.csv"
weight_table = glob.glob(os.path.join(input_dir, f'weight*{dataset_shape[0]}x{dataset_shape[1]}.csv'))
if not len(weight_table):
raise FileNotFoundError(f'Could not find a weight table in {input_dir} with shape {dataset_shape}')
weight_table = weight_table[0]
logging.info(f'Using weight table: {weight_table}')
# check that the grid dimensions are found in the weight table filename
matches = re.findall(r'(\d+)x(\d+)', weight_table)[0]
if len(matches) != 2:
raise ValueError(f"Could not validate the grid shape and weight table. Grid shape not found in filename")
if not (len(matches) == 2 and all(int(item) in dataset_shape for item in matches)):
raise ValueError(f"{weight_table} dimensions don't match the input dataset shape: {dataset_shape}")

# load in weight table and get some information
logging.debug(f'Using weight table: {weight_table}')
logging.debug(f'Using comid_lat_lon_z: {comid_lat_lon_z}')
weight_df = pd.read_csv(weight_table)
comid_df = pd.read_csv(comid_lat_lon_z)

sorted_rivid_array = comid_df.iloc[:, 0].to_numpy()

min_lon = weight_df['lon'].min()
max_lon = weight_df['lon'].max()
min_lat = weight_df['lat'].min()
max_lat = weight_df['lat'].max()

# for readability, select certain cols from the weight table
n_wt_rows = weight_df.shape[0]
stream_ids = weight_df.iloc[:, 0].to_numpy()
lat_indices = weight_df['lat_index'].values # - min_lat_idx
lon_indices = weight_df['lon_index'].values # - min_lon_idx

ds = ds[runoff_variable]

# Get approximate sizes of arrays and check if we have enough memory
out_array_size = ds['time'].shape[0] * sorted_rivid_array.shape[0]
in_array_size = ds['time'].shape[0] * n_wt_rows
if ds.ndim == 4:
in_array_size *= 2
total_size = out_array_size + in_array_size
_memory_check(total_size)

# Get conversion factor
conversion_factor = 1
elif ds.attrs['units'] == 'mm':
conversion_factor = .001
else:
raise ValueError(f"Unknown units: {ds.attrs['units']}")

# get the time array from the dataset
logging.info('Reading Time values')
datetime_array = ds['time'].to_numpy()

logging.info('Creating inflow array')
if ds.ndim == 3:
inflow_array = ds.values[:, lat_indices, lon_indices]
elif ds.ndim == 4:
inflow_array = ds.values[:, :, lat_indices, lon_indices]
inflow_array = np.where(np.isnan(inflow_array[:, 0, :]), inflow_array[:, 1, :], inflow_array[:, 0, :]),
else:
raise ValueError(f"Unknown number of dimensions: {ds.ndim}")
units = ds.attrs.get('units', False)
if not units:
logging.warning("No units attribute found. Assuming meters")
elif ds.attrs['units'] == 'm':
conversion_factor = 1
elif ds.attrs['units'] == 'mm':
conversion_factor = .001
else:
raise ValueError(f"Unknown units: {ds.attrs['units']}")

# get the time array from the dataset
logging.info('Reading Time values')
datetime_array = ds['time'].to_numpy()

logging.info('Reading Runoff values')
if ds.ndim == 3:
inflow_df = ds.values[:, lat_indices, lon_indices]
elif ds.ndim == 4:
inflow_df = ds.values[:, :, lat_indices, lon_indices]
inflow_df = np.where(np.isnan(inflow_df[:, 0, :]), inflow_df[:, 1, :], inflow_df[:, 0, :])
else:
raise ValueError(f"Unknown number of dimensions: {ds.ndim}")

inflow_df = np.nan_to_num(inflow_df, nan=0)
inflow_df = inflow_df * weight_df['area_sqm'].values * conversion_factor
inflow_df = pd.DataFrame(inflow_df, columns=stream_ids, index=datetime_array)
inflow_df = inflow_df.groupby(by=stream_ids, axis=1).sum()

if cumulative:
inflow_array = np.vstack((inflow_array[0, :], np.diff(inflow_array, axis=0)))
inflow_df = pd.DataFrame(
np.vstack([inflow_df.values[0, :], np.diff(inflow_df.values, axis=0)]),
index=inflow_df.index,
columns=inflow_df.columns
)

inflow_df[inflow_df < 0] = 0

# Check that all timesteps are the same
time_diff = np.diff(datetime_array)
if not np.all(time_diff == datetime_array[1] - datetime_array[0]):
if timestep is None:
logging.warning('Timesteps are not all uniform and a target timestep was not provided.')
timestep = datetime_array[1] - datetime_array[0]
logging.warning(f'Assuming the first timedelta is the target: {timestep.astype("timedelta64[s]")}')
elif isinstance(timestep, datetime.timedelta):
# Convert datetime timedelta to timedelta64[ns]
timestep = np.timedelta64(timestep,'ns')

logging.warning("Input datasets do NOT have consistent time steps!")
logging.warning(f"Attempting to linearly interpolate non-uniform timesteps to {timestep.astype('timedelta64[h]')}")

# make sure that the rows with a non-uniform timestep are all the same timestep
rows_with_matching_timesteps = np.where(time_diff == timestep)[0]
rows_with_nonmatching_timesteps = np.where(time_diff != timestep)[0]
non_matching_timesteps = time_diff[rows_with_nonmatching_timesteps]
if not all([i == non_matching_timesteps[0] for i in non_matching_timesteps]):
raise ValueError("Non-matching timesteps are not all the same")

# make sure that the timestep is a multiple of the non-matching timestep
if non_matching_timesteps[0] % timestep != 0:
raise ValueError("The non-uniform timesteps are not a multiple of the non-matching timestep")
num_substeps_to_linearly_interpolate = non_matching_timesteps[0] // timestep

# linearly interpolate the non-matching timesteps
new_inflows_array = inflow_array[rows_with_nonmatching_timesteps, :] / num_substeps_to_linearly_interpolate
new_inflows_array = np.repeat(new_inflows_array, num_substeps_to_linearly_interpolate, axis=0)
inflow_array = np.vstack((inflow_array[rows_with_matching_timesteps, :], new_inflows_array))

# set the new list of timesteps
datetime_array = np.arange(
datetime_array[0],
datetime_array[-1] + timestep * (num_substeps_to_linearly_interpolate - 1 - 1),
timestep
)
# # figure out how many uniform timesteps are represented by each row
# timesteps_per_row = np.hstack([np.array(timestep), time_diff]) / timestep
# df1 = (
# inflow_df
# .div(timesteps_per_row, axis=0)
# .resample(rule=f'{timestep.astype("timedelta64[s]").astype(int)}S')
# .bfill()
# )

# everything is forced to be incremental before this step so we can use cumsum to get the cumulative values
inflow_df = (
inflow_df
.cumsum()
.resample(rule=f'{timestep.astype("timedelta64[s]").astype(int)}S')
.interpolate(method='linear')
)

inflow_array = np.nan_to_num(inflow_array, nan=0)
inflow_array[inflow_array < 0] = 0
inflow_array = inflow_array * weight_df['area_sqm'].values * conversion_factor
inflow_array = pd.DataFrame(inflow_array, columns=stream_ids)
inflow_array = inflow_array.groupby(by=stream_ids, axis=1).sum()
inflow_array = inflow_array[sorted_rivid_array].to_numpy()

ds.close()
inflow_df = pd.DataFrame(
np.vstack([inflow_df.values[0, :], np.diff(inflow_df.values, axis=0)]),
index=inflow_df.index,
columns=inflow_df.columns
)
datetime_array = inflow_df.index.to_numpy()

# Create output inflow netcdf data
logging.info("Writing inflows to file")
@@ -227,6 +226,7 @@ def create_inflow_file(lsm_data: str,
if file_label is not None:
file_name = f'm3_{vpu_name}_{start_date}_{end_date}_{file_label}.nc'
inflow_file_path = os.path.join(inflow_dir, file_name)
logging.debug(f'Writing inflow file to {inflow_file_path}')

with nc.Dataset(inflow_file_path, "w", format="NETCDF3_CLASSIC") as inflow_nc:
# create dimensions
@@ -237,7 +237,7 @@
# m3_riv
# note - nan's and fill values are not supported on netcdf3 files
m3_riv_var = inflow_nc.createVariable('m3_riv', 'f4', ('time', 'rivid'))
m3_riv_var[:] = inflow_array
m3_riv_var[:] = inflow_df.values
m3_riv_var.long_name = 'accumulated inflow volume in river reach boundaries'
m3_riv_var.units = 'm3'
m3_riv_var.coordinates = 'lon lat'
@@ -253,7 +253,7 @@

# time
reference_time = datetime_array[0]
time_step = (datetime_array[1] - datetime_array[0]).astype('timedelta64[s]')
time_step = (datetime_array[1] - reference_time).astype('timedelta64[s]')
time_var = inflow_nc.createVariable('time', 'i4', ('time',))
time_var[:] = (datetime_array - reference_time).astype('timedelta64[s]').astype(int)
time_var.long_name = 'time'
@@ -301,5 +301,4 @@ def create_inflow_file(lsm_data: str,
inflow_nc.geospatial_lon_min = min_lon
inflow_nc.geospatial_lon_max = max_lon

lsm_dataset.close()
return
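For reference, the heart of this change replaces the manual substep-splitting logic with a cumulative-sum resample: cumulative volume is piecewise linear in time, so linearly interpolating it onto a uniform grid and then differencing splits each coarse total evenly across the finer steps. A self-contained sketch on synthetic data (names and values are illustrative, not part of basininflow):

```python
# Sketch of the cumsum -> resample -> interpolate -> diff idea on synthetic data.
import pandas as pd

# Incremental runoff on a non-uniform axis: hourly at first, then 3-hourly totals.
index = pd.to_datetime([
    '2023-01-01 00:00', '2023-01-01 01:00', '2023-01-01 02:00',
    '2023-01-01 03:00', '2023-01-01 06:00', '2023-01-01 09:00',
])
df = pd.DataFrame({'reach_1': [1.0, 1.0, 1.0, 3.0, 3.0, 3.0]}, index=index)

# Cumulative volume is piecewise linear, so resampling to a uniform 1-hour grid
# with linear interpolation splits each 3-hour total evenly across its hours.
uniform = df.cumsum().resample('1H').interpolate(method='linear')

# Difference back to incremental values, keeping the first row as-is
# (mirroring the np.vstack([values[0], np.diff(values)]) step above).
incremental = uniform.diff()
incremental.iloc[0] = uniform.iloc[0]
print(incremental)  # 1.0 m3 at every hourly step
```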
5 changes: 5 additions & 0 deletions setup.py
@@ -34,4 +34,9 @@
'Topic :: Scientific/Engineering',
'Topic :: Scientific/Engineering :: Hydrology',
],
entry_points={
'console_scripts': [
'basininflow = basininflow.cli:main',
],
},
)
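A quick way to confirm the console script registration after `pip install .` — a hedged snippet assuming the Python 3.10+ importlib.metadata API:

```python
# Check that the 'basininflow' console script was registered at install time.
from importlib.metadata import entry_points

for ep in entry_points(group='console_scripts'):
    if ep.name == 'basininflow':
        print(ep.value)  # expected: basininflow.cli:main
```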
12 changes: 6 additions & 6 deletions tests/tests.py
@@ -64,8 +64,8 @@ def check_function(validation_ds, output_ds, test):


# TEST 1: Normal inputs, directory of LSM
create_inflow_file('tests/inputs/era5_721x1440_sample_data/',
'tests/test_vpu/123',
create_inflow_file('tests/inputs/era5_721x1440_sample_data/',
'tests/test_vpu/123',
'tests/test_results/',)

out_ds = nc.Dataset(glob.glob('./tests/test_results/*_123_*.nc')[0], 'r')
@@ -74,8 +74,8 @@ def check_function(validation_ds, output_ds, test):
check_function(val_ds, out_ds, 'TEST 1: Normal inputs')

# TEST 2: Forecast inputs, auto timestep
create_inflow_file('tests/inputs/era5_2560x5120_sample_data/forecast_data.nc',
'tests/test_vpu/345',
create_inflow_file('tests/inputs/era5_2560x5120_sample_data/forecast_data.nc',
'tests/test_vpu/345',
'tests/test_results/',
cumulative=True)

@@ -85,8 +85,8 @@ def check_function(validation_ds, output_ds, test):
check_function(val_ds, out_ds, 'TEST 2: Forecast inputs, auto timestep')

# TEST 3: Forecast inputs, 1 hour timestep
create_inflow_file('tests/inputs/era5_2560x5120_sample_data/forecast_data.nc',
'tests/test_vpu/345',
create_inflow_file('tests/inputs/era5_2560x5120_sample_data/forecast_data.nc',
'tests/test_vpu/345',
'tests/test_results/',
vpu_name='custom_vpu',
cumulative=True,

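Each test opens the generated file alongside a stored validation file and passes both to check_function, whose body falls outside this diff. A rough sketch of the kind of comparison it presumably performs (an assumed shape, not the repository's actual implementation):

```python
# Assumed shape of the validation helper: compare ids, time axis, and volumes.
import numpy as np
import netCDF4 as nc

def check_function(validation_ds: nc.Dataset, output_ds: nc.Dataset, test: str) -> None:
    assert np.array_equal(validation_ds['rivid'][:], output_ds['rivid'][:]), test
    assert np.array_equal(validation_ds['time'][:], output_ds['time'][:]), test
    assert np.allclose(validation_ds['m3_riv'][:], output_ds['m3_riv'][:]), test
    print(f'{test}: passed')
```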