Skip to content

Commit

Permalink
Merge pull request #418 from euroargodev/fix-for-bad-STATION_PARAMETERS
Browse files Browse the repository at this point in the history
Fix bug with GDAC data source and BGC dataset
  • Loading branch information
gmaze authored Dec 12, 2024
2 parents 2b69c06 + 8420462 commit 3a876a6
Show file tree
Hide file tree
Showing 7 changed files with 170 additions and 31 deletions.
4 changes: 4 additions & 0 deletions argopy/data_fetchers/gdac_data_processors.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
import numpy as np
import xarray as xr
import logging


log = logging.getLogger("argopy.gdac.data")


def pre_process_multiprof(
Expand Down
45 changes: 36 additions & 9 deletions argopy/extensions/params_data_mode.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import copy

from ..utils import to_list, list_core_parameters
from ..utils.transform import split_data_mode, merge_param_with_param_adjusted, filter_param_by_data_mode
from ..utils.transform import (
split_data_mode,
merge_param_with_param_adjusted,
filter_param_by_data_mode,
)
from ..stores import (
indexstore_pd as ArgoIndex,
) # make sure we work with a Pandas index store
Expand Down Expand Up @@ -43,10 +47,12 @@ class ParamsDataMode(ArgoAccessorExtension):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset: # noqa: C901
"""Compute and add <PARAM>_DATA_MODE variables to a xarray dataset
def _compute_from_ArgoIndex(
self, indexfs: Union[None, ArgoIndex]
) -> xr.Dataset: # noqa: C901
"""Compute <PARAM>_DATA_MODE variables from ArgoIndex
This method consume a collection of points.
This method consumes a collection of points.
Parameters
----------
Expand All @@ -55,9 +61,9 @@ def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset: # noqa: C901
Returns
-------
:class:`xr.Dataset`
:class:`xarray.Dataset`
"""
idx = copy.copy(indexfs) if isinstance(indexfs, ArgoIndex) else ArgoIndex()
idx = indexfs.copy(deep=True) if isinstance(indexfs, ArgoIndex) else ArgoIndex()

def complete_df(this_df, params):
"""Add 'wmo', 'cyc' and '<param>_data_mode' columns to this dataframe"""
Expand Down Expand Up @@ -103,6 +109,7 @@ def print_etime(txt, t0):

profiles = self._argo.list_WMO_CYC
idx.search_wmo(self._argo.list_WMO)

params = [
p
for p in idx.read_params()
Expand Down Expand Up @@ -168,10 +175,30 @@ def print_etime(txt, t0):
self._obj = self._obj[np.sort(self._obj.data_vars)]
return self._obj

def split(self):
def compute(self, indexfs: Union[None, ArgoIndex]) -> xr.Dataset:
"""Compute <PARAM>_DATA_MODE variables"""
if "STATION_PARAMETERS" in self._obj and "PARAMETER_DATA_MODE" in self._obj:
return split_data_mode(self._obj)
else:
return self._compute_from_ArgoIndex(indexfs=indexfs)

def split(self) -> xr.Dataset:
"""Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables
Using the list of *PARAM* found in ``STATION_PARAMETERS``, this method will create ``N_PARAM``
new variables in the dataset ``<PARAM>_DATA_MODE(N_PROF)``.
The variable ``PARAMETER_DATA_MODE`` is drop from the dataset at the end of the process.
Returns
-------
:class:`xarray.Dataset`
"""
return split_data_mode(self._obj)

def merge(self, params: Union[str, List[str]] = "all", errors: str = "raise") -> xr.Dataset:
def merge(
self, params: Union[str, List[str]] = "all", errors: str = "raise"
) -> xr.Dataset:
"""Merge <PARAM> and <PARAM>_ADJUSTED variables according to DATA_MODE or <PARAM>_DATA_MODE
Merging is done as follows:
Expand Down Expand Up @@ -251,7 +278,7 @@ def filter(
logical: str = "and",
mask: bool = False,
errors: str = "raise",
):
) -> xr.Dataset:
"""Filter measurements according to parameters data mode
Filter the dataset to keep points where all or some of the parameters are in any of the data mode specified.
Expand Down
2 changes: 2 additions & 0 deletions argopy/fetchers.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,8 @@ def __init__(self, mode: str = "", src: str = "", ds: str = "", **fetcher_kwargs
raise OptionValueError(
"The 'argovis' data source fetching is only available in 'standard' user mode"
)
if self._src == "gdac" and "bgc" in self._dataset_id:
warnings.warn("BGC data support with the 'gdac' data source is still in Work In Progress")

@property
def _icon_user_mode(self):
Expand Down
99 changes: 88 additions & 11 deletions argopy/stores/argo_index_proto.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Argo file index store prototype
"""

import copy
import numpy as np
import pandas as pd
import logging
Expand All @@ -11,6 +13,13 @@
from urllib.parse import urlparse
from typing import Union
from pathlib import Path
import sys


if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

from ..options import OPTIONS
from ..errors import GdacPathError, S3PathError, InvalidDataset, OptionValueError
Expand Down Expand Up @@ -126,7 +135,7 @@ def __init__(
# Create a File Store to access index file:
self.cache = cache
self.cachedir = OPTIONS["cachedir"] if cachedir == "" else cachedir
timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
self.timeout = OPTIONS["api_timeout"] if timeout == 0 else timeout
self.fs = {}
if split_protocol(host)[0] is None:
self.fs["src"] = filestore(cache=cache, cachedir=cachedir)
Expand All @@ -153,7 +162,7 @@ def __init__(
port=0 if urlparse(host).port is None else urlparse(host).port,
cache=cache,
cachedir=cachedir,
timeout=timeout,
timeout=self.timeout,
block_size=1000 * (2**20),
)

Expand All @@ -167,7 +176,8 @@ def __init__(
raise S3PathError("This host (%s) is not alive !" % host)

self.fs["src"] = s3store(
cache=cache, cachedir=cachedir,
cache=cache,
cachedir=cachedir,
anon=not has_aws_credentials(),
)
self.skip_rows = 10
Expand Down Expand Up @@ -228,13 +238,15 @@ def __init__(
if self.fs["src"].exists(self.index_path + ".gz"):
self.index_file += ".gz"

if isinstance(self.fs['src'], s3store):
if isinstance(self.fs["src"], s3store):
# If the index host is on a S3 store, we add another file system that will bypass some
# search methods to improve performances.
self.fs["s3"] = get_a_s3index(self.convention)
# Adjust S3 bucket name and key with host and index file names:
self.fs["s3"].bucket_name = Path(split_protocol(self.host)[1]).parts[0]
self.fs["s3"].key = str(Path(*Path(split_protocol(self.host)[1]).parts[1:]) / self.index_file)
self.fs["s3"].key = str(
Path(*Path(split_protocol(self.host)[1]).parts[1:]) / self.index_file
)

# # CNAME internal manager to be able to chain search methods:
# self._cname = None
Expand All @@ -246,8 +258,10 @@ def __repr__(self):
summary.append("Convention: %s (%s)" % (self.convention, self.convention_title))
if hasattr(self, "index"):
summary.append("In memory: True (%i records)" % self.N_RECORDS)
elif 's3' in self.host:
summary.append("In memory: False [But there's no need to load the full index with a S3 host to make a search]")
elif "s3" in self.host:
summary.append(
"In memory: False [But there's no need to load the full index with a S3 host to make a search]"
)
else:
summary.append("In memory: False")

Expand Down Expand Up @@ -417,7 +431,7 @@ def N_RECORDS(self):
# Must work for all internal storage type (:class:`pyarrow.Table` or :class:`pandas.DataFrame`)
if hasattr(self, "index"):
return self.index.shape[0]
elif 's3' in self.host:
elif "s3" in self.host:
return np.Inf
else:
raise InvalidDataset("Load the index first !")
Expand Down Expand Up @@ -474,7 +488,9 @@ def _write(self, fs, path, obj, fmt="pq"):
if fmt == "parquet":
fmt = "pq"
if isinstance(fs, memorystore):
fs.fs.touch(this_path) # Fix for https://github.com/euroargodev/argopy/issues/345
fs.fs.touch(
this_path
) # Fix for https://github.com/euroargodev/argopy/issues/345
# fs.fs.touch(this_path) # Fix for https://github.com/euroargodev/argopy/issues/345
# This is an f* mystery to me, why do we need 2 calls to trigger file creation FOR REAL ????
# log.debug("memorystore touched this path before open context: '%s'" % this_path)
Expand Down Expand Up @@ -606,7 +622,7 @@ def get_filename(s, index):
from ..related import load_dict, mapp_dict

if nrows is not None:
df = df.loc[0: nrows - 1].copy()
df = df.loc[0 : nrows - 1].copy()

if "index" in df:
df.drop("index", axis=1, inplace=True)
Expand Down Expand Up @@ -900,7 +916,9 @@ def search_params(self, PARAMs: Union[str, list], logical: str):
raise NotImplementedError("Not implemented")

@abstractmethod
def search_parameter_data_mode(self, PARAMs: dict, logical: bool = 'and', nrows=None):
def search_parameter_data_mode(
self, PARAMs: dict, logical: bool = "and", nrows=None
):
"""Search index for profiles with a parameter in a specific data mode
Parameters
Expand Down Expand Up @@ -994,3 +1012,62 @@ def _insert_header(self, originalfile):
f.write(data)

return originalfile

def _copy(
self,
deep: bool = True,
) -> Self:
cls = self.__class__

if deep:
# Ensure complete independence between the original and the copied index:
obj = cls.__new__(cls)
obj.__init__(
host=copy.deepcopy(self.host),
index_file=copy.deepcopy(self.index_file),
timeout=copy.deepcopy(self.timeout),
cache=copy.deepcopy(self.cache),
cachedir=copy.deepcopy(self.cachedir),
)
if hasattr(self, "index"):
obj._nrows_index = copy.deepcopy(self._nrows_index)
obj.index = copy.deepcopy(self.index)
if self.cache:
obj.index_path_cache = copy.deepcopy(self.index_path_cache)

else:
obj = cls.__new__(cls)
obj.__init__(
host=copy.copy(self.host),
index_file=copy.copy(self.index_file),
timeout=copy.copy(self.timeout),
cache=copy.copy(self.cache),
cachedir=copy.copy(self.cachedir),
)
if hasattr(self, "index"):
obj._nrows_index = copy.copy(self._nrows_index)
obj.index = copy.copy(self.index)
if self.cache:
obj.index_path_cache = copy.copy(self.index_path_cache)

if hasattr(self, "search"):
obj.search_type = copy.copy(self.search_type)
obj.search_filter = copy.copy(self.search_filter)
obj.search = copy.copy(self.search)
if obj.cache:
obj.search_path_cache = copy.copy(self.search_path_cache)

return obj

def __copy__(self) -> Self:
return self._copy(deep=False)

def __deepcopy__(self) -> Self:
return self._copy(deep=True)

def copy(
self,
deep: bool = True,
) -> Self:
"""Returns a copy of this object."""
return self._copy(deep=deep)
44 changes: 35 additions & 9 deletions argopy/utils/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import numpy as np
import xarray as xr
import pandas as pd
import logging
from typing import List, Union

Expand Down Expand Up @@ -340,6 +341,7 @@ def filter_param_by_data_mode(
return ds.loc[dict(N_POINTS=filter)] if len(filter) > 0 else ds



def split_data_mode(ds: xr.Dataset) -> xr.Dataset:
"""Convert PARAMETER_DATA_MODE(N_PROF, N_PARAM) into several <PARAM>_DATA_MODE(N_PROF) variables
Expand All @@ -352,24 +354,48 @@ def split_data_mode(ds: xr.Dataset) -> xr.Dataset:
-------
:class:`xr.Dataset`
"""
if ds.argo._type != "profile":
raise InvalidDatasetStructure(
"Method only available to a collection of profiles"
)

if "STATION_PARAMETERS" in ds and "PARAMETER_DATA_MODE" in ds:

u64 = lambda s: "%s%s" % (s, " " * (64 - len(s))) # noqa: E731
params = [p.strip() for p in np.unique(ds["STATION_PARAMETERS"])]

def read_data_mode_for(ds: xr.Dataset, param: str) -> xr.DataArray:
"""Return data mode of a given parameter"""
da_masked = ds['PARAMETER_DATA_MODE'].where(ds['STATION_PARAMETERS'] == u64(param))

def _dropna(x):
# x('N_PARAM') is reduced to the first non nan value, a scalar, no dimension
y = pd.Series(x).dropna().tolist()
if len(y) == 0:
return ""
else:
return y[0]

kwargs = dict(
dask="parallelized",
input_core_dims=[["N_PARAM"]], # Function takes N_PARAM as input
output_core_dims=[[]], # Function reduces to a scalar (no dimension)
vectorize=True # Apply function element-wise along the other dimensions
)

dm = xr.apply_ufunc(_dropna, da_masked, **kwargs)
dm = dm.rename("%s_DATA_MODE" % param)
dm.attrs = ds['PARAMETER_DATA_MODE'].attrs
return dm

for param in params:
name = "%s_DATA_MODE" % param.replace("_PARAMETER", "").replace(
"PARAMETER_", ""
)
mask = ds["STATION_PARAMETERS"] == xr.full_like(
ds["STATION_PARAMETERS"],
u64(param),
dtype=ds["STATION_PARAMETERS"].dtype,
)
da = ds["PARAMETER_DATA_MODE"].where(mask, drop=True).isel(N_PARAM=0)
da = da.rename(name)
da = da.astype(ds["PARAMETER_DATA_MODE"].dtype)
ds[name] = da
if name == "_DATA_MODE":
log.error("This dataset has an error in 'STATION_PARAMETERS': it contains an empty string")
else:
ds[name] = read_data_mode_for(ds, param)

ds = ds.drop_vars("PARAMETER_DATA_MODE")
ds.argo.add_history("Transformed with 'split_data_mode'")
Expand Down
6 changes: 4 additions & 2 deletions argopy/xarray.py
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ def point2profile(self, drop: bool = False) -> xr.Dataset: # noqa: C901
Returns
-------
:class:`xr.dataset`
:class:`xr.Dataset`
See Also
--------
Expand Down Expand Up @@ -563,9 +563,11 @@ def profile2point(self) -> xr.Dataset:
- A "point" is a location with unique (N_PROF, N_LEVELS) indexes
- A "profile" is a collection of points with an unique UID based on WMO, CYCLE_NUMBER and DIRECTION
Note that this method will systematically apply the :meth:`datamode.split` method.
Returns
-------
:class:`xr.dataset`
:class:`xr.Dataset`
Warnings
--------
Expand Down
1 change: 1 addition & 0 deletions docs/api.rst
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ Data Transformation
Dataset.argo.interp_std_levels
Dataset.argo.groupby_pressure_bins
Dataset.argo.datamode.merge
Dataset.argo.datamode.split


Data Filters
Expand Down

0 comments on commit 3a876a6

Please sign in to comment.