
Commit

Merge pull request #7 from microsoft/add_qc_modules
Add quality control modules and scripts for ISD
jinwx authored Sep 27, 2024
2 parents 9d7954a + de9cbbf commit 038a6de
Showing 21 changed files with 2,519 additions and 1 deletion.
4 changes: 3 additions & 1 deletion pylintrc
@@ -17,4 +17,6 @@ disable=
    too-many-branches,
    too-many-statements,
    too-many-return-statements,
    too-many-instance-attributes,
    too-many-positional-arguments,
    duplicate-code
8 changes: 8 additions & 0 deletions quality_control/README.md
@@ -0,0 +1,8 @@
The quality control code in this directory is written in Python and is used by WeatherReal for downloading, post-processing, and quality control of ISD data. However, its modules can also be used for quality control of observation data from other sources. It includes four launch scripts:

1. `download_ISD.py`, used to download ISD data from the NCEI server;
2. `raw_ISD_to_hourly.py`, used to convert ISD data into hourly data;
3. `station_merging.py`, used to merge data from duplicate stations;
4. `quality_control.py`, used to perform quality control on hourly data.

For the specific workflow, please refer to the WeatherReal paper. A minimal invocation sketch is shown below.
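
The following is a hedged sketch of running the four stages in order. Only the script names come from this README; all command-line arguments are omitted and assumed to have workable defaults, so treat this as illustrative rather than as the documented interface:

```python
# Illustrative pipeline driver; arguments for each script are assumed, not documented here.
import subprocess
import sys

for script in (
    "download_ISD.py",        # 1. download ISD data from the NCEI server
    "raw_ISD_to_hourly.py",   # 2. convert ISD data into hourly data
    "station_merging.py",     # 3. merge data from duplicate stations
    "quality_control.py",     # 4. perform quality control on the hourly data
):
    subprocess.run([sys.executable, script], check=True)
```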
Empty file added quality_control/__init__.py
Empty file.
92 changes: 92 additions & 0 deletions quality_control/algo/cluster.py
@@ -0,0 +1,92 @@
import numpy as np
from sklearn.cluster import DBSCAN
from .utils import intra_station_check, quality_control_statistics, get_config


CONFIG = get_config()


def _cluster_check(ts, reanalysis, min_samples_ratio, eps_scale, max_std_scale=None, min_num=None):
    """
    Perform a cluster-based check on time series data compared to reanalysis data.
    This function uses DBSCAN (Density-Based Spatial Clustering of Applications with Noise)
    to identify clusters in the difference between the time series and reanalysis data.
    It then flags outliers based on cluster membership.
    Parameters:
    -----------
    ts : array-like
        The time series data to be checked.
    reanalysis : array-like
        The corresponding reanalysis data for comparison.
    min_samples_ratio : float
        The ratio of minimum samples required to form a cluster in DBSCAN.
    eps_scale : float
        The scale factor for epsilon in DBSCAN, relative to the standard deviation of reanalysis.
    max_std_scale : float, optional
        If the ratio of the standard deviation of ts to that of reanalysis does not exceed
        max_std_scale, the check is skipped.
    min_num : int, optional
        Minimum number of valid data points required to perform the check.
    Returns:
    --------
    flag : np.ndarray
        1D array with the same length as ts, containing a flag for each value.
    """
    flag = np.full(len(ts), CONFIG["flag_missing"], dtype=np.int8)
    isnan = np.isnan(ts)
    flag[~isnan] = CONFIG["flag_normal"]
    both_valid = (~isnan) & (~np.isnan(reanalysis))

    if min_num is not None and both_valid.sum() < min_num:
        return flag

    if max_std_scale is not None:
        if np.std(ts[both_valid]) / np.std(reanalysis[both_valid]) <= max_std_scale:
            return flag

    indices = np.argwhere(both_valid).flatten()
    values1 = ts[indices]
    values2 = reanalysis[indices]

    cluster = DBSCAN(min_samples=int(indices.size * min_samples_ratio), eps=np.std(reanalysis) * eps_scale)
    labels = cluster.fit((values1 - values2).reshape(-1, 1)).labels_

    # If all data points except noise are in the same cluster, just remove the noise
    if np.max(labels) <= 0:
        indices_outliers = indices[np.argwhere(labels == -1).flatten()]
        flag[indices_outliers] = CONFIG["flag_error"]
    # If there is more than one cluster, select the cluster nearest to the reanalysis
    else:
        cluster_labels = np.unique(labels)
        cluster_labels = cluster_labels[cluster_labels >= 0]
        best_cluster = 0
        min_median = np.inf
        for label in cluster_labels:
            indices_cluster = indices[np.argwhere(labels == label).flatten()]
            median = np.median(ts[indices_cluster] - reanalysis[indices_cluster])
            if np.abs(median) < min_median:
                best_cluster = label
                min_median = np.abs(median)
        indices_outliers = indices[np.argwhere(labels != best_cluster).flatten()]
        flag[indices_outliers] = CONFIG["flag_error"]

    return flag


def run(da, reanalysis, varname):
    """
    To ensure the accuracy of the parameters in the distributional gap method, DBSCAN is applied first,
    so that the median and MAD used for filtering are calculated only from the normal data
    """
    flag = intra_station_check(
        da,
        reanalysis,
        qc_func=_cluster_check,
        input_core_dims=[["time"], ["time"]],
        kwargs=CONFIG["cluster"][varname],
    )
    quality_control_statistics(da, flag)
    return flag.rename("cluster")
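
For illustration, the sketch below calls the private helper `_cluster_check` directly on synthetic arrays (all values and parameters are invented for this example, and it assumes the `quality_control` package is importable). Observations with a large constant bias against the reanalysis form a small DBSCAN noise group and are flagged as errors (2), while missing values are flagged as -1, matching the constants in config.yaml:

```python
# Synthetic demonstration of the cluster check; not part of the repository.
import numpy as np
from quality_control.algo.cluster import _cluster_check

rng = np.random.default_rng(0)
reanalysis = rng.normal(15.0, 5.0, size=2000)       # pseudo-reanalysis series
ts = reanalysis + rng.normal(0.0, 0.5, size=2000)   # observations close to the reanalysis
ts[100:150] += 20.0                                  # a block of strongly biased observations
ts[500:520] = np.nan                                 # some missing observations

flag = _cluster_check(ts, reanalysis, min_samples_ratio=0.1, eps_scale=2)
print((flag == 2).sum(), (flag == -1).sum())         # 50 errors, 20 missing
```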
189 changes: 189 additions & 0 deletions quality_control/algo/config.yaml
@@ -0,0 +1,189 @@
############################################################
# The flag values for the data quality check
############################################################

flag_error: 2
flag_suspect: 1
flag_normal: 0
flag_missing: -1

############################################################
# The parameters for the data quality check
############################################################

# Record extreme check
record:
  t:
    upper: 57.8
    lower: -89.2
  td:
    upper: 57.8
    lower: -100.0
  ws:
    upper: 113.2
    lower: 0
  wd:
    upper: 360
    lower: 0
  sp:
    upper: 1100.0
    lower: 300.0
  msl:
    upper: 1083.3
    lower: 870.0
  c:
    upper: 8
    lower: 0
  ra1:
    upper: 305.0
    lower: 0
  ra3:
    upper: 915.0
    lower: 0
  ra6:
    upper: 1144.0
    lower: 0
  ra12:
    upper: 1144.0
    lower: 0
  ra24:
    upper: 1825.0
    lower: 0

# Persistence check
persistence:
  defaults: &persistence_defaults
    min_num: 24
    max_window: 72
    min_var: 0.1
    error_length: 72
  t:
    <<: *persistence_defaults
  td:
    <<: *persistence_defaults
  sp:
    <<: *persistence_defaults
  msl:
    <<: *persistence_defaults
  ws:
    <<: *persistence_defaults
    exclude_value: 0
  wd:
    <<: *persistence_defaults
    exclude_value: 0
  c:
    <<: *persistence_defaults
    exclude_value:
      - 0
      - 8
  ra1:
    <<: *persistence_defaults
    exclude_value: 0
  ra3:
    <<: *persistence_defaults
    exclude_value: 0
  ra6:
    <<: *persistence_defaults
    exclude_value: 0
  ra12:
    <<: *persistence_defaults
    exclude_value: 0
  ra24:
    <<: *persistence_defaults
    exclude_value: 0

# Spike check
spike:
  t:
    max_change:
      - 6
      - 8
      - 10
  td:
    max_change:
      - 5
      - 7
      - 9
  sp:
    max_change:
      - 3
      - 5
      - 7
  msl:
    max_change:
      - 3
      - 5
      - 7

# Distributional gap check with ERA5
distribution:
  defaults: &distribution_defaults
    shift_step: 1
    gap_scale: 2
    default_mad: 1
    suspect_std_scale: 2.72
    min_num: 365
  t:
    <<: *distribution_defaults
  td:
    <<: *distribution_defaults
  sp:
    <<: *distribution_defaults
    default_mad: 0.5
  msl:
    <<: *distribution_defaults
    default_mad: 0.5

# Cluster check
cluster:
  defaults: &cluster_defaults
    min_samples_ratio: 0.1
    eps_scale: 2
    max_std_scale: 2
    min_num: 365
  t:
    <<: *cluster_defaults
  td:
    <<: *cluster_defaults
  sp:
    <<: *cluster_defaults
  msl:
    <<: *cluster_defaults

# Neighbouring station check
neighbouring:
  defaults: &neighbouring_defaults
    <<: *distribution_defaults
    max_dist: 300
    max_elev_diff: 500
    min_data_overlap: 0.3
  t:
    <<: *neighbouring_defaults
  td:
    <<: *neighbouring_defaults
  sp:
    <<: *neighbouring_defaults
    default_mad: 0.5
  msl:
    <<: *neighbouring_defaults
    default_mad: 0.5

# Flag refinement
refinement:
  t:
    check_monotonic: true
    check_ridge_trough: false
  td:
    check_monotonic: true
    check_ridge_trough: false
  sp:
    check_monotonic: true
    check_ridge_trough: true
  msl:
    check_monotonic: true
    check_ridge_trough: true

diurnal:
  t:
    max_bias: 0.5
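
As a quick sanity check of how the anchors and merge keys above expand, the sketch below loads this file with PyYAML (the relative path is an assumption; the repository's own `get_config` helper lives in `utils.py`, which is not shown in this diff):

```python
# Illustrative only: inspect the merged per-variable settings from config.yaml.
import yaml

with open("quality_control/algo/config.yaml") as f:
    cfg = yaml.safe_load(f)

# `<<: *persistence_defaults` expands into each variable's entry,
# so ws carries the shared defaults plus its own exclude_value.
print(cfg["persistence"]["ws"])
# {'min_num': 24, 'max_window': 72, 'min_var': 0.1, 'error_length': 72, 'exclude_value': 0}

# Flag constants shared by every check
print(cfg["flag_error"], cfg["flag_suspect"], cfg["flag_normal"], cfg["flag_missing"])
# 2 1 0 -1
```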
58 changes: 58 additions & 0 deletions quality_control/algo/cross_variable.py
@@ -0,0 +1,58 @@
import numpy as np
import xarray as xr
from .utils import get_config


CONFIG = get_config()


def supersaturation(t, td):
    flag = xr.where(t < td, CONFIG["flag_error"], CONFIG["flag_normal"])
    isnan = np.isnan(t) | np.isnan(td)
    flag = flag.where(~isnan, CONFIG["flag_missing"])
    return flag.astype(np.int8)


def wind_consistency(ws, wd):
    zero_ws = ws == 0
    zero_wd = wd == 0
    inconsistent = zero_ws != zero_wd
    flag = xr.where(inconsistent, CONFIG["flag_error"], CONFIG["flag_normal"])
    isnan = np.isnan(ws) | np.isnan(wd)
    flag = flag.where(~isnan, CONFIG["flag_missing"])
    return flag.astype(np.int8)


def ra_consistency(ds):
    """
    Precipitation accumulations over different period lengths are cross-validated.
    If there is a conflict (e.g. ra1 at 18:00 is 5 mm while ra3 at 19:00 is 0 mm),
    both values are flagged.
    Parameters:
    -----------
    ds: xarray Dataset with precipitation data including ra1, ra3, ra6, ra12 and ra24
    Returns:
    --------
    flag : xarray.Dataset
        Same shape as ds, containing a flag for each value.
    """
    flag = xr.full_like(ds, CONFIG["flag_normal"], dtype=np.int8)
    periods = [3, 6, 12, 24]
    for period in periods:
        da_longer = ds[f"ra{period}"]
        for shift in range(1, period):
            # Shift the precipitation in the longer period to align with the shorter period
            shifted = da_longer.roll(time=-shift)
            shifted[:, -shift:] = np.nan
            for target_period in [1, 3, 6, 12, 24]:
                # Check if the two periods are overlapping
                if target_period >= period or target_period + shift > period:
                    continue
                # If the precipitation in the shorter period exceeds that in the longer period, flag both
                flag_indices = np.where(ds[f"ra{target_period}"].values - shifted.values > 0.101)
                if len(flag_indices[0]) == 0:
                    continue
                flag[f"ra{target_period}"].values[flag_indices] = 1
                flag[f"ra{period}"].values[(flag_indices[0], flag_indices[1] + shift)] = 1
    flag = flag.where(ds.notnull(), CONFIG["flag_missing"])
    return flag
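
A small usage sketch for the two pointwise checks above, using toy xarray inputs (the data values are invented; flag values follow config.yaml, with 0 = normal, 2 = error, -1 = missing):

```python
# Toy data for supersaturation and wind_consistency; not part of the repository.
import numpy as np
import xarray as xr
from quality_control.algo.cross_variable import supersaturation, wind_consistency

t = xr.DataArray([20.0, 10.0, np.nan], dims="time")
td = xr.DataArray([18.0, 12.0, 5.0], dims="time")
print(supersaturation(t, td).values)    # [ 0  2 -1]: dew point above temperature is an error

ws = xr.DataArray([0.0, 3.0, 0.0], dims="time")
wd = xr.DataArray([90.0, 0.0, 0.0], dims="time")
print(wind_consistency(ws, wd).values)  # [2 2 0]: zero speed with nonzero direction, and vice versa
```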