Add quality control modules and scripts for ISD #7

Merged
4 commits merged on Sep 27, 2024
4 changes: 3 additions & 1 deletion pylintrc
@@ -17,4 +17,6 @@ disable=
     too-many-branches,
     too-many-statements,
     too-many-return-statements,
-    too-many-instance-attributes
+    too-many-instance-attributes,
+    too-many-positional-arguments,
+    duplicate-code
8 changes: 8 additions & 0 deletions quality_control/README.md
@@ -0,0 +1,8 @@
The quality control code in this directory is written in Python and is used by WeatherReal to download, post-process, and quality-control ISD data. Its modules can also be applied to observation data from other sources. It includes four launch scripts:

1. `download_ISD.py`, used to download ISD data from the NCEI server;
2. `raw_ISD_to_hourly.py`, used to convert ISD data into hourly data;
3. `station_merging.py`, used to merge data from duplicate stations;
4. `quality_control.py`, used to perform quality control on hourly data.

For the specific workflow, please refer to the WeatherReal paper.
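A minimal sketch of running the four stages in order; the invocation below is hypothetical (the real CLI options are defined by each script), shown only to make the intended sequence concrete:

import subprocess

# Hypothetical end-to-end run; consult each script for its actual arguments.
for script in (
    "download_ISD.py",       # 1. download ISD data from the NCEI server
    "raw_ISD_to_hourly.py",  # 2. convert raw ISD reports into hourly data
    "station_merging.py",    # 3. merge data from duplicate stations
    "quality_control.py",    # 4. quality-control the hourly data
):
    subprocess.run(["python", f"quality_control/{script}"], check=True)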
Empty file added quality_control/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions quality_control/algo/cluster.py
@@ -0,0 +1,92 @@
import numpy as np
from sklearn.cluster import DBSCAN
from .utils import intra_station_check, quality_control_statistics, get_config


CONFIG = get_config()


def _cluster_check(ts, reanalysis, min_samples_ratio, eps_scale, max_std_scale=None, min_num=None):
    """
    Perform a cluster-based check on time series data compared to reanalysis data.

    This function uses DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
    to identify clusters in the difference between the time series and reanalysis data.
    It then flags outliers based on cluster membership.

    Parameters:
    -----------
    ts : array-like
        The time series data to be checked.
    reanalysis : array-like
        The corresponding reanalysis data for comparison.
    min_samples_ratio : float
        The ratio of minimum samples required to form a cluster in DBSCAN.
    eps_scale : float
        The scale factor for epsilon in DBSCAN, relative to the standard deviation of reanalysis.
    max_std_scale : float, optional
        If the ratio of standard deviations between ts and reanalysis is at most max_std_scale,
        the check is skipped.
    min_num : int, optional
        Minimum number of valid data points required to perform the check.

    Returns:
    --------
    flag : np.ndarray
        1D array with the same length as ts, containing a QC flag for each value.
    """
    flag = np.full(len(ts), CONFIG["flag_missing"], dtype=np.int8)
    isnan = np.isnan(ts)
    flag[~isnan] = CONFIG["flag_normal"]
    both_valid = (~isnan) & (~np.isnan(reanalysis))

    if min_num is not None and both_valid.sum() < min_num:
        return flag

    if max_std_scale is not None:
        if np.std(ts[both_valid]) / np.std(reanalysis[both_valid]) <= max_std_scale:
            return flag

    indices = np.argwhere(both_valid).flatten()
    values1 = ts[indices]
    values2 = reanalysis[indices]

    # eps is scaled by the reanalysis spread; use only the co-valid samples so that
    # NaNs in the reanalysis cannot poison the epsilon estimate
    cluster = DBSCAN(min_samples=int(indices.size * min_samples_ratio), eps=np.std(values2) * eps_scale)
    labels = cluster.fit((values1 - values2).reshape(-1, 1)).labels_

    # If all data points except noise are in the same cluster, just remove the noise
    if np.max(labels) <= 0:
        indices_outliers = indices[np.argwhere(labels == -1).flatten()]
        flag[indices_outliers] = CONFIG["flag_error"]
    # If there is more than one cluster, select the cluster nearest to the reanalysis
    else:
        cluster_labels = np.unique(labels)
        cluster_labels = cluster_labels[cluster_labels >= 0]  # drop the DBSCAN noise label (-1)
        best_cluster = 0
        min_median = np.inf
        for label in cluster_labels:
            indices_cluster = indices[np.argwhere(labels == label).flatten()]
            median = np.median(ts[indices_cluster] - reanalysis[indices_cluster])
            if np.abs(median) < min_median:
                best_cluster = label
                min_median = np.abs(median)
        indices_outliers = indices[np.argwhere(labels != best_cluster).flatten()]
        flag[indices_outliers] = CONFIG["flag_error"]

    return flag


def run(da, reanalysis, varname):
    """
    To ensure the accuracy of the parameters used in the distributional gap method,
    DBSCAN is applied first, so that the median and MAD used for filtering are
    calculated only from the normal data
    """
    flag = intra_station_check(
        da,
        reanalysis,
        qc_func=_cluster_check,
        input_core_dims=[["time"], ["time"]],
        kwargs=CONFIG["cluster"][varname],
    )
    quality_control_statistics(da, flag)
    return flag.rename("cluster")
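As a quick illustration of `_cluster_check`, here is a hypothetical smoke test on synthetic data (the import path assumes this PR's package layout; `max_std_scale` is set to None so the variance-ratio guard does not skip the check):

import numpy as np
from quality_control.algo.cluster import _cluster_check

rng = np.random.default_rng(0)
reanalysis = rng.normal(15.0, 8.0, size=2000)
ts = reanalysis + rng.normal(0.0, 1.0, size=2000)  # a well-behaved station
ts[1500:1800] += 30.0  # a miscalibrated stretch forms its own offset cluster
ts[:50] = np.nan       # some missing observations

flag = _cluster_check(ts, reanalysis, min_samples_ratio=0.1, eps_scale=2,
                      max_std_scale=None, min_num=365)
print((flag == 2).sum())   # ~300 values in the offset cluster flagged as flag_error
print((flag == -1).sum())  # 50 missing values flagged as flag_missing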
189 changes: 189 additions & 0 deletions quality_control/algo/config.yaml
@@ -0,0 +1,189 @@
############################################################
# The flag values for the data quality check
############################################################

flag_error: 2
flag_suspect: 1
flag_normal: 0
flag_missing: -1

############################################################
# The parameters for the data quality check
############################################################

# Record extreme check
record:
  t:
    upper: 57.8
    lower: -89.2
  td:
    upper: 57.8
    lower: -100.0
  ws:
    upper: 113.2
    lower: 0
  wd:
    upper: 360
    lower: 0
  sp:
    upper: 1100.0
    lower: 300.0
  msl:
    upper: 1083.3
    lower: 870.0
  c:
    upper: 8
    lower: 0
  ra1:
    upper: 305.0
    lower: 0
  ra3:
    upper: 915.0
    lower: 0
  ra6:
    upper: 1144.0
    lower: 0
  ra12:
    upper: 1144.0
    lower: 0
  ra24:
    upper: 1825.0
    lower: 0

# Persistence check
persistence:
  defaults: &persistence_defaults
    min_num: 24
    max_window: 72
    min_var: 0.1
    error_length: 72
  t:
    <<: *persistence_defaults
  td:
    <<: *persistence_defaults
  sp:
    <<: *persistence_defaults
  msl:
    <<: *persistence_defaults
  ws:
    <<: *persistence_defaults
    exclude_value: 0
  wd:
    <<: *persistence_defaults
    exclude_value: 0
  c:
    <<: *persistence_defaults
    exclude_value:
      - 0
      - 8
  ra1:
    <<: *persistence_defaults
    exclude_value: 0
  ra3:
    <<: *persistence_defaults
    exclude_value: 0
  ra6:
    <<: *persistence_defaults
    exclude_value: 0
  ra12:
    <<: *persistence_defaults
    exclude_value: 0
  ra24:
    <<: *persistence_defaults
    exclude_value: 0

# Spike check
spike:
  t:
    max_change:
      - 6
      - 8
      - 10
  td:
    max_change:
      - 5
      - 7
      - 9
  sp:
    max_change:
      - 3
      - 5
      - 7
  msl:
    max_change:
      - 3
      - 5
      - 7

# Distributional gap check with ERA5
distribution:
  defaults: &distribution_defaults
    shift_step: 1
    gap_scale: 2
    default_mad: 1
    suspect_std_scale: 2.72
    min_num: 365
  t:
    <<: *distribution_defaults
  td:
    <<: *distribution_defaults
  sp:
    <<: *distribution_defaults
    default_mad: 0.5
  msl:
    <<: *distribution_defaults
    default_mad: 0.5

# Cluster check
cluster:
  defaults: &cluster_defaults
    min_samples_ratio: 0.1
    eps_scale: 2
    max_std_scale: 2
    min_num: 365
  t:
    <<: *cluster_defaults
  td:
    <<: *cluster_defaults
  sp:
    <<: *cluster_defaults
  msl:
    <<: *cluster_defaults

# Neighbouring station check
neighbouring:
  defaults: &neighbouring_defaults
    <<: *distribution_defaults
    max_dist: 300
    max_elev_diff: 500
    min_data_overlap: 0.3
  t:
    <<: *neighbouring_defaults
  td:
    <<: *neighbouring_defaults
  sp:
    <<: *neighbouring_defaults
    default_mad: 0.5
  msl:
    <<: *neighbouring_defaults
    default_mad: 0.5

# Flag refinement
refinement:
  t:
    check_monotonic: true
    check_ridge_trough: false
  td:
    check_monotonic: true
    check_ridge_trough: false
  sp:
    check_monotonic: true
    check_ridge_trough: true
  msl:
    check_monotonic: true
    check_ridge_trough: true

diurnal:
  t:
    max_bias: 0.5
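A note on the `&`/`<<:` syntax used throughout this file: these are standard YAML anchors and merge keys, expanded at load time, so each per-variable block inherits its section's defaults and may override individual entries. A minimal sketch with PyYAML (the relative path is an assumption):

import yaml

with open("quality_control/algo/config.yaml") as f:
    cfg = yaml.safe_load(f)  # safe_load resolves &anchors and <<: merge keys

# "ws" inherits the persistence defaults and adds its own exclude_value
print(cfg["persistence"]["ws"])
# {'min_num': 24, 'max_window': 72, 'min_var': 0.1, 'error_length': 72, 'exclude_value': 0}

# "sp" inherits the merged neighbouring defaults but overrides default_mad
print(cfg["neighbouring"]["sp"]["default_mad"])  # 0.5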
58 changes: 58 additions & 0 deletions quality_control/algo/cross_variable.py
@@ -0,0 +1,58 @@
import numpy as np
import xarray as xr
from .utils import get_config


CONFIG = get_config()


def supersaturation(t, td):
    """Flag values where the dew point exceeds the air temperature, which is physically implausible"""
    flag = xr.where(t < td, CONFIG["flag_error"], CONFIG["flag_normal"])
    isnan = np.isnan(t) | np.isnan(td)
    flag = flag.where(~isnan, CONFIG["flag_missing"])
    return flag.astype(np.int8)


def wind_consistency(ws, wd):
    """Flag values where exactly one of wind speed and wind direction reports calm (zero)"""
    zero_ws = ws == 0
    zero_wd = wd == 0
    inconsistent = zero_ws != zero_wd
    flag = xr.where(inconsistent, CONFIG["flag_error"], CONFIG["flag_normal"])
    isnan = np.isnan(ws) | np.isnan(wd)
    flag = flag.where(~isnan, CONFIG["flag_missing"])
    return flag.astype(np.int8)


def ra_consistency(ds):
    """
    Precipitation accumulations over different period lengths are cross-validated.
    If there is a conflict (e.g., ra1 at 18:00 is 5 mm while ra3 at 19:00 is 0 mm),
    both values are flagged.
    Parameters:
    -----------
    ds: xarray Dataset with precipitation data including ra1, ra3, ra6, ra12 and ra24
    Returns:
    --------
    flag : xarray.Dataset
        Same structure as ds, containing a flag for each value.
    """
    flag = xr.full_like(ds, CONFIG["flag_normal"], dtype=np.int8)
    periods = [3, 6, 12, 24]
    for period in periods:
        da_longer = ds[f"ra{period}"]
        for shift in range(1, period):
            # Shift the precipitation in the longer period to align with the shorter period
            shifted = da_longer.roll(time=-shift)
            shifted[:, -shift:] = np.nan  # values rolled in from the end are invalid
            for target_period in [1, 3, 6, 12, 24]:
                # Only compare shorter windows that are fully covered by the longer window
                if target_period >= period or target_period + shift > period:
                    continue
                # If the precipitation in the shorter period exceeds the longer period, flag both
                flag_indices = np.where(ds[f"ra{target_period}"].values - shifted.values > 0.101)
                if len(flag_indices[0]) == 0:
                    continue
                flag[f"ra{target_period}"].values[flag_indices] = CONFIG["flag_suspect"]
                flag[f"ra{period}"].values[(flag_indices[0], flag_indices[1] + shift)] = CONFIG["flag_suspect"]
    flag = flag.where(ds.notnull(), CONFIG["flag_missing"])
    return flag
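A hypothetical smoke test for the three checks (the import path assumes this PR's layout; note that `ra_consistency` expects 2-D `(station, time)` arrays and all five `ra*` variables to be present):

import numpy as np
import pandas as pd
import xarray as xr
from quality_control.algo.cross_variable import ra_consistency, supersaturation, wind_consistency

t = xr.DataArray([20.0, 10.0, np.nan], dims="time")
td = xr.DataArray([18.0, 12.0, 15.0], dims="time")
print(supersaturation(t, td).values)  # [ 0  2 -1]: dew point above temperature is an error

ws = xr.DataArray([0.0, 5.0], dims="time")
wd = xr.DataArray([0.0, 0.0], dims="time")
print(wind_consistency(ws, wd).values)  # [0 2]: nonzero speed paired with calm direction

time = pd.date_range("2024-01-01", periods=6, freq="h")
zeros = {v: (("station", "time"), np.zeros((1, 6))) for v in ["ra1", "ra3", "ra6", "ra12", "ra24"]}
ds = xr.Dataset(zeros, coords={"station": [0], "time": time})
ds["ra1"][0, 2] = 5.0  # 5 mm in hour 2, yet every overlapping longer window reports 0 mm
flag = ra_consistency(ds)
print(flag["ra1"].values)  # hour 2 flagged as suspect
print(flag["ra3"].values)  # the overlapping ra3 windows (hours 3-4) flagged as suspect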