Skip to content

Commit

Permalink
Feature/smoothing and extrapolating gps coordinates (#268)
Browse files Browse the repository at this point in the history
* implemented gps postprocessing on top of the #262

This update:
- clears up the SHF LHF calculation
- reads dates of station relocations (when station coordinates are discontinuous) from the `aws-l0/metadata/station_configurations` 
- for each interval between station relocations, a linear function is fitted to the GPS observations of latitude longitude and altitude and is used to interpolate and extrapolate the gps observations
- these new smoothed and gap-free coordinates are the variables `lat, lon, alt`
- for bedrock stations (like KAN_B) static coordinates are used to build `lat, lon, alt`
- eventually `lat_avg`, `lon_avg` `alt_avg` are calculated from `lat, lon, alt` and added as attributes to the netcdf files.

Several minor fixes were also brought like:
- better encoding info removal when reading netcdf
- printing to files variables full of NaNs at `L2` and `L3/stations` but not printing them in the `L3/sites files`.
- recalculate dirWindSpd if needed for historical data
- due to xarray version, new columns need to be added manually before a concatenation of different datasets in join_l3
  • Loading branch information
BaptisteVandecrux committed Aug 20, 2024
1 parent 4fac07a commit 5f054b3
Show file tree
Hide file tree
Showing 11 changed files with 431 additions and 290 deletions.
318 changes: 186 additions & 132 deletions src/pypromice/process/L2toL3.py

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion src/pypromice/process/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ def getL1(self):
logger.info('Level 1 processing...')
self.L0 = [utilities.addBasicMeta(item, self.vars) for item in self.L0]
self.L1 = [toL1(item, self.vars) for item in self.L0]
self.L1A = reduce(xr.Dataset.combine_first, self.L1)
self.L1A = reduce(xr.Dataset.combine_first, reversed(self.L1))

def getL2(self):
'''Perform L1 to L2 data processing'''
Expand Down
19 changes: 13 additions & 6 deletions src/pypromice/process/get_l2tol3.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python
import os, logging, sys
import os, logging, sys, toml
import xarray as xr
from argparse import ArgumentParser
import pypromice
Expand All @@ -12,6 +12,9 @@ def parse_arguments_l2tol3(debug_args=None):
parser = ArgumentParser(description="AWS L3 script for the processing L3 "+
"data from L2. An hourly, daily and monthly L3 "+
"data product is outputted to the defined output path")
parser.add_argument('-c', '--config_folder', type=str, required=True,
default='../aws-l0/metadata/station_configurations/',
help='Path to folder with sites configuration (TOML) files')
parser.add_argument('-i', '--inpath', type=str, required=True,
help='Path to Level 2 .nc data file')
parser.add_argument('-o', '--outpath', default=None, type=str, required=False,
Expand All @@ -24,7 +27,7 @@ def parse_arguments_l2tol3(debug_args=None):
args = parser.parse_args(args=debug_args)
return args

def get_l2tol3(inpath, outpath, variables, metadata):
def get_l2tol3(config_folder, inpath, outpath, variables, metadata):
logging.basicConfig(
format="%(asctime)s; %(levelname)s; %(name)s; %(message)s",
level=logging.INFO,
Expand All @@ -38,15 +41,19 @@ def get_l2tol3(inpath, outpath, variables, metadata):
# Remove encoding attributes from NetCDF
for varname in l2.variables:
if l2[varname].encoding!={}:
l2[varname].encoding = {}
l2[varname].encoding = {}

if 'bedrock' in l2.attrs.keys():
l2.attrs['bedrock'] = l2.attrs['bedrock'] == 'True'
if 'number_of_booms' in l2.attrs.keys():
l2.attrs['number_of_booms'] = int(l2.attrs['number_of_booms'])

# importing station_config (dict) from config_folder (str path)
config_file = config_folder+l2.attrs['station_id']+'.toml'
with open(config_file) as fp:
station_config = toml.load(fp)
# Perform Level 3 processing
l3 = toL3(l2)
l3 = toL3(l2, station_config)

# Write Level 3 dataset to file if output directory given
v = getVars(variables)
Expand All @@ -59,7 +66,7 @@ def get_l2tol3(inpath, outpath, variables, metadata):

def main():
args = parse_arguments_l2tol3()
_ = get_l2tol3(args.inpath, args.outpath, args.variables, args.metadata)
_ = get_l2tol3(args.config_folder, args.inpath, args.outpath, args.variables, args.metadata)

if __name__ == "__main__":
main()
2 changes: 1 addition & 1 deletion src/pypromice/process/join_l2.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def loadArr(infile):
elif infile.split('.')[-1].lower() == 'nc':
with xr.open_dataset(infile) as ds:
ds.load()
# Remove encoding attributes from NetCDF
# Remove encoding attributes from NetCDF
for varname in ds.variables:
if ds[varname].encoding!={}:
ds[varname].encoding = {}
Expand Down
56 changes: 37 additions & 19 deletions src/pypromice/process/join_l3.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@
import numpy as np
import pandas as pd
import xarray as xr
logging.basicConfig(
format="%(asctime)s; %(levelname)s; %(name)s; %(message)s",
level=logging.INFO,
stream=sys.stdout,
)
logger = logging.getLogger(__name__)

def parse_arguments_joinl3(debug_args=None):
Expand Down Expand Up @@ -100,8 +105,20 @@ def readNead(infile):
# combining thermocouple and CS100 temperatures
ds['TA1'] = ds['TA1'].combine_first(ds['TA3'])
ds['TA2'] = ds['TA2'].combine_first(ds['TA4'])

ds=ds.rename(var_name)

standard_vars_to_drop = ["NR", "TA3", "TA4", "TA5", "NR_cor",
"z_surf_1", "z_surf_2", "z_surf_combined",
"TA2m", "RH2m", "VW10m", "SZA", "SAA",
"depth_t_i_1", "depth_t_i_2", "depth_t_i_3", "depth_t_i_4", "depth_t_i_5",
"depth_t_i_6", "depth_t_i_7", "depth_t_i_8", "depth_t_i_9", "depth_t_i_10", "t_i_10m"
]
standard_vars_to_drop = standard_vars_to_drop + [v for v in list(ds.keys()) if v.endswith("_adj_flag")]

# Drop the variables if they are present in the dataset
ds = ds.drop_vars([var for var in standard_vars_to_drop if var in ds])

ds=ds.rename({'timestamp':'time'})
return ds

Expand All @@ -116,7 +133,8 @@ def loadArr(infile, isNead):
ds = xr.Dataset.from_dataframe(df)

elif infile.split('.')[-1].lower() in 'nc':
ds = xr.open_dataset(infile)
with xr.open_dataset(infile) as ds:
ds.load()
# Remove encoding attributes from NetCDF
for varname in ds.variables:
if ds[varname].encoding!={}:
Expand Down Expand Up @@ -202,16 +220,23 @@ def join_l3(config_folder, site, folder_l3, folder_gcnet, outpath, variables, me

filepath = os.path.join(folder_l3, stid, stid+'_hour.nc')
isNead = False
if station_info["project"].lower() in ["historical gc-net", "glaciobasis"]:
if station_info["project"].lower() in ["historical gc-net"]:
filepath = os.path.join(folder_gcnet, stid+'.csv')
isNead = True
if not os.path.isfile(filepath):
logger.info(stid+' is from an project '+folder_l3+' or '+folder_gcnet)
if not os.path.isfile(filepath):
logger.info(stid+' was listed as station but could not be found in '+folder_l3+' nor '+folder_gcnet)
continue

l3, _ = loadArr(filepath, isNead)
l3, _ = loadArr(filepath, isNead)

# removing specific variable from a given file
specific_vars_to_drop = station_info.get("skipped_variables",[])
if len(specific_vars_to_drop)>0:
logger.info("Skipping %s from %s"%(specific_vars_to_drop, stid))
l3 = l3.drop_vars([var for var in specific_vars_to_drop if var in l3])

list_station_data.append((l3, station_info))

# Sort the list in reverse chronological order so that we start with the latest data
sorted_list_station_data = sorted(list_station_data, key=lambda x: x[0].time.max(), reverse=True)
sorted_stids = [info["stid"] for _, info in sorted_list_station_data]
Expand Down Expand Up @@ -246,19 +271,10 @@ def join_l3(config_folder, site, folder_l3, folder_gcnet, outpath, variables, me
for v in l3_merged.data_vars:
if v not in l3.data_vars:
l3[v] = l3.t_u*np.nan

# if l3 (older data) has variables that does not have l3_merged (newer data)
# then they are removed from l3
list_dropped = []
for v in l3.data_vars:
if v not in l3_merged.data_vars:
if v != 'z_stake':
list_dropped.append(v)
l3 = l3.drop(v)
else:
l3_merged[v] = ('time', l3_merged.t_u.data*np.nan)
logger.info('Unused variables in older dataset: '+' '.join(list_dropped))

if v not in l3_merged.data_vars:
l3_merged[v] = l3_merged.t_u*np.nan

# saving attributes of station under an attribute called $stid
st_attrs = l3_merged.attrs.get('stations_attributes', {})
st_attrs[stid] = l3.attrs.copy()
Expand All @@ -280,6 +296,8 @@ def join_l3(config_folder, site, folder_l3, folder_gcnet, outpath, variables, me


# Assign site id
if not l3_merged:
logger.error('No level 2 data file found for '+site)
l3_merged.attrs['site_id'] = site
l3_merged.attrs['stations'] = ' '.join(sorted_stids)
l3_merged.attrs['level'] = 'L3'
Expand Down
12 changes: 8 additions & 4 deletions src/pypromice/process/resample.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import logging
import numpy as np
import xarray as xr
from pypromice.process.L1toL2 import calcDirWindSpeeds
logger = logging.getLogger(__name__)

def resample_dataset(ds_h, t):
Expand Down Expand Up @@ -35,12 +36,15 @@ def resample_dataset(ds_h, t):

# recalculating wind direction from averaged directional wind speeds
for var in ['wdir_u','wdir_l']:
boom = var.split('_')[1]
if var in df_d.columns:
if ('wspd_x_'+var.split('_')[1] in df_d.columns) & ('wspd_y_'+var.split('_')[1] in df_d.columns):
df_d[var] = _calcWindDir(df_d['wspd_x_'+var.split('_')[1]],
df_d['wspd_y_'+var.split('_')[1]])
if ('wspd_x_'+boom in df_d.columns) & ('wspd_y_'+boom in df_d.columns):
df_d[var] = _calcWindDir(df_d['wspd_x_'+boom], df_d['wspd_y_'+boom])
else:
logger.info(var+' in dataframe but not wspd_x_'+var.split('_')[1]+' nor wspd_y_'+var.split('_')[1])
logger.info(var+' in dataframe but not wspd_x_'+boom+' nor wspd_y_'+boom+', recalculating them')
ds_h['wspd_x_'+boom], ds_h['wspd_y_'+boom] = calcDirWindSpeeds(ds_h['wspd_'+boom], ds_h['wdir_'+boom])
df_d[['wspd_x_'+boom, 'wspd_y_'+boom]] = ds_h[['wspd_x_'+boom, 'wspd_y_'+boom]].to_dataframe().resample(t).mean()
df_d[var] = _calcWindDir(df_d['wspd_x_'+boom], df_d['wspd_y_'+boom])

# recalculating relative humidity from average vapour pressure and average
# saturation vapor pressure
Expand Down
111 changes: 84 additions & 27 deletions src/pypromice/process/write.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def prepare_and_write(dataset, outpath, vars_df=None, meta_dict=None, time='60mi
if 'gps_lon' in d2.keys():
d2 = reformat_lon(d2)
else:
logger.info('%s does not have gpd_lon'%name)
logger.info('%s does not have gps_lon'%name)

# Add variable attributes and metadata
if vars_df is None:
Expand All @@ -63,7 +63,11 @@ def prepare_and_write(dataset, outpath, vars_df=None, meta_dict=None, time='60mi
d2 = roundValues(d2, vars_df)

# Get variable names to write out
col_names = getColNames(vars_df, d2, remove_nan_fields=True)
if 'site_id' in d2.attrs.keys():
remove_nan_fields = True
else:
remove_nan_fields = False
col_names = getColNames(vars_df, d2, remove_nan_fields=remove_nan_fields)

# Define filename based on resample rate
t = int(pd.Timedelta((d2['time'][1] - d2['time'][0]).values).total_seconds())
Expand Down Expand Up @@ -100,7 +104,6 @@ def prepare_and_write(dataset, outpath, vars_df=None, meta_dict=None, time='60mi
writeCSV(out_csv, d2, col_names)

# Write to netcdf file
col_names = col_names + ['lat', 'lon', 'alt']
writeNC(out_nc, d2, col_names)
logger.info(f'Written to {out_csv}')
logger.info(f'Written to {out_nc}')
Expand Down Expand Up @@ -245,16 +248,28 @@ def addMeta(ds, meta):
ds : xarray.Dataset
Dataset with metadata
'''
if 'gps_lon' in ds.keys():
ds['lon'] = ds['gps_lon'].mean()
ds['lon'].attrs = ds['gps_lon'].attrs

ds['lat'] = ds['gps_lat'].mean()
ds['lat'].attrs = ds['gps_lat'].attrs

ds['alt'] = ds['gps_alt'].mean()
ds['alt'].attrs = ds['gps_alt'].attrs

if 'lon' in ds.keys():
# caluclating average coordinates based on the extra/interpolated coords
for v in ['lat','lon','alt']:
ds.attrs[v+'_avg'] = ds[v].mean().item()
# dropping the less accurate standard coordinates given in the
# raw or tx config files
for v in ['latitude','longitude']:
if v in ds.attrs.keys():
del ds.attrs[v]
elif 'gps_lon' in ds.keys():
# caluclating average coordinates based on the measured coords (can be gappy)
for v in ['gps_lat','gps_lon','gps_alt']:
if v in ds.keys():
ds.attrs[v+'_avg'] = ds[v].mean().item()
else:
ds.attrs[v+'_avg'] = np.nan
# dropping the less accurate standard coordinates given in the
# raw or tx config files
for v in ['latitude','longitude']:
if v in ds.attrs.keys():
del ds.attrs[v]

# Attribute convention for data discovery
# https://wiki.esipfed.org/Attribute_Convention_for_Data_Discovery_1-3

Expand Down Expand Up @@ -283,19 +298,61 @@ def addMeta(ds, meta):
ds.attrs['date_metadata_modified'] = ds.attrs['date_created']
ds.attrs['processing_level'] = ds.attrs['level'].replace('L','level ')


if 'lat' in ds.keys():
lat_min = ds['lat'].min().values
lat_max = ds['lat'].max().values
elif 'gps_lat' in ds.keys():
lat_min = ds['gps_lat'].min().values
lat_max = ds['gps_lat'].max().values
elif 'latitude' in ds.attrs.keys():
lat_min = ds.attrs['latitude']
lat_max = ds.attrs['latitude']
else:
lat_min =np.nan
lat_max = np.nan


if 'lon' in ds.keys():
lon_min = ds['lon'].min().values
lon_max = ds['lon'].max().values
elif 'gps_lon' in ds.keys():
lon_min = ds['gps_lon'].min().values
lon_max = ds['gps_lon'].max().values
elif 'longitude' in ds.attrs.keys():
lon_min = ds.attrs['longitude']
lon_max = ds.attrs['longitude']
else:
lon_min =np.nan
lon_max = np.nan

if 'alt' in ds.keys():
alt_min = ds['alt'].min().values
alt_max = ds['alt'].max().values
elif 'gps_alt' in ds.keys():
alt_min = ds['gps_alt'].min().values
alt_max = ds['gps_alt'].max().values
elif 'altitude' in ds.attrs.keys():
alt_min = ds.attrs['altitude']
alt_max = ds.attrs['altitude']
else:
alt_min =np.nan
alt_max = np.nan

ds.attrs['geospatial_bounds'] = "POLYGON((" + \
f"{ds['lat'].min().values} {ds['lon'].min().values}, " + \
f"{ds['lat'].min().values} {ds['lon'].max().values}, " + \
f"{ds['lat'].max().values} {ds['lon'].max().values}, " + \
f"{ds['lat'].max().values} {ds['lon'].min().values}, " + \
f"{ds['lat'].min().values} {ds['lon'].min().values}))"

ds.attrs['geospatial_lat_min'] = str(ds['lat'].min().values)
ds.attrs['geospatial_lat_max'] = str(ds['lat'].max().values)
ds.attrs['geospatial_lon_min'] = str(ds['lon'].min().values)
ds.attrs['geospatial_lon_max'] = str(ds['lon'].max().values)
ds.attrs['geospatial_vertical_min'] = str(ds['alt'].min().values)
ds.attrs['geospatial_vertical_max'] = str(ds['alt'].max().values)
f"{lat_min} {lon_min}, " + \
f"{lat_min} {lon_max}, " + \
f"{lat_max} {lon_max}, " + \
f"{lat_max} {lon_min}, " + \
f"{lat_min} {lon_min}))"

ds.attrs['geospatial_lat_min'] = str(lat_min)
ds.attrs['geospatial_lat_max'] = str(lat_max)
ds.attrs['geospatial_lon_min'] = str(lon_min)
ds.attrs['geospatial_lon_max'] = str(lon_max)
ds.attrs['geospatial_vertical_min'] = str(alt_min)
ds.attrs['geospatial_vertical_max'] = str(alt_max)

ds.attrs['geospatial_vertical_positive'] = 'up'
ds.attrs['time_coverage_start'] = str(ds['time'][0].values)
ds.attrs['time_coverage_end'] = str(ds['time'][-1].values)
Expand Down Expand Up @@ -386,5 +443,5 @@ def reformat_lon(dataset, exempt=['UWN', 'Roof_GEUS', 'Roof_PROMICE']):
if id not in exempt:
if 'gps_lon' not in dataset.keys():
return dataset
dataset['gps_lon'] = dataset['gps_lon'] * -1
return dataset
dataset['gps_lon'] = np.abs(dataset['gps_lon']) * -1
return dataset
11 changes: 6 additions & 5 deletions src/pypromice/qc/github_data_issues.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ def flagNAN(ds_in,

for v in varlist:
if v in list(ds.keys()):
logger.info(f'---> flagging {t0} {t1} {v}')
logger.debug(f'---> flagging {t0} {t1} {v}')
ds[v] = ds[v].where((ds['time'] < t0) | (ds['time'] > t1))
else:
logger.info(f'---> could not flag {v} not in dataset')
logger.debug(f'---> could not flag {v} not in dataset')

return ds

Expand Down Expand Up @@ -206,13 +206,14 @@ def adjustData(ds,
t1 = pd.to_datetime(t1, utc=True).tz_localize(None)

index_slice = dict(time=slice(t0, t1))

if len(ds_out[var].loc[index_slice].time.time) == 0:
logger.info(f'---> {t0} {t1} {var} {func} {val}')
logger.info("Time range does not intersect with dataset")
continue

logger.info(f'---> {t0} {t1} {var} {func} {val}')

else:
logger.debug(f'---> {t0} {t1} {var} {func} {val}')

if func == "add":
ds_out[var].loc[index_slice] = ds_out[var].loc[index_slice].values + val
# flagging adjusted values
Expand Down
Loading

0 comments on commit 5f054b3

Please sign in to comment.