Enhance clustering #34

Merged · 17 commits · Sep 11, 2022
46 changes: 31 additions & 15 deletions README.md
@@ -1,19 +1,28 @@
# Stream Analysis for Bias Estimation and Reduction
# SABER Stream Analysis for Bias Estimation and Reduction

## Theory
Large-scale hydrological models are often biased despite the best calibration efforts. Bias correction is a pre- or
post-processing step applied to the model's inputs or outputs to empirically alter datasets and improve model performance.
SABER is a bias correction post-processor for discharge predictions. SABER builds on the frequency-matching idea used in
several other methods, which pair biased predictions with observed values at the same exceedance probability; that is,
using the flow duration curve. SABER expands this approach to ungauged stream reaches using spatial analysis, clustering
(machine learning), statistical refinements, and a new concept: the scalar flow duration curve.
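
As a rough illustration of the frequency-matching idea (a minimal sketch only, not the package's implementation; the function names below are hypothetical), a flow duration curve pairs each flow with its exceedance probability, and a biased simulated value can be swapped for the observed value at the same probability:

```python
import numpy as np


def flow_duration_curve(flows: np.ndarray, n_points: int = 101) -> tuple:
    """Return (exceedance probabilities in %, flow values) for a discharge series."""
    probs = np.linspace(0, 100, n_points)
    # the flow exceeded p% of the time is the (100 - p)th percentile of the series
    values = np.nanpercentile(flows, 100 - probs)
    return probs, values


def frequency_match(sim: np.ndarray, obs: np.ndarray) -> np.ndarray:
    """Replace each simulated flow with the observed flow at the same exceedance probability."""
    probs, sim_fdc = flow_duration_curve(sim)
    _, obs_fdc = flow_duration_curve(obs)
    # look up each simulated value's exceedance probability on the simulated FDC ...
    sim_probs = np.interp(sim, sim_fdc[::-1], probs[::-1])
    # ... then read the observed FDC at that probability
    return np.interp(sim_probs, probs, obs_fdc)
```

Extending this pairing to ungauged reaches, via clustering and the scalar flow duration curve described in the paper, is what the rest of the package automates.
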
## Installation
It is recommended that `saber` be installed in its own virtual environment. `saber` requires `python>=3.10`.

## Python environment
```bash
pip install hydrosaber
```

## Example
The theory for the SABER method is presented in this open-access journal article: https://doi.org/10.3390/hydrology9070113.
Be sure to read and understand the theory before using this code. This package is a series of functions that perform the
steps outlined in the paper. An example script is provided in `examples/example.py`.

This package is configured to use `logging` at the `INFO` level to give status updates for longer processing steps.

```python
import logging
logging.basicConfig(level=logging.INFO, filename='path/to/info.log')
```

## Dependencies
See requirements.txt

## Required Data/Inputs
You need the following data to follow this procedure. Geopackage GIS data may be substituted with Shapefile or any
equivalent format that `geopandas` can read (a quick check is sketched after the list below).
1. Geopackage drainage lines (usually delineated center lines) and catchments/subbasins (polygons) in the watershed. The
attribute tables for both should contain (at least) the following entries for each feature:
- An identifier column (alphanumeric) labeled `model_id`
@@ -35,16 +44,23 @@ Things to check when preparing your data
3. The GIS datasets should only contain gauges and reaches/subbasins within the area of interest. Clip/delete anything
else. You might find it helpful to keep a watershed boundary geopackage.
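
For example, a quick way to confirm the GIS inputs are readable and carry the required identifier column (a minimal sketch; the file paths are placeholders):

```python
import geopandas as gpd

# placeholder paths: any format geopandas can read (gpkg, shp, parquet, ...) is fine
drain_gdf = gpd.read_file('gis/drainagelines.gpkg')
catchment_gdf = gpd.read_file('gis/catchments.gpkg')

for name, gdf in [('drainage lines', drain_gdf), ('catchments', catchment_gdf)]:
    if 'model_id' not in gdf.columns:
        raise ValueError(f'{name} layer is missing the required "model_id" column')
    print(f'{name}: {len(gdf)} features, CRS {gdf.crs}')
```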

- `tables/`
- `model_id_list.parquet`
- `drain_table.parquet`
- `gauge_table.parquet`
- `hindcast_series.parquet`
- `hindcast_fdc.parquet`
- `hindcast_fdc_transformed.parquet`
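
Each of these tables is an ordinary parquet file, so it can be inspected with pandas (a minimal sketch; the working-directory path is a placeholder):

```python
import pandas as pd

# placeholder working directory; reading parquet requires pyarrow or fastparquet,
# both of which are listed in requirements.txt
drain_table = pd.read_parquet('working_directory/tables/drain_table.parquet')
gauge_table = pd.read_parquet('working_directory/tables/gauge_table.parquet')

print(drain_table.head())
print(gauge_table.columns.tolist())
```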

## Process
### 1 Create a Working Directory

Your working directory should look exactly like this:
```
working_directory
tables/
data_inputs/
kmeans_outputs/
gis_outputs/
clusters/
gis/
```
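
The example scripts use `saber.io.scaffold_workdir` to build this tree; a minimal sketch, assuming that function creates the subdirectories shown above (the path is a placeholder):

```python
import saber

workdir = '/path/to/working_directory'  # placeholder
saber.io.scaffold_workdir(workdir)  # expected to create tables/, clusters/, gis/, etc.
```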

### 2 Prepare Spatial Data (scripts not provided)
2 changes: 2 additions & 0 deletions conda-environment.yml
@@ -6,6 +6,7 @@ dependencies:
- python=3
- contextily
- geopandas
- fastparquet
- hydrostats
- joblib
- kneed
@@ -17,4 +18,5 @@ dependencies:
- pandas
- pyarrow
- requests
- scikit-learn
- scipy
133 changes: 133 additions & 0 deletions examples/global-scratch.py
@@ -0,0 +1,133 @@
import logging

import numpy as np
import pandas as pd

import saber

np.seterr(all="ignore")


def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis):
# scaffold the project working directory
logger.info('Scaffold project working directory')
# saber.io.scaffold_workdir(workdir)

# Prepare input data
logger.info('Prepare input data')
# saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis)
# saber.prep.hindcast(workdir, hist_sim_nc, drop_order_1=True)

# Generate clusters
logger.info('Generate Clusters')
saber.cluster.generate(workdir, x=x_fdc)
logger.info('Cluster Post-Processing and Metrics')
saber.cluster.summarize_fit(workdir)
saber.cluster.calc_silhouette(workdir, x=x_fdc, n_clusters=range(2, 8))
logger.info('Create Plots')
saber.cluster.plot_clusters(workdir, x=x_fdc)
saber.cluster.plot_silhouettes(workdir)
saber.cluster.plot_centers(workdir)
saber.cluster.plot_fit_metrics(workdir)

# ALL COMPLETED ABOVE

# Generate assignments table
# print('Generate Assignment Table')
# assign_table = saber.assign.gen(workdir)
# assign_table = saber.assign.merge_clusters(workdir, assign_table)
# assign_table = saber.assign.assign_gauged(assign_table)
# assign_table = saber.assign.assign_propagation(assign_table)
# assign_table = saber.assign.assign_by_distance(assign_table)
# saber.io.write_table(assign_table, workdir, 'assign_table')

# Generate GIS files
# print('Generate GIS files')
# saber.gis.clip_by_assignment(workdir, assign_table, drain_gis)
# saber.gis.clip_by_cluster(workdir, assign_table, drain_gis)
# saber.gis.clip_by_unassigned(workdir, assign_table, drain_gis)

# Compute the corrected simulation data
# print('Starting Calibration')
# saber.calibrate_region(workdir, assign_table)

# run the validation study
# print('Performing Validation')
# saber.validate.sample_gauges(workdir, overwrite=True)
# saber.validate.run_series(workdir, drain_shape, obs_data_dir)
# vtab = saber.validate.gen_val_table(workdir)
# saber.gis.validation_maps(workdir, gauge_shape, vtab)

logger.info('SABER Completed')
return


# basedir = '/Volumes/T7/global-saber/saber-directories-50fdcpts'
# hindcast_ncs = '/Volumes/T7/GEOGloWS/GEOGloWS-Hindcast-20220430'
# drainline_pqs = '/Volumes/T7/GEOGloWS/DrainageParquet'
# grdc_dir = os.path.join(basedir, 'GRDC')
# # grdc_gis = os.path.join(basedir, 'grdc_points.shp')
# grdc_gis = None
#
#
# regions = [
# 'africa',
# 'australia',
# 'central_america',
# 'central_asia',
# 'east_asia',
# 'europe',
# 'islands',
# 'japan',
# 'middle_east',
# 'north_america',
# 'south_asia',
# 'south_america',
# 'west_asia',
# ]
#
# for wdir in natsorted(glob.glob(os.path.join(basedir, '*'))):
# country = os.path.basename(wdir)
# print('\n')
# print(country)
# hist_sim_nc = os.path.join(hindcast_ncs, f'{country}-geoglows', 'Qout_era5_t640_24hr_19790101to20220430.nc')
# drain_gis = os.path.join(drainline_pqs, f'{country}-drainagelines.parquet')
# workflow(wdir, hist_sim_nc, grdc_dir, drain_gis, grdc_gis)


# workdir = '/Users/rchales/Desktop/tmp'
workdir = '/Volumes/T7/geoglows_saber'
logging.basicConfig(
level=logging.INFO,
filename='geoglows_saber.log',
filemode='w',
datefmt='%Y-%m-%d %X',
format='%(asctime)s: %(name)s - %(message)s'
)
logger = logging.getLogger(__name__)
logger.info('Reading prepared data')
# x_fdc = saber.io.read_table(workdir, 'hindcast_fdc_trans').values
x_fdc = pd.read_parquet('/Volumes/T7/geoglows_saber/Backups/hindcast_fdc_transformed_noord1.parquet').values

# logger.info('Scaffold project working directory')
# saber.io.scaffold_workdir(workdir)

# logger.info('Prepare input data')
# saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis)
# saber.prep.hindcast(workdir, hist_sim_nc, drop_order_1=True)

# Generate clusters
logger.info('Generate Clusters')
saber.cluster.generate(workdir, x=x_fdc)

logger.info('Cluster Post-Processing and Metrics')
saber.cluster.summarize_fit(workdir)
saber.cluster.calc_silhouette(workdir, x=x_fdc, n_clusters=range(2, 10))

logger.info('Create Plots')
saber.cluster.plot_clusters(workdir, x=x_fdc)
saber.cluster.plot_silhouettes(workdir)
saber.cluster.plot_centers(workdir)
saber.cluster.plot_fit_metrics(workdir)

logger.info('SABER Completed')
2 changes: 1 addition & 1 deletion examples/saber_example.py
@@ -1,8 +1,8 @@
import os

import numpy as np
import saber

import saber

np.seterr(all="ignore")

2 changes: 2 additions & 0 deletions requirements.txt
@@ -1,5 +1,6 @@
contextily
geopandas
fastparquet
hydrostats
joblib
kneed
@@ -11,4 +12,5 @@ numpy
pandas
pyarrow
requests
scikit-learn
scipy
2 changes: 1 addition & 1 deletion saber/__init__.py
@@ -14,5 +14,5 @@
]

__author__ = 'Riley Hales'
__version__ = '0.4.0'
__version__ = '0.5.0'
__license__ = 'BSD 3 Clause Clear'
19 changes: 11 additions & 8 deletions saber/_calibrate.py
@@ -1,3 +1,4 @@
import logging
import datetime
import os
import statistics
@@ -17,6 +18,8 @@
from .io import mid_col
from .io import table_hindcast

logger = logging.getLogger(__name__)


def calibrate(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: pd.DataFrame = None,
fix_seasonally: bool = True, empty_months: str = 'skip',
@@ -356,7 +359,7 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame,
for idx, triple in enumerate(assign_table[[mid_col, asgn_mid_col, asgn_gid_col]].values):
try:
if idx % 25 == 0:
print(f'\n\t\t{idx + 1}/{m_size}')
logger.info(f'\n\t\t{idx + 1}/{m_size}')

model_id, asgn_mid, asgn_gid = triple
if np.isnan(asgn_gid) or np.isnan(asgn_mid):
@@ -372,8 +375,8 @@
obs_df = pd.read_csv(os.path.join(obs_data_dir, f'{asgn_gid}.csv'), index_col=0, parse_dates=True)
obs_df = obs_df.dropna()
except Exception as e:
print('failed to read the observed data')
print(e)
logger.info('failed to read the observed data')
logger.info(e)
errors['g2'] += 1
continue

@@ -383,8 +386,8 @@
p_array[:, idx] = calibrated_df['percentile'].values
s_array[:, idx] = calibrated_df['scalars'].values
except Exception as e:
print('failed during the calibration step')
print(e)
logger.info('failed during the calibration step')
logger.info(e)
errors['g3'] += 1
continue

@@ -399,8 +402,8 @@
float(sim_obs_stats[metric].values[0]), float(bcs_obs_stats[metric].values[0])

except Exception as e:
print('failed during collecting stats')
print(e)
logger.info('failed during collecting stats')
logger.info(e)
errors['g4'] += 1
continue

@@ -413,7 +416,7 @@
bcs_nc.sync()
bcs_nc.close()

print(errors)
logger.info(errors)
with open(os.path.join(workdir, 'calibration_errors.json'), 'w') as f:
f.write(str(errors))

13 changes: 9 additions & 4 deletions saber/_propagation.py
@@ -1,3 +1,5 @@
import logging

import pandas as pd

from .io import asgn_gid_col
@@ -7,6 +9,8 @@
from .io import order_col
from .io import reason_col

logger = logging.getLogger(__name__)


def walk_downstream(df: pd.DataFrame, start_id: int, same_order: bool = True, outlet_next_id: str or int = -1) -> tuple:
"""
@@ -26,7 +30,8 @@ def walk_downstream(df: pd.DataFrame, start_id: int, same_order: bool = True, ou

df_ = df.copy()
if same_order:
df_ = df_[df_[order_col] == df_[df_[mid_col] == start_id][order_col].values[0]]
start_id_order = df_[df_[mid_col] == start_id][order_col].values[0]
df_ = df_[df_[order_col] == start_id_order]

stream_row = df_[df_[mid_col] == start_id]
while len(stream_row[down_mid_col].values) > 0 and stream_row[down_mid_col].values[0] != outlet_next_id:
@@ -97,16 +102,16 @@ def propagate_in_table(df: pd.DataFrame, start_mid: int, start_gid: int, connect
if downstream_row[asgn_mid_col].empty or pd.isna(downstream_row[asgn_mid_col]).any():
_df.loc[_df[mid_col] == segment_id, [asgn_mid_col, asgn_gid_col, reason_col]] = \
[start_mid, start_gid, f'propagation-{direction}-{distance}']
print(f'assigned gauged stream {start_mid} to ungauged {direction} {segment_id}')
logger.info(f'assigned gauged stream {start_mid} to ungauged {direction} {segment_id}')
continue

# if the stream segment does have an assigned value, check the value to determine what to do
else:
downstream_reason = downstream_row[reason_col].values[0]
if downstream_reason == 'gauged':
print('already gauged')
logger.info('already gauged')
if 'propagation' in downstream_reason and int(str(downstream_reason).split('-')[-1]) >= distance:
_df.loc[_df[mid_col] == segment_id, [asgn_mid_col, asgn_gid_col, reason_col]] = \
[start_mid, start_gid, f'propagation-{direction}-{distance}']
print(f'assigned gauged stream {start_mid} to previously assigned {direction} {segment_id}')
logger.info(f'assigned gauged stream {start_mid} to previously assigned {direction} {segment_id}')
return _df
5 changes: 4 additions & 1 deletion saber/assign.py
@@ -1,3 +1,4 @@
import logging
import os

import joblib
@@ -18,6 +19,8 @@

__all__ = ['gen', 'merge_clusters', 'assign_gauged', 'assign_propagation', 'assign_by_distance', ]

logger = logging.getLogger(__name__)


def gen(workdir: str, cache: bool = True) -> pd.DataFrame:
"""
@@ -139,7 +142,7 @@ def assign_by_distance(df: pd.DataFrame) -> pd.DataFrame:
ids_to_assign = c_so_sub[c_so_sub[asgn_mid_col].isna()][mid_col].values
avail_assigns = c_so_sub[c_so_sub[asgn_mid_col].notna()]
if ids_to_assign.size == 0 or avail_assigns.empty:
print(f'unable to assign cluster {c_num} to stream order {so_num}')
logger.error(f'unable to assign cluster {c_num} to stream order {so_num}')
continue
# now you find the closest gauge to each unassigned
for id in ids_to_assign: