From 32cdbe4464729e18f769d0eaa43f7bd398cac7a7 Mon Sep 17 00:00:00 2001 From: rileyhales Date: Sat, 3 Sep 2022 22:41:21 -0600 Subject: [PATCH 01/17] limit to python>=3.10 --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 0453132..c7d7807 100644 --- a/setup.py +++ b/setup.py @@ -30,6 +30,6 @@ 'License :: OSI Approved :: BSD License', 'Natural Language :: English', ], - python_requires=">=3", + python_requires=">=3.10", install_requires=install_requires ) From 7da2299476c4cd04fa8e502e334afe0f46cac336 Mon Sep 17 00:00:00 2001 From: rileyhales Date: Wed, 7 Sep 2022 16:42:34 -0600 Subject: [PATCH 02/17] mods for running on server --- examples/global-scratch.py | 101 +++++++++++++++++++++++++++++++++++++ examples/saber_example.py | 2 +- saber/cluster.py | 10 ++-- saber/io.py | 2 +- saber/prep.py | 2 +- 5 files changed, 110 insertions(+), 7 deletions(-) create mode 100644 examples/global-scratch.py diff --git a/examples/global-scratch.py b/examples/global-scratch.py new file mode 100644 index 0000000..abb2ec3 --- /dev/null +++ b/examples/global-scratch.py @@ -0,0 +1,101 @@ +import numpy as np + +import saber + +np.seterr(all="ignore") + + +def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): + # scaffold the project working directory + print("Scaffold the project working directory") + saber.io.scaffold_workdir(workdir) + + # Prepare input data + print('Prepare inputs') + saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis) + saber.prep.hindcast(workdir, hist_sim_nc, drop_order_1=True) + + # Generate clusters + print('Generate clusters') + saber.cluster.generate(workdir) + print('Summarize clusters') + saber.cluster.summarize(workdir) + print('Plot clusters') + saber.cluster.plot_clusters(workdir) + print('Plot silhouettes') + saber.cluster.plot_silhouette(workdir) + + # ALL COMPLETED ABOVE + + # Generate assignments table + # print('Generate Assignment Table') + # assign_table = saber.assign.gen(workdir) + # assign_table = saber.assign.merge_clusters(workdir, assign_table) + # assign_table = saber.assign.assign_gauged(assign_table) + # assign_table = saber.assign.assign_propagation(assign_table) + # assign_table = saber.assign.assign_by_distance(assign_table) + # saber.io.write_table(assign_table, workdir, 'assign_table') + + # Generate GIS files + # print('Generate GIS files') + # saber.gis.clip_by_assignment(workdir, assign_table, drain_gis) + # saber.gis.clip_by_cluster(workdir, assign_table, drain_gis) + # saber.gis.clip_by_unassigned(workdir, assign_table, drain_gis) + + # Compute the corrected simulation data + # print('Starting Calibration') + # saber.calibrate_region(workdir, assign_table) + + # run the validation study + # print('Performing Validation') + # saber.validate.sample_gauges(workdir, overwrite=True) + # saber.validate.run_series(workdir, drain_shape, obs_data_dir) + # vtab = saber.validate.gen_val_table(workdir) + # saber.gis.validation_maps(workdir, gauge_shape, vtab) + + return + + +# basedir = '/Volumes/T7/global-saber/saber-directories-50fdcpts' +# hindcast_ncs = '/Volumes/T7/GEOGloWS/GEOGloWS-Hindcast-20220430' +# drainline_pqs = '/Volumes/T7/GEOGloWS/DrainageParquet' +# grdc_dir = os.path.join(basedir, 'GRDC') +# # grdc_gis = os.path.join(basedir, 'grdc_points.shp') +# grdc_gis = None +# +# +# regions = [ +# 'africa', +# 'australia', +# 'central_america', +# 'central_asia', +# 'east_asia', +# 'europe', +# 'islands', +# 'japan', +# 'middle_east', +# 'north_america', +# 'south_asia', +# 'south_america', +# 'west_asia', +# ] +# +# for wdir in natsorted(glob.glob(os.path.join(basedir, '*'))): +# country = os.path.basename(wdir) +# print('\n') +# print(country) +# hist_sim_nc = os.path.join(hindcast_ncs, f'{country}-geoglows', 'Qout_era5_t640_24hr_19790101to20220430.nc') +# drain_gis = os.path.join(drainline_pqs, f'{country}-drainagelines.parquet') +# workflow(wdir, hist_sim_nc, grdc_dir, drain_gis, grdc_gis) + + +workdir = '/Volumes/T7/SABER' +# Generate clusters +print('Generate clusters') +saber.cluster.generate(workdir) +print('Summarize clusters') +saber.cluster.summarize(workdir) +print('Plot clusters') +saber.cluster.plot_clusters(workdir) +print('Plot silhouettes') +saber.cluster.plot_silhouette(workdir) diff --git a/examples/saber_example.py b/examples/saber_example.py index 843f349..0b27045 100644 --- a/examples/saber_example.py +++ b/examples/saber_example.py @@ -1,8 +1,8 @@ import os import numpy as np -import saber +import saber np.seterr(all="ignore") diff --git a/saber/cluster.py b/saber/cluster.py index 81f8109..afd13e3 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -11,7 +11,7 @@ import numpy as np import pandas as pd from natsort import natsorted -from sklearn.cluster import KMeans +from sklearn.cluster import MiniBatchKMeans from sklearn.metrics import silhouette_samples from .io import cluster_count_file @@ -32,12 +32,13 @@ def generate(workdir: str, max_clusters: int = 12) -> None: None """ # read the prepared data (array x) - x = read_table(workdir, 'hindcast_fdc_trans') - x = x.values + x = read_table(workdir, 'hindcast_fdc_trans').values # build the kmeans model for a range of cluster numbers for n_clusters in range(2, max_clusters + 1): - kmeans = KMeans(n_clusters=n_clusters) + # logging.info(f'Clustering n={n_clusters}') + print(f'Clustering n={n_clusters}') + kmeans = MiniBatchKMeans(n_clusters=n_clusters) kmeans.fit_predict(x) joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) return @@ -62,6 +63,7 @@ def summarize(workdir: str) -> None: x = x.values for model_file in natsorted(glob.glob(os.path.join(workdir, 'clusters', 'kmeans-*.pickle'))): + print(model_file) kmeans = joblib.load(model_file) n_clusters = int(kmeans.n_clusters) diff --git a/saber/io.py b/saber/io.py index 8436a2d..bcf1e55 100644 --- a/saber/io.py +++ b/saber/io.py @@ -82,7 +82,7 @@ def read_table(workdir: str, table_name: str) -> pd.DataFrame: table_path = get_table_path(workdir, table_name) table_format = os.path.splitext(table_path)[-1] if table_format == '.parquet': - return pd.read_parquet(table_path) + return pd.read_parquet(table_path, engine='fastparquet') elif table_format == '.feather': return pd.read_feather(table_path) elif table_format == '.csv': diff --git a/saber/prep.py b/saber/prep.py index 753ba17..987bd30 100755 --- a/saber/prep.py +++ b/saber/prep.py @@ -82,7 +82,7 @@ def hindcast(workdir: str, hind_nc_path: str, drop_order_1: bool = False) -> Non write_table(df, workdir, 'hindcast') # calculate the FDC and save to parquet - exceed_prob = np.linspace(0, 100, 101)[::-1] + exceed_prob = np.linspace(100, 0, 51) df = df.apply(lambda x: np.transpose(np.nanpercentile(x, exceed_prob))) df.index = exceed_prob df.index.name = 'exceed_prob' From f9d8a114d2345da1d9f63bf673648f486ed3a7bc Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 12:44:02 -0600 Subject: [PATCH 03/17] cap num fdc's shown in cluster subplots, prioritize silhouettes --- examples/global-scratch.py | 8 ++++---- saber/cluster.py | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index abb2ec3..9b25204 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -20,10 +20,10 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): saber.cluster.generate(workdir) print('Summarize clusters') saber.cluster.summarize(workdir) - print('Plot clusters') - saber.cluster.plot_clusters(workdir) print('Plot silhouettes') saber.cluster.plot_silhouette(workdir) + print('Plot clusters') + saber.cluster.plot_clusters(workdir) # ALL COMPLETED ABOVE @@ -95,7 +95,7 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): saber.cluster.generate(workdir) print('Summarize clusters') saber.cluster.summarize(workdir) -print('Plot clusters') -saber.cluster.plot_clusters(workdir) print('Plot silhouettes') saber.cluster.plot_silhouette(workdir) +print('Plot clusters') +saber.cluster.plot_clusters(workdir) \ No newline at end of file diff --git a/saber/cluster.py b/saber/cluster.py index afd13e3..f455af3 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -105,7 +105,7 @@ def summarize(workdir: str) -> None: def plot_clusters(workdir: str, clusters: int or Iterable = 'all', - max_cols: int = 3, plt_width: int = 3, plt_height: int = 3) -> None: + max_cols: int = 3, plt_width: int = 3, plt_height: int = 3, n_lines: int = 500) -> None: """ Generate figures of the clustered FDC's @@ -115,6 +115,7 @@ def plot_clusters(workdir: str, clusters: int or Iterable = 'all', max_cols: maximum number of columns (subplots) in the figure plt_width: width of each subplot in inches plt_height: height of each subplot in inches + n_lines: max number of lines to plot in each subplot Returns: None @@ -158,7 +159,7 @@ def plot_clusters(workdir: str, clusters: int or Iterable = 'all', ax.set_xlim(0, size) ax.set_xticks(x_values, x_ticks) ax.set_ylim(-2, 4) - for j in x[kmeans.labels_ == i]: + for j in np.random.shuffle(x[kmeans.labels_ == i])[:n_lines]: ax.plot(j.ravel(), "k-") ax.plot(kmeans.cluster_centers_[i].flatten(), "r-") # turn off plotting axes which are blank (when ax number > n_clusters) From d967ce40a0954fbad2938300aeafd265f3cbf9ed Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 12:56:18 -0600 Subject: [PATCH 04/17] increase number of random inits in minibatchkmeans --- saber/cluster.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/saber/cluster.py b/saber/cluster.py index f455af3..301ef73 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -38,7 +38,7 @@ def generate(workdir: str, max_clusters: int = 12) -> None: for n_clusters in range(2, max_clusters + 1): # logging.info(f'Clustering n={n_clusters}') print(f'Clustering n={n_clusters}') - kmeans = MiniBatchKMeans(n_clusters=n_clusters) + kmeans = MiniBatchKMeans(n_clusters=n_clusters, n_init=6) kmeans.fit_predict(x) joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) return From 911893b60ca0dc66012bfcf66265cd8e159378da Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 15:09:57 -0600 Subject: [PATCH 05/17] switch print for logging --- saber/_calibrate.py | 19 +++++++++++-------- saber/_propagation.py | 10 +++++++--- saber/assign.py | 5 ++++- saber/cluster.py | 9 ++++++--- 4 files changed, 28 insertions(+), 15 deletions(-) diff --git a/saber/_calibrate.py b/saber/_calibrate.py index 9a1aa08..b3ac004 100644 --- a/saber/_calibrate.py +++ b/saber/_calibrate.py @@ -1,3 +1,4 @@ +import logging import datetime import os import statistics @@ -17,6 +18,8 @@ from .io import mid_col from .io import table_hindcast +logger = logging.getLogger(__name__) + def calibrate(sim_flow_a: pd.DataFrame, obs_flow_a: pd.DataFrame, sim_flow_b: pd.DataFrame = None, fix_seasonally: bool = True, empty_months: str = 'skip', @@ -356,7 +359,7 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, for idx, triple in enumerate(assign_table[[mid_col, asgn_mid_col, asgn_gid_col]].values): try: if idx % 25 == 0: - print(f'\n\t\t{idx + 1}/{m_size}') + logger.info(f'\n\t\t{idx + 1}/{m_size}') model_id, asgn_mid, asgn_gid = triple if np.isnan(asgn_gid) or np.isnan(asgn_mid): @@ -372,8 +375,8 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, obs_df = pd.read_csv(os.path.join(obs_data_dir, f'{asgn_gid}.csv'), index_col=0, parse_dates=True) obs_df = obs_df.dropna() except Exception as e: - print('failed to read the observed data') - print(e) + logger.info('failed to read the observed data') + logger.info(e) errors['g2'] += 1 continue @@ -383,8 +386,8 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, p_array[:, idx] = calibrated_df['percentile'].values s_array[:, idx] = calibrated_df['scalars'].values except Exception as e: - print('failed during the calibration step') - print(e) + logger.info('failed during the calibration step') + logger.info(e) errors['g3'] += 1 continue @@ -399,8 +402,8 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, float(sim_obs_stats[metric].values[0]), float(bcs_obs_stats[metric].values[0]) except Exception as e: - print('failed during collecting stats') - print(e) + logger.info('failed during collecting stats') + logger.info(e) errors['g4'] += 1 continue @@ -413,7 +416,7 @@ def calibrate_region(workdir: str, assign_table: pd.DataFrame, bcs_nc.sync() bcs_nc.close() - print(errors) + logger.info(errors) with open(os.path.join(workdir, 'calibration_errors.json'), 'w') as f: f.write(str(errors)) diff --git a/saber/_propagation.py b/saber/_propagation.py index b738a95..7581d9f 100644 --- a/saber/_propagation.py +++ b/saber/_propagation.py @@ -1,3 +1,5 @@ +import logging + import pandas as pd from .io import asgn_gid_col @@ -7,6 +9,8 @@ from .io import order_col from .io import reason_col +logger = logging.getLogger(__name__) + def walk_downstream(df: pd.DataFrame, start_id: int, same_order: bool = True, outlet_next_id: str or int = -1) -> tuple: """ @@ -97,16 +101,16 @@ def propagate_in_table(df: pd.DataFrame, start_mid: int, start_gid: int, connect if downstream_row[asgn_mid_col].empty or pd.isna(downstream_row[asgn_mid_col]).any(): _df.loc[_df[mid_col] == segment_id, [asgn_mid_col, asgn_gid_col, reason_col]] = \ [start_mid, start_gid, f'propagation-{direction}-{distance}'] - print(f'assigned gauged stream {start_mid} to ungauged {direction} {segment_id}') + logger.info(f'assigned gauged stream {start_mid} to ungauged {direction} {segment_id}') continue # if the stream segment does have an assigned value, check the value to determine what to do else: downstream_reason = downstream_row[reason_col].values[0] if downstream_reason == 'gauged': - print('already gauged') + logger.info('already gauged') if 'propagation' in downstream_reason and int(str(downstream_reason).split('-')[-1]) >= distance: _df.loc[_df[mid_col] == segment_id, [asgn_mid_col, asgn_gid_col, reason_col]] = \ [start_mid, start_gid, f'propagation-{direction}-{distance}'] - print(f'assigned gauged stream {start_mid} to previously assigned {direction} {segment_id}') + logger.info(f'assigned gauged stream {start_mid} to previously assigned {direction} {segment_id}') return _df diff --git a/saber/assign.py b/saber/assign.py index 5017d26..3df555b 100644 --- a/saber/assign.py +++ b/saber/assign.py @@ -1,3 +1,4 @@ +import logging import os import joblib @@ -18,6 +19,8 @@ __all__ = ['gen', 'merge_clusters', 'assign_gauged', 'assign_propagation', 'assign_by_distance', ] +logger = logging.getLogger(__name__) + def gen(workdir: str, cache: bool = True) -> pd.DataFrame: """ @@ -139,7 +142,7 @@ def assign_by_distance(df: pd.DataFrame) -> pd.DataFrame: ids_to_assign = c_so_sub[c_so_sub[asgn_mid_col].isna()][mid_col].values avail_assigns = c_so_sub[c_so_sub[asgn_mid_col].notna()] if ids_to_assign.size == 0 or avail_assigns.empty: - print(f'unable to assign cluster {c_num} to stream order {so_num}') + logger.error(f'unable to assign cluster {c_num} to stream order {so_num}') continue # now you find the closest gauge to each unassigned for id in ids_to_assign: diff --git a/saber/cluster.py b/saber/cluster.py index 301ef73..87bb0c3 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -1,5 +1,6 @@ import glob import json +import logging import math import os from collections.abc import Iterable @@ -19,6 +20,8 @@ __all__ = ['generate', 'summarize', 'plot_clusters', 'plot_silhouette'] +logger = logging.getLogger(__name__) + def generate(workdir: str, max_clusters: int = 12) -> None: """ @@ -32,12 +35,12 @@ def generate(workdir: str, max_clusters: int = 12) -> None: None """ # read the prepared data (array x) + logger.info('Reading FDC table') x = read_table(workdir, 'hindcast_fdc_trans').values # build the kmeans model for a range of cluster numbers for n_clusters in range(2, max_clusters + 1): - # logging.info(f'Clustering n={n_clusters}') - print(f'Clustering n={n_clusters}') + logger.info(f'Clustering n={n_clusters}') kmeans = MiniBatchKMeans(n_clusters=n_clusters, n_init=6) kmeans.fit_predict(x) joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) @@ -63,7 +66,7 @@ def summarize(workdir: str) -> None: x = x.values for model_file in natsorted(glob.glob(os.path.join(workdir, 'clusters', 'kmeans-*.pickle'))): - print(model_file) + logger.info(model_file) kmeans = joblib.load(model_file) n_clusters = int(kmeans.n_clusters) From 58927741c4e50d3cedd15b66775852bc60901ef5 Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 21:31:30 -0600 Subject: [PATCH 06/17] config logging in example script, fix bug shuffling fdcs to plot --- examples/global-scratch.py | 19 +++++++++++++------ saber/cluster.py | 30 +++++++++++++++++++----------- saber/prep.py | 2 +- 3 files changed, 33 insertions(+), 18 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 9b25204..172c3b7 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -1,3 +1,6 @@ +import logging +import datetime + import numpy as np import saber @@ -89,13 +92,17 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): # workflow(wdir, hist_sim_nc, grdc_dir, drain_gis, grdc_gis) -workdir = '/Volumes/T7/SABER' +workdir = '/home/water/global_saber/workdir' +logging.basicConfig(filename='geoglows_saber.log', filemode='w', datefmt='%Y-%m-%d %X', + format='%(name)s - %(asctime)s: %(message)s') +logging.info('Reading prepared data') +fdc_trans_table = saber.io.read_table(workdir, 'hindcast_fdc_trans') # Generate clusters -print('Generate clusters') +logging.info('Generate Clusters') saber.cluster.generate(workdir) -print('Summarize clusters') +logging.info('Summarize Clusters') saber.cluster.summarize(workdir) -print('Plot silhouettes') +logging.info('Plot Silhouette') saber.cluster.plot_silhouette(workdir) -print('Plot clusters') -saber.cluster.plot_clusters(workdir) \ No newline at end of file +logging.ingo('Plot Clusters') +saber.cluster.plot_clusters(workdir) diff --git a/saber/cluster.py b/saber/cluster.py index 87bb0c3..f38ddf2 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -23,20 +23,23 @@ logger = logging.getLogger(__name__) -def generate(workdir: str, max_clusters: int = 12) -> None: +def generate(workdir: str, df: pd.DataFrame = None, max_clusters: int = 12) -> None: """ Trains kmeans clustering models, saves the model as pickle, generates images and supplementary files Args: workdir: path to the project directory + df: dataframe of the prepared FDC data max_clusters: maximum number of clusters to train Returns: None """ # read the prepared data (array x) - logger.info('Reading FDC table') - x = read_table(workdir, 'hindcast_fdc_trans').values + if df is not None: + x = df.values + else: + x = read_table(workdir, 'hindcast_fdc_trans').values # build the kmeans model for a range of cluster numbers for n_clusters in range(2, max_clusters + 1): @@ -55,16 +58,12 @@ def summarize(workdir: str) -> None: workdir: path to the project directory Returns: - + None """ summary = {'number': [], 'inertia': [], 'n_iter': [], 'silhouette': []} silhouette_scores = [] labels = [] - # read the prepared data (array x) - x = read_table(workdir, 'hindcast_fdc_trans') - x = x.values - for model_file in natsorted(glob.glob(os.path.join(workdir, 'clusters', 'kmeans-*.pickle'))): logger.info(model_file) kmeans = joblib.load(model_file) @@ -107,13 +106,14 @@ def summarize(workdir: str) -> None: return -def plot_clusters(workdir: str, clusters: int or Iterable = 'all', +def plot_clusters(workdir: str, df: pd.DataFrame = None, clusters: int or Iterable = 'all', max_cols: int = 3, plt_width: int = 3, plt_height: int = 3, n_lines: int = 500) -> None: """ Generate figures of the clustered FDC's Args: workdir: path to the project directory + df: dataframe of the prepared FDC data clusters: number of clusters to create figures for max_cols: maximum number of columns (subplots) in the figure plt_width: width of each subplot in inches @@ -123,7 +123,12 @@ def plot_clusters(workdir: str, clusters: int or Iterable = 'all', Returns: None """ - x = read_table(workdir, 'hindcast_fdc_trans').values + # read the prepared data (array x) + if df is not None: + x = df.values + else: + x = read_table(workdir, 'hindcast_fdc_trans').values + size = x.shape[1] x_values = np.linspace(0, size, 5) x_ticks = np.linspace(0, 100, 5).astype(int) @@ -162,7 +167,10 @@ def plot_clusters(workdir: str, clusters: int or Iterable = 'all', ax.set_xlim(0, size) ax.set_xticks(x_values, x_ticks) ax.set_ylim(-2, 4) - for j in np.random.shuffle(x[kmeans.labels_ == i])[:n_lines]: + fdc_to_plot = x[kmeans.labels_ == i] + np.random.shuffle(fdc_to_plot) + fdc_to_plot = fdc_to_plot[:n_lines] + for j in fdc_to_plot: ax.plot(j.ravel(), "k-") ax.plot(kmeans.cluster_centers_[i].flatten(), "r-") # turn off plotting axes which are blank (when ax number > n_clusters) diff --git a/saber/prep.py b/saber/prep.py index 987bd30..be87052 100755 --- a/saber/prep.py +++ b/saber/prep.py @@ -82,7 +82,7 @@ def hindcast(workdir: str, hind_nc_path: str, drop_order_1: bool = False) -> Non write_table(df, workdir, 'hindcast') # calculate the FDC and save to parquet - exceed_prob = np.linspace(100, 0, 51) + exceed_prob = np.linspace(100, 0, 41) df = df.apply(lambda x: np.transpose(np.nanpercentile(x, exceed_prob))) df.index = exceed_prob df.index.name = 'exceed_prob' From d06fba16b174f8651437a107b33c7c58e6b4d817 Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 21:34:40 -0600 Subject: [PATCH 07/17] use logger in example script --- examples/global-scratch.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 172c3b7..1af01d9 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -95,14 +95,15 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): workdir = '/home/water/global_saber/workdir' logging.basicConfig(filename='geoglows_saber.log', filemode='w', datefmt='%Y-%m-%d %X', format='%(name)s - %(asctime)s: %(message)s') -logging.info('Reading prepared data') +logger = logging.getLogger(__name__) +logger.info('Reading prepared data') fdc_trans_table = saber.io.read_table(workdir, 'hindcast_fdc_trans') # Generate clusters -logging.info('Generate Clusters') +logger.info('Generate Clusters') saber.cluster.generate(workdir) -logging.info('Summarize Clusters') +logger.info('Summarize Clusters') saber.cluster.summarize(workdir) -logging.info('Plot Silhouette') +logger.info('Plot Silhouette') saber.cluster.plot_silhouette(workdir) -logging.ingo('Plot Clusters') +logger.ingo('Plot Clusters') saber.cluster.plot_clusters(workdir) From 0f298b6d40c29380409b09dfc3ceeb85c96a5d09 Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 21:34:55 -0600 Subject: [PATCH 08/17] correct typo --- examples/global-scratch.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 1af01d9..17b6a36 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -105,5 +105,5 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): saber.cluster.summarize(workdir) logger.info('Plot Silhouette') saber.cluster.plot_silhouette(workdir) -logger.ingo('Plot Clusters') +logger.info('Plot Clusters') saber.cluster.plot_clusters(workdir) From b5611d977a5f615ced6214d2d39ff5e3064ee4cc Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 21:44:35 -0600 Subject: [PATCH 09/17] fix bug reading wrong fdc table table --- saber/io.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/saber/io.py b/saber/io.py index bcf1e55..0e1a9d1 100644 --- a/saber/io.py +++ b/saber/io.py @@ -65,7 +65,7 @@ def get_table_path(workdir: str, table_name: str) -> str: elif table_name == 'hindcast_fdc': return os.path.join(workdir, dir_tables, table_hindcast_fdc) elif table_name == 'hindcast_fdc_trans': - return os.path.join(workdir, dir_tables, table_hindcast_fdc) + return os.path.join(workdir, dir_tables, table_hindcast_fdc_trans) elif table_name == 'model_ids': return os.path.join(workdir, dir_tables, table_mids) elif table_name == 'drain_table': From 7c1ff497c6b089a88080dc8d46dc059a05267308 Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Thu, 8 Sep 2022 21:50:00 -0600 Subject: [PATCH 10/17] re-include x in cluster summary function --- examples/global-scratch.py | 2 +- saber/cluster.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 17b6a36..2224535 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -92,7 +92,7 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): # workflow(wdir, hist_sim_nc, grdc_dir, drain_gis, grdc_gis) -workdir = '/home/water/global_saber/workdir' +workdir = '/Users/rchales/Desktop/tmp' logging.basicConfig(filename='geoglows_saber.log', filemode='w', datefmt='%Y-%m-%d %X', format='%(name)s - %(asctime)s: %(message)s') logger = logging.getLogger(__name__) diff --git a/saber/cluster.py b/saber/cluster.py index f38ddf2..b562d9b 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -50,16 +50,23 @@ def generate(workdir: str, df: pd.DataFrame = None, max_clusters: int = 12) -> N return -def summarize(workdir: str) -> None: +def summarize(workdir: str, df: pd.DataFrame = None) -> None: """ Generate a summary of the clustering results, calculate the silhouette score, save the centers and labels to parquet Args: workdir: path to the project directory + df: dataframe of the prepared FDC data Returns: None """ + # read the prepared data (array x) + if df is not None: + x = df.values + else: + x = read_table(workdir, 'hindcast_fdc_trans').values + summary = {'number': [], 'inertia': [], 'n_iter': [], 'silhouette': []} silhouette_scores = [] labels = [] From 4fc16dfcad80b055283fa1d02bbecaa7b16befed Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Fri, 9 Sep 2022 10:38:32 -0600 Subject: [PATCH 11/17] improve options for passing arrays to cluster functions --- README.md | 46 +++++++++++++++++++++++++------------- conda-environment.yml | 2 ++ examples/global-scratch.py | 13 +++++------ requirements.txt | 2 ++ saber/cluster.py | 38 ++++++++++++------------------- 5 files changed, 55 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index e4016a9..80553eb 100755 --- a/README.md +++ b/README.md @@ -1,19 +1,28 @@ -# Stream Analysis for Bias Estimation and Reduction +# SABER Stream Analysis for Bias Estimation and Reduction -## Theory -Large scale hydrological models are often biased despite the best calibration efforts. Bias correction is a pre- or -post-processing step applied to the model's inputs or outputs to empirically alter datasets and improve model performance. -SABER is a bias correction post processor for discharge predictions. SABER builds on frequency matching ideas used in several -other methods that match biased predictions with observed data with corresponding exceedance probabilities; that is, -using the flow duration curve. SABER expands this approach to ungauged stream reaches using spatial analysis, clustering -(machine learning), and statistical refinements and a new concept- the scalar flow duration curve. +## Installation +It is recommended `saber` be installed in its own virtual environment. Saber requires `python>=3.10`. -## Python environment +```bash +pip install hydrosaber +``` + +## Example +The theory for the SABER method is presented in this open access journal article https://doi.org/10.3390/hydrology9070113. +Be sure to read and understand the theory before using this code. This package is a series of functions that perform the +steps outlined in the paper. An example script is provided in `examples/example.py`. + +This package is configured to use `logging` at the `INFO` level to give status updates for longer processing steps. + +```python +import logging +logging.basicConfig(level=logging.INFO, filename='path/to/info.log') +``` + +## Dependencies See requirements.txt -## Required Data/Inputs -You need the following data to follow this procedure. Geopackage format GIS data may be substituted with Shapefile or -equivalent formats that can be read by `geopandas`. +## Required Data/Inputs 1. Geopackage drainage Lines (usually delineated center lines) and catchments/subbasins (polygons) in the watershed. The attribute tables for both should contain (at least) the following entries for each feature: - An identifier column (alphanumeric) labeled `model_id` @@ -35,6 +44,14 @@ Things to check when preparing your data 3. The GIS datasets should only contain gauges and reaches/subbasins within the area of interest. Clip/delete anything else. You might find it helpful to keep a watershed boundary geopackage. +- `tables/` + - `model_id_list.parquet` + - `drain_table.parquet` + - `gauge_table.parquet` + - `hindcast_series.parquet` + - `hindcast_fdc.parquet` + - `hindcast_fdc_transformed.parquet` + ## Process ### 1 Create a Working Directory @@ -42,9 +59,8 @@ Your working directory should exactly like this. ``` working_directory tables/ - data_inputs/ - kmeans_outputs/ - gis_outputs/ + clusters/ + gis/ ``` ### 2 Prepare Spatial Data (scripts not provided) diff --git a/conda-environment.yml b/conda-environment.yml index f7879c0..37d642d 100644 --- a/conda-environment.yml +++ b/conda-environment.yml @@ -6,6 +6,7 @@ dependencies: - python=3 - contextily - geopandas + - fastparquet - hydrostats - joblib - kneed @@ -17,4 +18,5 @@ dependencies: - pandas - pyarrow - requests + - scikit-learn - scipy diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 2224535..9df5665 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -1,5 +1,4 @@ import logging -import datetime import numpy as np @@ -93,17 +92,17 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): workdir = '/Users/rchales/Desktop/tmp' -logging.basicConfig(filename='geoglows_saber.log', filemode='w', datefmt='%Y-%m-%d %X', +logging.basicConfig(level=logging.INFO, filename='geoglows_saber.log', filemode='w', datefmt='%Y-%m-%d %X', format='%(name)s - %(asctime)s: %(message)s') logger = logging.getLogger(__name__) logger.info('Reading prepared data') -fdc_trans_table = saber.io.read_table(workdir, 'hindcast_fdc_trans') +x_fdc = saber.io.read_table(workdir, 'hindcast_fdc_trans').values # Generate clusters logger.info('Generate Clusters') -saber.cluster.generate(workdir) +saber.cluster.generate(workdir, x=x_fdc) logger.info('Summarize Clusters') -saber.cluster.summarize(workdir) +saber.cluster.summarize(workdir, x=x_fdc) +logger.info('Plot Clusters') +saber.cluster.plot_clusters(workdir, x=x_fdc) logger.info('Plot Silhouette') saber.cluster.plot_silhouette(workdir) -logger.info('Plot Clusters') -saber.cluster.plot_clusters(workdir) diff --git a/requirements.txt b/requirements.txt index 16193a7..572a059 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ contextily geopandas +fastparquet hydrostats joblib kneed @@ -11,4 +12,5 @@ numpy pandas pyarrow requests +scikit-learn scipy diff --git a/saber/cluster.py b/saber/cluster.py index b562d9b..6d90cfa 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -23,22 +23,19 @@ logger = logging.getLogger(__name__) -def generate(workdir: str, df: pd.DataFrame = None, max_clusters: int = 12) -> None: +def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 12) -> None: """ Trains kmeans clustering models, saves the model as pickle, generates images and supplementary files Args: workdir: path to the project directory - df: dataframe of the prepared FDC data + x: a numpy array of the prepared FDC data max_clusters: maximum number of clusters to train Returns: None """ - # read the prepared data (array x) - if df is not None: - x = df.values - else: + if x is None: x = read_table(workdir, 'hindcast_fdc_trans').values # build the kmeans model for a range of cluster numbers @@ -50,21 +47,18 @@ def generate(workdir: str, df: pd.DataFrame = None, max_clusters: int = 12) -> N return -def summarize(workdir: str, df: pd.DataFrame = None) -> None: +def summarize(workdir: str, x: np.ndarray = None) -> None: """ Generate a summary of the clustering results, calculate the silhouette score, save the centers and labels to parquet Args: workdir: path to the project directory - df: dataframe of the prepared FDC data + x: a numpy array of the prepared FDC data Returns: None """ - # read the prepared data (array x) - if df is not None: - x = df.values - else: + if x is None: x = read_table(workdir, 'hindcast_fdc_trans').values summary = {'number': [], 'inertia': [], 'n_iter': [], 'silhouette': []} @@ -77,12 +71,12 @@ def summarize(workdir: str, df: pd.DataFrame = None) -> None: n_clusters = int(kmeans.n_clusters) # save the cluster centroids to table - columns are the cluster number, rows are the centroid FDC values - pd.DataFrame( - np.transpose(kmeans.cluster_centers_), - columns=np.array(range(n_clusters)).astype(str) - ).to_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{n_clusters}.parquet')) + logger.info(f'Writing cluster centers') + pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str))\ + .to_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{n_clusters}.parquet')) # save the silhouette score for each cluster + logger.info('Calculating silhouette scores') silhouette_scores.append(silhouette_samples(x, kmeans.labels_).flatten()) labels.append(kmeans.labels_.flatten()) @@ -93,8 +87,7 @@ def summarize(workdir: str, df: pd.DataFrame = None) -> None: summary['silhouette'].append(np.mean(silhouette_scores[-1])) # save the summary results as a csv - pd.DataFrame.from_dict(summary) \ - .to_csv(os.path.join(workdir, 'clusters', f'clustering-summary-stats.csv')) + pd.DataFrame.from_dict(summary).to_csv(os.path.join(workdir, 'clusters', f'cluster-metrics.csv')) # save the silhouette scores as a parquet silhouette_scores = np.transpose(np.array(silhouette_scores)) pd.DataFrame(silhouette_scores, columns=np.array(range(2, silhouette_scores.shape[1] + 2)).astype(str)) \ @@ -113,14 +106,14 @@ def summarize(workdir: str, df: pd.DataFrame = None) -> None: return -def plot_clusters(workdir: str, df: pd.DataFrame = None, clusters: int or Iterable = 'all', +def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable = 'all', max_cols: int = 3, plt_width: int = 3, plt_height: int = 3, n_lines: int = 500) -> None: """ Generate figures of the clustered FDC's Args: workdir: path to the project directory - df: dataframe of the prepared FDC data + x: a numpy array of the prepared FDC data clusters: number of clusters to create figures for max_cols: maximum number of columns (subplots) in the figure plt_width: width of each subplot in inches @@ -130,10 +123,7 @@ def plot_clusters(workdir: str, df: pd.DataFrame = None, clusters: int or Iterab Returns: None """ - # read the prepared data (array x) - if df is not None: - x = df.values - else: + if x is None: x = read_table(workdir, 'hindcast_fdc_trans').values size = x.shape[1] From 92fffdc68ecb0a8732a0263e13f35e4236def58f Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Fri, 9 Sep 2022 15:36:23 -0600 Subject: [PATCH 12/17] randomly sample elements for calculating silhouette scores --- saber/cluster.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/saber/cluster.py b/saber/cluster.py index 6d90cfa..4ef0fbb 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -47,19 +47,21 @@ def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 12) -> None return -def summarize(workdir: str, x: np.ndarray = None) -> None: +def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> None: """ Generate a summary of the clustering results, calculate the silhouette score, save the centers and labels to parquet Args: workdir: path to the project directory x: a numpy array of the prepared FDC data + samples: max number of randomly chosen samples to use when calculating the silhouette score Returns: None """ if x is None: x = read_table(workdir, 'hindcast_fdc_trans').values + fdc_df = pd.DataFrame(x) summary = {'number': [], 'inertia': [], 'n_iter': [], 'silhouette': []} silhouette_scores = [] @@ -75,9 +77,20 @@ def summarize(workdir: str, x: np.ndarray = None) -> None: pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str))\ .to_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{n_clusters}.parquet')) - # save the silhouette score for each cluster + # randomly sample fdcs from each cluster logger.info('Calculating silhouette scores') - silhouette_scores.append(silhouette_samples(x, kmeans.labels_).flatten()) + fdc_df['label'] = kmeans.labels_ + ss_df = pd.DataFrame(columns=fdc_df.columns.to_list()) + for i in range(n_clusters): + values = fdc_df[fdc_df['label'] == i].drop(columns='label').values + np.random.shuffle(values) + values = values[:samples] + tmp = pd.DataFrame(values) + tmp['label'] = i + ss_df = pd.concat([ss_df, tmp]) + + # calculate their silhouette scores + silhouette_scores.append(silhouette_samples(ss_df.drop(columns='label').values, ss_df['label'].values).flatten()) labels.append(kmeans.labels_.flatten()) # save the summary stats from this model From 6d34c1c1794f6849f751899d88a92063b9a8eb60 Mon Sep 17 00:00:00 2001 From: Riley Hales Date: Fri, 9 Sep 2022 15:45:14 -0600 Subject: [PATCH 13/17] randomly sample elements for calculating silhouette scores --- saber/cluster.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/saber/cluster.py b/saber/cluster.py index 4ef0fbb..8c1177b 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -90,7 +90,8 @@ def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> Non ss_df = pd.concat([ss_df, tmp]) # calculate their silhouette scores - silhouette_scores.append(silhouette_samples(ss_df.drop(columns='label').values, ss_df['label'].values).flatten()) + silhouette_scores.append( + silhouette_samples(ss_df.drop(columns='label').values, ss_df['label'].values, n_jobs=-1).flatten()) labels.append(kmeans.labels_.flatten()) # save the summary stats from this model From f90210bebc3ffd5eeaa6ff9872d8f54de63c6e27 Mon Sep 17 00:00:00 2001 From: rileyhales Date: Sat, 10 Sep 2022 09:41:11 -0600 Subject: [PATCH 14/17] more logs, organize for readability --- examples/global-scratch.py | 12 ++++++++--- saber/_propagation.py | 3 ++- saber/cluster.py | 44 +++++++++++++++++++++++++++----------- 3 files changed, 42 insertions(+), 17 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 9df5665..ecb0a75 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -91,9 +91,15 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): # workflow(wdir, hist_sim_nc, grdc_dir, drain_gis, grdc_gis) -workdir = '/Users/rchales/Desktop/tmp' -logging.basicConfig(level=logging.INFO, filename='geoglows_saber.log', filemode='w', datefmt='%Y-%m-%d %X', - format='%(name)s - %(asctime)s: %(message)s') +# workdir = '/Users/rchales/Desktop/tmp' +workdir = '/Volumes/T7/geoglows_saber' +logging.basicConfig( + level=logging.INFO, + filename='geoglows_saber.log', + filemode='w', + datefmt='%Y-%m-%d %X', + format='%(asctime)s: %(name)s - %(message)s' +) logger = logging.getLogger(__name__) logger.info('Reading prepared data') x_fdc = saber.io.read_table(workdir, 'hindcast_fdc_trans').values diff --git a/saber/_propagation.py b/saber/_propagation.py index 7581d9f..41ee9f7 100644 --- a/saber/_propagation.py +++ b/saber/_propagation.py @@ -30,7 +30,8 @@ def walk_downstream(df: pd.DataFrame, start_id: int, same_order: bool = True, ou df_ = df.copy() if same_order: - df_ = df_[df_[order_col] == df_[df_[mid_col] == start_id][order_col].values[0]] + start_id_order = df_[df_[mid_col] == start_id][order_col].values[0] + df_ = df_[df_[order_col] == start_id_order] stream_row = df_[df_[mid_col] == start_id] while len(stream_row[down_mid_col].values) > 0 and stream_row[down_mid_col].values[0] != outlet_next_id: diff --git a/saber/cluster.py b/saber/cluster.py index 8c1177b..91da5f2 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -67,14 +67,16 @@ def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> Non silhouette_scores = [] labels = [] + random_shuffler = np.random.default_rng() + for model_file in natsorted(glob.glob(os.path.join(workdir, 'clusters', 'kmeans-*.pickle'))): logger.info(model_file) kmeans = joblib.load(model_file) n_clusters = int(kmeans.n_clusters) - # save the cluster centroids to table - columns are the cluster number, rows are the centroid FDC values + # save cluster centroids to table - columns are the cluster number, rows are the centroid FDC values logger.info(f'Writing cluster centers') - pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str))\ + pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str)) \ .to_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{n_clusters}.parquet')) # randomly sample fdcs from each cluster @@ -83,7 +85,7 @@ def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> Non ss_df = pd.DataFrame(columns=fdc_df.columns.to_list()) for i in range(n_clusters): values = fdc_df[fdc_df['label'] == i].drop(columns='label').values - np.random.shuffle(values) + random_shuffler.shuffle(values) values = values[:samples] tmp = pd.DataFrame(values) tmp['label'] = i @@ -100,6 +102,8 @@ def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> Non summary['n_iter'].append(kmeans.n_iter_) summary['silhouette'].append(np.mean(silhouette_scores[-1])) + logger.info('Writing cluster summary files') + # save the summary results as a csv pd.DataFrame.from_dict(summary).to_csv(os.path.join(workdir, 'clusters', f'cluster-metrics.csv')) # save the silhouette scores as a parquet @@ -154,12 +158,18 @@ def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable else: raise TypeError('n_clusters should be of type int or an iterable') + random_shuffler = np.random.default_rng() + for model_file in model_files: + logger.info(f'Plotting {model_file}') + + # load the model and calculate kmeans = joblib.load(model_file) n_clusters = int(kmeans.n_clusters) n_cols = min(n_clusters, max_cols) n_rows = math.ceil(n_clusters / n_cols) + # initialize the figure and labels fig, axs = plt.subplots( n_rows, n_cols, @@ -167,7 +177,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable dpi=500, squeeze=False, tight_layout=True, - sharey=True + sharey='row' ) fig.suptitle("KMeans FDC Clustering") fig.supxlabel('Exceedance Probability (%)') @@ -178,17 +188,19 @@ def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable ax.set_xlim(0, size) ax.set_xticks(x_values, x_ticks) ax.set_ylim(-2, 4) - fdc_to_plot = x[kmeans.labels_ == i] - np.random.shuffle(fdc_to_plot) - fdc_to_plot = fdc_to_plot[:n_lines] - for j in fdc_to_plot: + fdc_sample = x[kmeans.labels_ == i] + random_shuffler.shuffle(fdc_sample) + fdc_sample = fdc_sample[:n_lines] + for j in fdc_sample: ax.plot(j.ravel(), "k-") ax.plot(kmeans.cluster_centers_[i].flatten(), "r-") # turn off plotting axes which are blank (when ax number > n_clusters) for ax in fig.axes[n_clusters:]: ax.axis('off') - fig.savefig(os.path.join(workdir, 'clusters', f'kmeans-clusters-{n_clusters}.png')) + logger.info(f'Saving figure to {kmeans_dir}') + + fig.savefig(os.path.join(kmeans_dir, f'kmeans-clusters-{n_clusters}.png')) plt.close(fig) return @@ -206,11 +218,17 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No Returns: None """ + logger.info('Plotting silhouette scores') + labels_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-labels.parquet')) - silhouette_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-silhouette_scores.parquet')) + sscore_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-silhouette_scores.parquet')) + + for tot_clusters in sscore_df.columns: + logger.info(f'Plotting for {tot_clusters} total clusters') - for tot_clusters in silhouette_df.columns: centers_df = pd.read_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{tot_clusters}.parquet')) + + # initialize the figure fig, (ax1, ax2) = plt.subplots( nrows=1, ncols=2, @@ -232,12 +250,12 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No ax2.set_ylabel("Discharge Z-Score") # The vertical line for average silhouette score of all the values - ax1.axvline(x=silhouette_df[tot_clusters].mean(), color="red", linestyle="--") + ax1.axvline(x=sscore_df[tot_clusters].mean(), color="red", linestyle="--") y_lower = 10 for sub_cluster in range(int(tot_clusters)): # select the rows applicable to the current sub cluster - cluster_silhouettes = silhouette_df[labels_df[tot_clusters] == sub_cluster][tot_clusters].values.flatten() + cluster_silhouettes = sscore_df[labels_df[tot_clusters] == sub_cluster][tot_clusters].values.flatten() cluster_silhouettes.sort() n = cluster_silhouettes.shape[0] From 6707672873353a5d8a17745841e31d6b12796025 Mon Sep 17 00:00:00 2001 From: rileyhales Date: Sat, 10 Sep 2022 14:39:58 -0600 Subject: [PATCH 15/17] separate post processors for easier segmented runs, bug fixes --- examples/global-scratch.py | 10 ++- saber/cluster.py | 165 +++++++++++++++++++------------------ saber/io.py | 44 +++++++++- 3 files changed, 135 insertions(+), 84 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index ecb0a75..406565a 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -21,7 +21,7 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): print('Generate clusters') saber.cluster.generate(workdir) print('Summarize clusters') - saber.cluster.summarize(workdir) + saber.cluster.summarize_fit(workdir) print('Plot silhouettes') saber.cluster.plot_silhouette(workdir) print('Plot clusters') @@ -106,9 +106,11 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): # Generate clusters logger.info('Generate Clusters') saber.cluster.generate(workdir, x=x_fdc) -logger.info('Summarize Clusters') -saber.cluster.summarize(workdir, x=x_fdc) +logger.info('Summarize Fit') +saber.cluster.summarize_fit(workdir) logger.info('Plot Clusters') -saber.cluster.plot_clusters(workdir, x=x_fdc) +saber.cluster.plot_clusters(workdir, x=x_fdc, n_clusters=range(2, 7)) +logger.info('Calculate Silhouettes') +saber.cluster.calc_silhouette(workdir, x=x_fdc, n_clusters=(2, 7)) logger.info('Plot Silhouette') saber.cluster.plot_silhouette(workdir) diff --git a/saber/cluster.py b/saber/cluster.py index 91da5f2..5954b29 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -1,31 +1,31 @@ import glob -import json import logging import math import os from collections.abc import Iterable import joblib -import kneed import matplotlib.cm as cm import matplotlib.pyplot as plt import numpy as np import pandas as pd +from kneed import KneeLocator from natsort import natsorted from sklearn.cluster import MiniBatchKMeans from sklearn.metrics import silhouette_samples -from .io import cluster_count_file +from .io import _find_model_files from .io import read_table +from .io import write_table -__all__ = ['generate', 'summarize', 'plot_clusters', 'plot_silhouette'] +__all__ = ['generate', 'summarize_fit', 'plot_clusters', 'calc_silhouette', 'plot_silhouette'] logger = logging.getLogger(__name__) def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 12) -> None: """ - Trains kmeans clustering models, saves the model as pickle, generates images and supplementary files + Trains scikit-learn MiniBatchKMeans models and saves as pickle Args: workdir: path to the project directory @@ -41,20 +41,65 @@ def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 12) -> None # build the kmeans model for a range of cluster numbers for n_clusters in range(2, max_clusters + 1): logger.info(f'Clustering n={n_clusters}') - kmeans = MiniBatchKMeans(n_clusters=n_clusters, n_init=6) + kmeans = MiniBatchKMeans(n_clusters=n_clusters, n_init=15) kmeans.fit_predict(x) joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) return -def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> None: +def summarize_fit(workdir: str) -> None: """ - Generate a summary of the clustering results, calculate the silhouette score, save the centers and labels to parquet + Generate a summary of the clustering results save the centers and labels to parquet + + Args: + workdir: path to the project directory + + Returns: + None + """ + summary = {'number': [], 'inertia': [], 'n_iter': []} + labels = [] + + for model_file in _find_model_files(workdir, n_clusters='all'): + logger.info(f'Summarizing {os.path.basename(model_file)}') + kmeans = joblib.load(model_file) + n_clusters = int(kmeans.n_clusters) + labels.append(kmeans.labels_.flatten()) + + # save cluster centroids to table - columns are the cluster number, rows are the centroid FDC values + write_table( + pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str)), + workdir, + f'cluster_centers_{n_clusters}' + ) + + # save the summary stats from this model + summary['number'].append(n_clusters) + summary['inertia'].append(kmeans.inertia_) + summary['n_iter'].append(kmeans.n_iter_) + + # save the summary results as a csv + sum_df = pd.DataFrame(summary) + sum_df['knee'] = KneeLocator(summary['number'], summary['inertia'], curve='convex', direction='decreasing').knee + write_table(sum_df, workdir, 'cluster_metrics') + + # save the labels as a parquet + labels = np.transpose(np.array(labels)) + write_table(pd.DataFrame(labels, columns=np.array(range(2, labels.shape[1] + 2)).astype(str)), + workdir, 'cluster_labels') + return + + +def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = 'all', + samples: int = 5e5) -> None: + """ + Calculate the silhouette score for the given number of clusters Args: workdir: path to the project directory x: a numpy array of the prepared FDC data - samples: max number of randomly chosen samples to use when calculating the silhouette score + n_clusters: the number of clusters to calculate the silhouette score for + samples: the number of samples to use for the silhouette score calculation Returns: None @@ -63,68 +108,41 @@ def summarize(workdir: str, x: np.ndarray = None, samples: int = 100_000) -> Non x = read_table(workdir, 'hindcast_fdc_trans').values fdc_df = pd.DataFrame(x) - summary = {'number': [], 'inertia': [], 'n_iter': [], 'silhouette': []} - silhouette_scores = [] - labels = [] + summary = {'number': [], 'silhouette': []} random_shuffler = np.random.default_rng() - for model_file in natsorted(glob.glob(os.path.join(workdir, 'clusters', 'kmeans-*.pickle'))): - logger.info(model_file) + for model_file in _find_model_files(workdir, n_clusters): + logger.info(f'Calculating silhouette for {os.path.basename(model_file)}') kmeans = joblib.load(model_file) - n_clusters = int(kmeans.n_clusters) - - # save cluster centroids to table - columns are the cluster number, rows are the centroid FDC values - logger.info(f'Writing cluster centers') - pd.DataFrame(np.transpose(kmeans.cluster_centers_), columns=np.array(range(n_clusters)).astype(str)) \ - .to_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{n_clusters}.parquet')) # randomly sample fdcs from each cluster - logger.info('Calculating silhouette scores') fdc_df['label'] = kmeans.labels_ ss_df = pd.DataFrame(columns=fdc_df.columns.to_list()) - for i in range(n_clusters): + for i in range(int(kmeans.n_clusters)): values = fdc_df[fdc_df['label'] == i].drop(columns='label').values random_shuffler.shuffle(values) - values = values[:samples] + values = values[:int(samples)] tmp = pd.DataFrame(values) tmp['label'] = i ss_df = pd.concat([ss_df, tmp]) # calculate their silhouette scores - silhouette_scores.append( - silhouette_samples(ss_df.drop(columns='label').values, ss_df['label'].values, n_jobs=-1).flatten()) - labels.append(kmeans.labels_.flatten()) + ss_df['silhouette'] = silhouette_samples(ss_df.drop(columns='label').values, ss_df['label'].values, n_jobs=-1) + ss_df['silhouette'] = ss_df['silhouette'].round(3) + ss_df.columns = ss_df.columns.astype(str) + write_table(ss_df, workdir, f'cluster_sscores_{kmeans.n_clusters}') # save the summary stats from this model summary['number'].append(n_clusters) - summary['inertia'].append(kmeans.inertia_) - summary['n_iter'].append(kmeans.n_iter_) - summary['silhouette'].append(np.mean(silhouette_scores[-1])) - - logger.info('Writing cluster summary files') + summary['silhouette'].append(ss_df['silhouette'].mean()) - # save the summary results as a csv - pd.DataFrame.from_dict(summary).to_csv(os.path.join(workdir, 'clusters', f'cluster-metrics.csv')) - # save the silhouette scores as a parquet - silhouette_scores = np.transpose(np.array(silhouette_scores)) - pd.DataFrame(silhouette_scores, columns=np.array(range(2, silhouette_scores.shape[1] + 2)).astype(str)) \ - .to_parquet(os.path.join(workdir, 'clusters', 'kmeans-silhouette_scores.parquet')) - # save the labels as a parquet - labels = np.transpose(np.array(labels)) - pd.DataFrame(labels, columns=np.array(range(2, labels.shape[1] + 2)).astype(str)) \ - .to_parquet(os.path.join(workdir, 'clusters', 'kmeans-labels.parquet')) - - # find the knee/elbow - knee = kneed.KneeLocator(summary['number'], summary['inertia'], curve='convex', direction='decreasing').knee - - # save the best fitting cluster counts to a csv - with open(os.path.join(workdir, 'clusters', cluster_count_file), 'w') as f: - f.write(json.dumps({'historical': int(knee)})) + # save the summary stats + write_table(pd.DataFrame(summary), workdir, 'cluster_sscores') return -def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable = 'all', +def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterable = 'all', max_cols: int = 3, plt_width: int = 3, plt_height: int = 3, n_lines: int = 500) -> None: """ Generate figures of the clustered FDC's @@ -132,7 +150,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable Args: workdir: path to the project directory x: a numpy array of the prepared FDC data - clusters: number of clusters to create figures for + n_clusters: number of clusters to create figures for max_cols: maximum number of columns (subplots) in the figure plt_width: width of each subplot in inches plt_height: height of each subplot in inches @@ -148,19 +166,9 @@ def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable x_values = np.linspace(0, size, 5) x_ticks = np.linspace(0, 100, 5).astype(int) - kmeans_dir = os.path.join(workdir, 'clusters') - if clusters == 'all': - model_files = natsorted(glob.glob(os.path.join(kmeans_dir, 'kmeans-*.pickle'))) - elif isinstance(clusters, int): - model_files = glob.glob(os.path.join(kmeans_dir, f'kmeans-{clusters}.pickle')) - elif isinstance(clusters, Iterable): - model_files = natsorted([os.path.join(kmeans_dir, f'kmeans-{i}.pickle') for i in clusters]) - else: - raise TypeError('n_clusters should be of type int or an iterable') - random_shuffler = np.random.default_rng() - for model_file in model_files: + for model_file in _find_model_files(workdir, n_clusters): logger.info(f'Plotting {model_file}') # load the model and calculate @@ -198,9 +206,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, clusters: int or Iterable for ax in fig.axes[n_clusters:]: ax.axis('off') - logger.info(f'Saving figure to {kmeans_dir}') - - fig.savefig(os.path.join(kmeans_dir, f'kmeans-clusters-{n_clusters}.png')) + fig.savefig(os.path.join(workdir, 'clusters', f'kmeans-clusters-{n_clusters}.png')) plt.close(fig) return @@ -220,13 +226,14 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No """ logger.info('Plotting silhouette scores') - labels_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-labels.parquet')) - sscore_df = pd.read_parquet(os.path.join(workdir, 'clusters', 'kmeans-silhouette_scores.parquet')) - - for tot_clusters in sscore_df.columns: - logger.info(f'Plotting for {tot_clusters} total clusters') + clusters_dir = os.path.join(workdir, 'clusters') - centers_df = pd.read_parquet(os.path.join(workdir, 'clusters', f'kmeans-centers-{tot_clusters}.parquet')) + for sscore_table in natsorted(glob.glob(os.path.join(clusters_dir, 'cluster_sscores_*.parquet'))): + logger.info(f'Plotting {sscore_table}') + n_clusters = int(sscore_table.split('_')[-1].split('.')[0]) + sscore_df = pd.read_parquet(sscore_table, engine='fastparquet') + centers_df = read_table(workdir, f'cluster_centers_{n_clusters}') + mean_ss = sscore_df['silhouette'].mean() # initialize the figure fig, (ax1, ax2) = plt.subplots( @@ -238,7 +245,7 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No ) # Plot 1 titles and labels - ax1.set_title("Silhouette Plot") + ax1.set_title(f"Silhouette Plot (mean={mean_ss:.3f})") ax1.set_xlabel("Silhouette Score") ax1.set_ylabel("Cluster Label") ax1.set_yticks([]) # Clear the yaxis labels / ticks @@ -250,22 +257,22 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No ax2.set_ylabel("Discharge Z-Score") # The vertical line for average silhouette score of all the values - ax1.axvline(x=sscore_df[tot_clusters].mean(), color="red", linestyle="--") + ax1.axvline(x=mean_ss, color="red", linestyle="--") y_lower = 10 - for sub_cluster in range(int(tot_clusters)): + for sub_cluster in range(int(n_clusters)): # select the rows applicable to the current sub cluster - cluster_silhouettes = sscore_df[labels_df[tot_clusters] == sub_cluster][tot_clusters].values.flatten() - cluster_silhouettes.sort() + cluster_sscores = sscore_df[sscore_df['label'] == sub_cluster]['silhouette'].values.flatten() + cluster_sscores.sort() - n = cluster_silhouettes.shape[0] + n = cluster_sscores.shape[0] y_upper = y_lower + n - color = cm.nipy_spectral(sub_cluster / int(tot_clusters)) + color = cm.nipy_spectral(sub_cluster / int(n_clusters)) ax1.fill_betweenx( np.arange(y_lower, y_upper), 0, - cluster_silhouettes, + cluster_sscores, facecolor=color, edgecolor=color, alpha=0.7, @@ -279,6 +286,6 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No # add some buffer before the next cluster y_lower = y_upper + 10 - fig.savefig(os.path.join(workdir, 'clusters', f'kmeans-silhouettes-{tot_clusters}.png')) + fig.savefig(os.path.join(clusters_dir, f'silhouette-diagram-{n_clusters}.png')) plt.close(fig) return diff --git a/saber/io.py b/saber/io.py index 0e1a9d1..029f34f 100644 --- a/saber/io.py +++ b/saber/io.py @@ -1,6 +1,10 @@ +import glob import os +from collections.abc import Iterable +from typing import List import pandas as pd +from natsort import natsorted # assign table and gis_input file required column names mid_col = 'model_id' @@ -31,6 +35,11 @@ table_gauge = 'gauge_table.parquet' table_assign = 'assign_table.csv' +# tables generated by the clustering functions +table_cluster_metrics = 'cluster_metrics.csv' +table_cluster_sscores = 'cluster_sscores.csv' +table_cluster_labels = 'cluster_labels.parquet' + # metrics computed on validation sets metric_list = ['ME', 'MAE', 'RMSE', 'MAPE', 'NSE', 'KGE (2012)'] metric_nc_name_list = ['ME', 'MAE', 'RMSE', 'MAPE', 'NSE', 'KGE2012'] @@ -74,6 +83,18 @@ def get_table_path(workdir: str, table_name: str) -> str: return os.path.join(workdir, dir_tables, table_gauge) elif table_name == 'assign_table': return os.path.join(workdir, dir_tables, table_assign) + + elif table_name == 'cluster_metrics': + return os.path.join(workdir, dir_clusters, table_cluster_metrics) + elif table_name == 'cluster_sscores': + return os.path.join(workdir, dir_clusters, table_cluster_sscores) + elif table_name == 'cluster_labels': + return os.path.join(workdir, dir_clusters, table_cluster_labels) + elif table_name.startswith('cluster_centers_'): # cluster_centers_{n_clusters}.parquet - 1 per cluster + return os.path.join(workdir, dir_clusters, f'{table_name}.parquet') + elif table_name.startswith('cluster_sscores_'): # cluster_sscores_{n_clusters}.parquet - 1 per cluster + return os.path.join(workdir, dir_clusters, f'{table_name}.parquet') + else: raise ValueError(f'Unknown table name: {table_name}') @@ -99,6 +120,27 @@ def write_table(table: pd.DataFrame, workdir: str, table_name: str) -> None: elif table_format == '.feather': return table.to_feather(table_path) elif table_format == '.csv': - return table.to_csv(table_path) + return table.to_csv(table_path, index=False) else: raise ValueError(f'Unknown table format: {table_format}') + + +def _find_model_files(workdir: str, n_clusters: int or Iterable = 'all') -> List[str]: + """ + Find all the kmeans model files in the project directory. + + Args: + workdir: path to the project directory + + Returns: + List of paths to the kmeans model files + """ + kmeans_dir = os.path.join(workdir, 'clusters') + if n_clusters == 'all': + return natsorted(glob.glob(os.path.join(kmeans_dir, 'kmeans-*.pickle'))) + elif isinstance(n_clusters, int): + return glob.glob(os.path.join(kmeans_dir, f'kmeans-{n_clusters}.pickle')) + elif isinstance(n_clusters, Iterable): + return natsorted([os.path.join(kmeans_dir, f'kmeans-{i}.pickle') for i in n_clusters]) + else: + raise TypeError('n_clusters should be of type int or an iterable') From 9104815d53465ec3f311f233ffc15da895bc23dc Mon Sep 17 00:00:00 2001 From: rileyhales Date: Sun, 11 Sep 2022 13:57:14 -0600 Subject: [PATCH 16/17] more figures to plot --- examples/global-scratch.py | 57 +++++++++------ saber/cluster.py | 140 ++++++++++++++++++++++++++++++++----- saber/io.py | 49 ++++++++++++- 3 files changed, 207 insertions(+), 39 deletions(-) diff --git a/examples/global-scratch.py b/examples/global-scratch.py index 406565a..893921f 100644 --- a/examples/global-scratch.py +++ b/examples/global-scratch.py @@ -1,6 +1,7 @@ import logging import numpy as np +import pandas as pd import saber @@ -9,23 +10,25 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): # scaffold the project working directory - print("Scaffold the project working directory") - saber.io.scaffold_workdir(workdir) + logger.info('Scaffold project working directory') + # saber.io.scaffold_workdir(workdir) # Prepare input data - print('Prepare inputs') - saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis) - saber.prep.hindcast(workdir, hist_sim_nc, drop_order_1=True) + logger.info('Prepare input data') + # saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis) + # saber.prep.hindcast(workdir, hist_sim_nc, drop_order_1=True) # Generate clusters - print('Generate clusters') - saber.cluster.generate(workdir) - print('Summarize clusters') + logger.info('Generate Clusters') + saber.cluster.generate(workdir, x=x_fdc) + logger.info('Cluster Post-Processing and Metrics') saber.cluster.summarize_fit(workdir) - print('Plot silhouettes') - saber.cluster.plot_silhouette(workdir) - print('Plot clusters') - saber.cluster.plot_clusters(workdir) + saber.cluster.calc_silhouette(workdir, x=x_fdc, n_clusters=range(2, 8)) + logger.info('Create Plots') + saber.cluster.plot_clusters(workdir, x=x_fdc) + saber.cluster.plot_silhouettes(workdir) + saber.cluster.plot_centers(workdir) + saber.cluster.plot_fit_metrics(workdir) # ALL COMPLETED ABOVE @@ -55,6 +58,7 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): # vtab = saber.validate.gen_val_table(workdir) # saber.gis.validation_maps(workdir, gauge_shape, vtab) + logger.info('SABER Completed') return @@ -102,15 +106,28 @@ def workflow(workdir, hist_sim_nc, obs_data_dir, drain_gis, gauge_gis): ) logger = logging.getLogger(__name__) logger.info('Reading prepared data') -x_fdc = saber.io.read_table(workdir, 'hindcast_fdc_trans').values +# x_fdc = saber.io.read_table(workdir, 'hindcast_fdc_trans').values +x_fdc = pd.read_parquet('/Volumes/T7/geoglows_saber/Backups/hindcast_fdc_transformed_noord1.parquet').values + +# logger.info('Scaffold project working directory') +# saber.io.scaffold_workdir(workdir) + +# logger.info('Prepare input data') +# saber.prep.gis_tables(workdir, drain_gis=drain_gis, gauge_gis=gauge_gis) +# saber.prep.hindcast(workdir, hist_sim_nc, drop_order_1=True) + # Generate clusters logger.info('Generate Clusters') saber.cluster.generate(workdir, x=x_fdc) -logger.info('Summarize Fit') + +logger.info('Cluster Post-Processing and Metrics') saber.cluster.summarize_fit(workdir) -logger.info('Plot Clusters') -saber.cluster.plot_clusters(workdir, x=x_fdc, n_clusters=range(2, 7)) -logger.info('Calculate Silhouettes') -saber.cluster.calc_silhouette(workdir, x=x_fdc, n_clusters=(2, 7)) -logger.info('Plot Silhouette') -saber.cluster.plot_silhouette(workdir) +saber.cluster.calc_silhouette(workdir, x=x_fdc, n_clusters=range(2, 10)) + +logger.info('Create Plots') +saber.cluster.plot_clusters(workdir, x=x_fdc) +saber.cluster.plot_silhouettes(workdir) +saber.cluster.plot_centers(workdir) +saber.cluster.plot_fit_metrics(workdir) + +logger.info('SABER Completed') diff --git a/saber/cluster.py b/saber/cluster.py index 5954b29..2cda3e6 100755 --- a/saber/cluster.py +++ b/saber/cluster.py @@ -18,12 +18,12 @@ from .io import read_table from .io import write_table -__all__ = ['generate', 'summarize_fit', 'plot_clusters', 'calc_silhouette', 'plot_silhouette'] +__all__ = ['generate', 'summarize_fit', 'plot_clusters', 'calc_silhouette', 'plot_silhouettes'] logger = logging.getLogger(__name__) -def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 12) -> None: +def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 13) -> None: """ Trains scikit-learn MiniBatchKMeans models and saves as pickle @@ -41,7 +41,7 @@ def generate(workdir: str, x: np.ndarray = None, max_clusters: int = 12) -> None # build the kmeans model for a range of cluster numbers for n_clusters in range(2, max_clusters + 1): logger.info(f'Clustering n={n_clusters}') - kmeans = MiniBatchKMeans(n_clusters=n_clusters, n_init=15) + kmeans = MiniBatchKMeans(n_clusters=n_clusters, init='k-means++', n_init=100) kmeans.fit_predict(x) joblib.dump(kmeans, os.path.join(workdir, 'clusters', f'kmeans-{n_clusters}.pickle')) return @@ -61,7 +61,7 @@ def summarize_fit(workdir: str) -> None: labels = [] for model_file in _find_model_files(workdir, n_clusters='all'): - logger.info(f'Summarizing {os.path.basename(model_file)}') + logger.info(f'Post Processing {os.path.basename(model_file)}') kmeans = joblib.load(model_file) n_clusters = int(kmeans.n_clusters) labels.append(kmeans.labels_.flatten()) @@ -90,8 +90,7 @@ def summarize_fit(workdir: str) -> None: return -def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = 'all', - samples: int = 5e5) -> None: +def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = 'all', samples: int = 100_000) -> None: """ Calculate the silhouette score for the given number of clusters @@ -113,7 +112,7 @@ def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = ' random_shuffler = np.random.default_rng() for model_file in _find_model_files(workdir, n_clusters): - logger.info(f'Calculating silhouette for {os.path.basename(model_file)}') + logger.info(f'Calculating Silhouettes for {os.path.basename(model_file)}') kmeans = joblib.load(model_file) # randomly sample fdcs from each cluster @@ -134,7 +133,7 @@ def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = ' write_table(ss_df, workdir, f'cluster_sscores_{kmeans.n_clusters}') # save the summary stats from this model - summary['number'].append(n_clusters) + summary['number'].append(kmeans.n_clusters) summary['silhouette'].append(ss_df['silhouette'].mean()) # save the summary stats @@ -143,7 +142,7 @@ def calc_silhouette(workdir: str, x: np.ndarray, n_clusters: int or Iterable = ' def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterable = 'all', - max_cols: int = 3, plt_width: int = 3, plt_height: int = 3, n_lines: int = 500) -> None: + max_cols: int = 3, plt_width: int = 2, plt_height: int = 2, n_lines: int = 2_500) -> None: """ Generate figures of the clustered FDC's @@ -169,7 +168,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterabl random_shuffler = np.random.default_rng() for model_file in _find_model_files(workdir, n_clusters): - logger.info(f'Plotting {model_file}') + logger.info(f'Plotting Clusters {os.path.basename(model_file)}') # load the model and calculate kmeans = joblib.load(model_file) @@ -182,7 +181,7 @@ def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterabl n_rows, n_cols, figsize=(plt_width * n_cols + 1, plt_height * n_rows + 1), - dpi=500, + dpi=750, squeeze=False, tight_layout=True, sharey='row' @@ -206,12 +205,12 @@ def plot_clusters(workdir: str, x: np.ndarray = None, n_clusters: int or Iterabl for ax in fig.axes[n_clusters:]: ax.axis('off') - fig.savefig(os.path.join(workdir, 'clusters', f'kmeans-clusters-{n_clusters}.png')) + fig.savefig(os.path.join(workdir, 'clusters', f'figure-clusters-{n_clusters}.png')) plt.close(fig) return -def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> None: +def plot_silhouettes(workdir: str, plt_width: int = 3, plt_height: int = 3) -> None: """ Plot the silhouette scores for each cluster. Based on https://scikit-learn.org/stable/auto_examples/cluster/plot_kmeans_silhouette_analysis.html @@ -224,12 +223,12 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No Returns: None """ - logger.info('Plotting silhouette scores') + logger.info('Generating Silhouette Diagrams') clusters_dir = os.path.join(workdir, 'clusters') for sscore_table in natsorted(glob.glob(os.path.join(clusters_dir, 'cluster_sscores_*.parquet'))): - logger.info(f'Plotting {sscore_table}') + logger.info(f'Generating Silhouette Diagram: {os.path.basename(sscore_table)}') n_clusters = int(sscore_table.split('_')[-1].split('.')[0]) sscore_df = pd.read_parquet(sscore_table, engine='fastparquet') centers_df = read_table(workdir, f'cluster_centers_{n_clusters}') @@ -240,7 +239,7 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No nrows=1, ncols=2, figsize=(plt_width * 2 + 1, plt_height + 1), - dpi=500, + dpi=600, tight_layout=True, ) @@ -278,7 +277,7 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No alpha=0.7, ) # Label the silhouette plots with their cluster numbers at the middle - ax1.text(-0.05, y_lower + 0.5 * n, str(sub_cluster + 1)) + ax1.text(-0.05, y_lower + 0.5 * n, f'{sub_cluster + 1}: n={n:,}') # plot the cluster center ax2.plot(centers_df[f'{sub_cluster}'].values, alpha=0.7, c=color, label=f'Cluster {sub_cluster + 1}') @@ -286,6 +285,111 @@ def plot_silhouette(workdir: str, plt_width: int = 3, plt_height: int = 3) -> No # add some buffer before the next cluster y_lower = y_upper + 10 - fig.savefig(os.path.join(clusters_dir, f'silhouette-diagram-{n_clusters}.png')) + fig.savefig(os.path.join(clusters_dir, f'figure-silhouette-diagram-{n_clusters}.png')) plt.close(fig) return + + +def plot_centers(workdir: str, plt_width: int = 2, plt_height: int = 2, max_cols: int = 3) -> None: + """ + Plot the cluster centers for each cluster. + + Args: + workdir: path to the project directory + plt_width: width of each subplot in inches + plt_height: height of each subplot in inches + max_cols: maximum number of columns of subplots in the figure + + Returns: + None + """ + logger.info('Plotting Cluster Centers') + + clusters_dir = os.path.join(workdir, 'clusters') + + # count number of files to plot + centers_files = natsorted(glob.glob(os.path.join(clusters_dir, 'cluster_centers_*.parquet'))) + n_files = len(centers_files) + n_cols = min(n_files, max_cols) + n_rows = math.ceil(n_files / n_cols) + + # initialize the figure and labels + fig, axes = plt.subplots( + n_rows, + n_cols, + figsize=(plt_width * n_cols + 1.25, plt_height * n_rows + 1.25), + dpi=750, + squeeze=False, + tight_layout=True, + sharey='row', + sharex='col' + ) + + fig.suptitle('Cluster Centers', fontsize=16) + fig.supylabel('Discharge Z-Score') + fig.supxlabel('Exceedance Probability (%)') + + for centers_table, ax in zip(centers_files, fig.axes[:n_files]): + n_clusters = int(centers_table.split('_')[-1].split('.')[0]) + centers_df = pd.read_parquet(centers_table, engine='fastparquet') + + for i in range(int(n_clusters)): + ax.plot(centers_df[f'{i}'].values, label=f'Cluster {i + 1}') + + # Plot titles and labels + ax.set_title(f"k={n_clusters} clusters") + ax.set_xlim(0, 40) + ax.set_ylim(-1, 5) + + fig.savefig(os.path.join(clusters_dir, f'figure-cluster-centers.png')) + plt.close(fig) + return + + +def plot_fit_metrics(workdir: str, plt_width: int = 5, plt_height: int = 3) -> None: + """ + Plot the cluster metrics, inertia and silhouette score, vs number of clusters + + Args: + workdir: path to the project directory + plt_width: width of each subplot in inches + plt_height: height of each subplot in inches + + Returns: + None + """ + logger.info('Plotting Cluster Fit Metrics') + + clusters_dir = os.path.join(workdir, 'clusters') + + df = read_table(workdir, 'cluster_metrics') + df = df.merge(read_table(workdir, 'cluster_sscores'), on='number', how='outer') + + # initialize the figure and labels + fig, ax1 = plt.subplots( + figsize=(plt_width, plt_height), + dpi=750, + tight_layout=True, + ) + ax2 = ax1.twinx() + + # Plot titles and labels + ax1.set_title("Clustering Fit Metrics") + ax1.set_xlabel("Number of Clusters") + ax1.set_ylabel("Inertia") + ax2.set_ylabel("Silhouette Score") + + ticks = np.arange(1, df['number'].max() + 2) + ax1.set_xlim(ticks[0], ticks[-1]) + ax1.set_xticks(ticks) + ax2.set_ylim(0, 1) + + # plot the inertia + knee = df['knee'].values[0] + ax1.plot(df['number'], df['inertia'], marker='o', label='Inertia') + ax1.plot(knee, df[df['number'] == knee]['inertia'], marker='o', c='red', label='Knee') + ax2.plot(df['number'], df['silhouette'], marker='o', c='green', label='Silhouette Score') + + fig.savefig(os.path.join(clusters_dir, f'figure-fit-metrics.png')) + plt.close(fig) + return diff --git a/saber/io.py b/saber/io.py index 029f34f..3ee6651 100644 --- a/saber/io.py +++ b/saber/io.py @@ -69,6 +69,19 @@ def scaffold_workdir(path: str, include_validation: bool = True) -> None: def get_table_path(workdir: str, table_name: str) -> str: + """ + Get the path to a table in the project directory by name + + Args: + workdir: + table_name: + + Returns: + Path (str) to the table + + Raises: + ValueError: if the table name is not recognized + """ if table_name == 'hindcast': return os.path.join(workdir, dir_tables, table_hindcast) elif table_name == 'hindcast_fdc': @@ -100,7 +113,24 @@ def get_table_path(workdir: str, table_name: str) -> str: def read_table(workdir: str, table_name: str) -> pd.DataFrame: + """ + Read a table from the project directory by name. + + Args: + workdir: path to the project directory + table_name: name of the table to read + + Returns: + pd.DataFrame + + Raises: + FileNotFoundError: if the table does not exist in the correct directory with the correct name + ValueError: if the table format is not recognized + """ table_path = get_table_path(workdir, table_name) + if not os.path.exists(table_path): + raise FileNotFoundError(f'Table does not exist: {table_path}') + table_format = os.path.splitext(table_path)[-1] if table_format == '.parquet': return pd.read_parquet(table_path, engine='fastparquet') @@ -113,6 +143,20 @@ def read_table(workdir: str, table_name: str) -> pd.DataFrame: def write_table(table: pd.DataFrame, workdir: str, table_name: str) -> None: + """ + Write a table to the correct location in the project directory + + Args: + table: the pandas DataFrame to write + workdir: the path to the project directory + table_name: the name of the table to write + + Returns: + None + + Raises: + ValueError: if the table format is not recognized + """ table_path = get_table_path(workdir, table_name) table_format = os.path.splitext(table_path)[-1] if table_format == '.parquet': @@ -134,8 +178,11 @@ def _find_model_files(workdir: str, n_clusters: int or Iterable = 'all') -> List Returns: List of paths to the kmeans model files + + Raises: + TypeError: if n_clusters is not an int, iterable of int, or 'all' """ - kmeans_dir = os.path.join(workdir, 'clusters') + kmeans_dir = os.path.join(workdir, dir_clusters) if n_clusters == 'all': return natsorted(glob.glob(os.path.join(kmeans_dir, 'kmeans-*.pickle'))) elif isinstance(n_clusters, int): From ba9a3df18af81188f3b6f9ca7d9efea11fc7cb29 Mon Sep 17 00:00:00 2001 From: rileyhales Date: Sun, 11 Sep 2022 14:03:41 -0600 Subject: [PATCH 17/17] increase version number 0.5.0 --- saber/__init__.py | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/saber/__init__.py b/saber/__init__.py index d9eeb2c..46a73e5 100644 --- a/saber/__init__.py +++ b/saber/__init__.py @@ -14,5 +14,5 @@ ] __author__ = 'Riley Hales' -__version__ = '0.4.0' +__version__ = '0.5.0' __license__ = 'BSD 3 Clause Clear' diff --git a/setup.py b/setup.py index c7d7807..aebd7b4 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ install_requires = req.read().splitlines() description = 'tools for bias correction on large hydrological models' -version = '0.4.0' +version = '0.5.0' setup( name='hydrosaber',