From dd6490767363c055772e9f6fc1097e7e3e8cfa62 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Fri, 14 Apr 2023 13:53:33 +0200 Subject: [PATCH 1/5] Update whats-new.rst --- docs/whats-new.rst | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/docs/whats-new.rst b/docs/whats-new.rst index 3b7b7155..9124cdd1 100644 --- a/docs/whats-new.rst +++ b/docs/whats-new.rst @@ -11,6 +11,10 @@ What's New v0.1.14 (XX Xxx. 2023) ---------------------- +**Features and front-end API** + +- **Argopy now provides authenticated access to the Argo reference database for DMQC**. Using login/password new **argopy* options, it is now possible to fetch the `Argo reference database `_, both CTD data and Argo CTD. + **Internals** - Use a mocked server for all http and GDAC ftp requests in CI tests (:pr:`249`, :pr:`252`, :pr:`255`) by `G. Maze `_ From 7e8fb5cb49c3601d711f3b69e26891a788573f92 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Tue, 25 Apr 2023 15:59:00 +0200 Subject: [PATCH 2/5] New CTDRefDataFetcher ! - also new options 'user' and 'password' - new httpstores: httpstore_erddap, httpstore_erddap_auth - black8 erddap_data - fix bug in erddap_data used with ds='ref' --- argopy/__init__.py | 2 + argopy/data_fetchers/__init__.py | 18 +- argopy/data_fetchers/erddap_data.py | 173 ++++++++++-------- argopy/data_fetchers/erddap_refdata.py | 207 ++++++++++++++++++++++ argopy/errors.py | 11 ++ argopy/options.py | 9 +- argopy/stores/__init__.py | 4 +- argopy/stores/filesystems.py | 112 +++++++++++- argopy/tests/test_fetchers_data_erddap.py | 1 + 9 files changed, 457 insertions(+), 80 deletions(-) create mode 100644 argopy/data_fetchers/erddap_refdata.py diff --git a/argopy/__init__.py b/argopy/__init__.py index 13255881..a4704538 100644 --- a/argopy/__init__.py +++ b/argopy/__init__.py @@ -32,6 +32,7 @@ from .utilities import TopoFetcher, ArgoNVSReferenceTables, OceanOPSDeployments # noqa: E402 from .utilities import monitor_status as status # noqa: E402 from .options import set_options # noqa: E402 +from .data_fetchers import CTDRefDataFetcher # @@ -53,6 +54,7 @@ "TopoFetcher", # Class "ArgoNVSReferenceTables", # Class "OceanOPSDeployments", # Class + "CTDRefDataFetcher", # Submodules: "utilities", diff --git a/argopy/data_fetchers/__init__.py b/argopy/data_fetchers/__init__.py index 41d08364..d88e27e6 100644 --- a/argopy/data_fetchers/__init__.py +++ b/argopy/data_fetchers/__init__.py @@ -1,5 +1,21 @@ """ This package contains implementations for data and index fetchers for specific data sources. -These fetchers are meant to be used and discovered automatically by the facades (in fetchers.py) +Most of these fetchers are meant to be used and discovered automatically by the facades (in fetchers.py) and by utilities functions list_available_data_src() and list_available_index_src() """ + +from .erddap_refdata import Fetch_box as CTDRefDataFetcher +from . import erddap_data +from . import erddap_index +from . import argovis_data +from . import gdacftp_data +from . 
import gdacftp_index + +__all__ = ( + "erddap_data", + "erddap_index", + "argovis_data", + "gdacftp_data", + "gdacftp_index", + "CTDRefDataFetcher", +) diff --git a/argopy/data_fetchers/erddap_data.py b/argopy/data_fetchers/erddap_data.py index 682f96fe..36789b2c 100644 --- a/argopy/data_fetchers/erddap_data.py +++ b/argopy/data_fetchers/erddap_data.py @@ -9,7 +9,7 @@ This is not intended to be used directly, only by the facade at fetchers.py """ - +import xarray as xr import pandas as pd import numpy as np import copy @@ -17,7 +17,6 @@ from abc import abstractmethod import getpass -import argopy from .proto import ArgoDataFetcherProto from argopy.options import OPTIONS from argopy.utilities import list_standard_variables, Chunker, format_oneline @@ -35,24 +34,27 @@ from erddapy.erddapy import ERDDAP from erddapy.erddapy import _quote_string_constraints as quote_string_constraints from erddapy.erddapy import parse_dates + # Soon ! https://github.com/ioos/erddapy log = logging.getLogger("argopy.erddap.data") -access_points = ['wmo', 'box'] -exit_formats = ['xarray'] -dataset_ids = ['phy', 'ref', 'bgc'] # First is default -api_server = OPTIONS['erddap'] # API root url -api_server_check = OPTIONS['erddap'] + '/info/ArgoFloats/index.json' # URL to check if the API is alive +access_points = ["wmo", "box"] +exit_formats = ["xarray"] +dataset_ids = ["phy", "ref", "bgc"] # First is default +api_server = OPTIONS["erddap"] # API root url +api_server_check = ( + OPTIONS["erddap"] + "/info/ArgoFloats/index.json" +) # URL to check if the API is alive class ErddapArgoDataFetcher(ArgoDataFetcherProto): - """ Manage access to Argo data through Ifremer ERDDAP + """Manage access to Argo data through Ifremer ERDDAP - ERDDAP transaction are managed with the erddapy library + ERDDAP transaction are managed with the erddapy library - This class is a prototype not meant to be instantiated directly + This class is a prototype not meant to be instantiated directly """ @@ -61,18 +63,20 @@ class ErddapArgoDataFetcher(ArgoDataFetcherProto): ### @abstractmethod def init(self, *args, **kwargs): - """ Initialisation for a specific fetcher """ + """Initialisation for a specific fetcher""" raise NotImplementedError("ErddapArgoDataFetcher.init not implemented") @abstractmethod def define_constraints(self): - """ Define erddapy constraints """ - raise NotImplementedError("ErddapArgoDataFetcher.define_constraints not implemented") + """Define erddapy constraints""" + raise NotImplementedError( + "ErddapArgoDataFetcher.define_constraints not implemented" + ) @property @abstractmethod def uri(self) -> list: - """ Return the list of Unique Resource Identifier (URI) to download data """ + """Return the list of Unique Resource Identifier (URI) to download data""" raise NotImplementedError("ErddapArgoDataFetcher.uri not implemented") ### @@ -92,7 +96,7 @@ def __init__( api_timeout: int = 0, **kwargs, ): - """ Instantiate an ERDDAP Argo data fetcher + """Instantiate an ERDDAP Argo data fetcher Parameters ---------- @@ -119,10 +123,11 @@ def __init__( Erddap request time out in seconds. Set to OPTIONS['api_timeout'] by default. 
""" timeout = OPTIONS["api_timeout"] if api_timeout == 0 else api_timeout - self.fs = kwargs['fs'] if 'fs' in kwargs else httpstore(cache=cache, cachedir=cachedir, timeout=timeout, size_policy='head') self.definition = "Ifremer erddap Argo data fetcher" self.dataset_id = OPTIONS["dataset"] if ds == "" else ds - self.server = kwargs['server'] if 'server' in kwargs else OPTIONS['erddap'] + self.server = kwargs["server"] if "server" in kwargs else OPTIONS["erddap"] + self.store_opts = {'cache': cache, 'cachedir': cachedir, 'timeout': timeout, 'size_policy': 'head'} + self.fs = kwargs['fs'] if 'fs' in kwargs else httpstore(**self.store_opts) if not isinstance(parallel, bool): parallel_method = parallel @@ -155,14 +160,14 @@ def server(self): @server.setter def server(self, value): self._server = value - if hasattr(self, 'erddap') and self.erddap.server != value: + if hasattr(self, "erddap") and self.erddap.server != value: log.debug("The erddap server has been modified, updating internal data") self._init_erddapy() def _add_attributes(self, this): # noqa: C901 - """ Add variables attributes not return by erddap requests (csv) + """Add variables attributes not return by erddap requests (csv) - This is hard coded, but should be retrieved from an API somewhere + This is hard coded, but should be retrieved from an API somewhere """ for v in this.data_vars: if "TEMP" in v and "_QC" not in v: @@ -282,7 +287,7 @@ def _init_erddapy(self): @property def _minimal_vlist(self): - """ Return the minimal list of variables to retrieve measurements for """ + """Return the minimal list of variables to retrieve measurements for""" vlist = list() if self.dataset_id == "phy": plist = [ @@ -322,21 +327,24 @@ def _minimal_vlist(self): ] [vlist.append(p) for p in plist] - plist = ["pres", "temp", "psal", - "cndc", - "doxy", - "beta_backscattering", - "fluorescence_chla", - # "fluorescence_cdom", - # "side_scattering_turbidity", - # "transmittance_particle_beam_attenuation", - "bbp", - "turbidity", - "cp", - "chla", - "cdom", - "nitrate", - ] + plist = [ + "pres", + "temp", + "psal", + "cndc", + "doxy", + "beta_backscattering", + "fluorescence_chla", + # "fluorescence_cdom", + # "side_scattering_turbidity", + # "transmittance_particle_beam_attenuation", + "bbp", + "turbidity", + "cp", + "chla", + "cdom", + "nitrate", + ] [vlist.append(p) for p in plist] [vlist.append(p + "_qc") for p in plist] [vlist.append(p + "_adjusted") for p in plist] @@ -352,12 +360,12 @@ def _minimal_vlist(self): return vlist def cname(self): - """ Return a unique string defining the constraints """ + """Return a unique string defining the constraints""" return self._cname() @property def cachepath(self): - """ Return path to cached file(s) for this request + """Return path to cached file(s) for this request Returns ------- @@ -366,7 +374,7 @@ def cachepath(self): return [self.fs.cachepath(uri) for uri in self.uri] def get_url(self): - """ Return the URL to download data requested + """Return the URL to download data requested Returns ------- @@ -407,17 +415,19 @@ def get_url(self): @property def N_POINTS(self) -> int: - """ Number of measurements expected to be returned by a request """ + """Number of measurements expected to be returned by a request""" url = self.get_url().replace("." 
+ self.erddap.response, ".ncHeader") try: ncHeader = str(self.fs.fs.cat_file(url)) lines = [line for line in ncHeader.splitlines() if "row = " in line][0] return int(lines.split("=")[1].split(";")[0]) except Exception: - raise ErddapServerError("Erddap server can't return ncHeader for this url. ") + raise ErddapServerError( + "Erddap server can't return ncHeader for this url. " + ) - def to_xarray(self, errors: str = 'ignore'): # noqa: C901 - """ Load Argo data and return a xarray.DataSet """ + def to_xarray(self, errors: str = "ignore"): # noqa: C901 + """Load Argo data and return a xarray.DataSet""" URI = self.uri # Call it once @@ -440,7 +450,10 @@ def to_xarray(self, errors: str = 'ignore'): # noqa: C901 else: try: ds = self.fs.open_mfdataset( - URI, method=self.parallel_method, progress=self.progress, errors=errors + URI, + method=self.parallel_method, + progress=self.progress, + errors=errors, ) except ClientResponseError as e: raise ErddapServerError(e.message) @@ -458,22 +471,24 @@ def to_xarray(self, errors: str = 'ignore'): # noqa: C901 ds = ds.rename({v: v.upper()}) ds = ds.set_coords(coords) + if self.dataset_id == "ref": + ds["DIRECTION"] = xr.full_like(ds["CYCLE_NUMBER"], "A", dtype=str) + # Cast data types and add variable attributes (not available in the csv download): ds = self._add_attributes(ds) ds = ds.argo.cast_types() - # More convention: - # ds = ds.rename({'pres': 'pressure'}) - # Remove erddap file attributes and replace them with argopy ones: ds.attrs = {} if self.dataset_id == "phy": ds.attrs["DATA_ID"] = "ARGO" + ds.attrs["DOI"] = "http://doi.org/10.17882/42182" elif self.dataset_id == "ref": ds.attrs["DATA_ID"] = "ARGO_Reference" + ds.attrs["DOI"] = "-" elif self.dataset_id == "bgc": ds.attrs["DATA_ID"] = "ARGO-BGC" - ds.attrs["DOI"] = "http://doi.org/10.17882/42182" + ds.attrs["DOI"] = "http://doi.org/10.17882/42182" ds.attrs["Fetched_from"] = self.erddap.server ds.attrs["Fetched_by"] = getpass.getuser() ds.attrs["Fetched_date"] = pd.to_datetime("now", utc=True).strftime("%Y/%m/%d") @@ -507,7 +522,7 @@ def filter_variables(self, ds, mode="standard"): class Fetch_wmo(ErddapArgoDataFetcher): - """ Manage access to Argo data through Ifremer ERDDAP for: a list of WMOs + """Manage access to Argo data through Ifremer ERDDAP for: a list of WMOs This class is instantiated when a call is made to these facade access points: - `ArgoDataFetcher(src='erddap').float(**)` @@ -516,14 +531,14 @@ class Fetch_wmo(ErddapArgoDataFetcher): """ def init(self, WMO=[], CYC=None, **kw): - """ Create Argo data loader for WMOs - - Parameters - ---------- - WMO : list(int) - The list of WMOs to load all Argo data for. - CYC : int, np.array(int), list(int) - The cycle numbers to load. + """Create Argo data loader for WMOs + + Parameters + ---------- + WMO : list(int) + The list of WMOs to load all Argo data for. + CYC : int, np.array(int), list(int) + The cycle numbers to load. 
""" self.WMO = WMO self.CYC = CYC @@ -541,7 +556,7 @@ def init(self, WMO=[], CYC=None, **kw): return self def define_constraints(self): - """ Define erddap constraints """ + """Define erddap constraints""" self.erddap.constraints = { "platform_number=~": "|".join(["%i" % i for i in self.WMO]) } @@ -553,7 +568,7 @@ def define_constraints(self): @property def uri(self): - """ List of URLs to load for a request + """List of URLs to load for a request Returns ------- @@ -561,7 +576,7 @@ def uri(self): """ if not self.parallel: chunks = "auto" - chunks_maxsize = {'wmo': 5} + chunks_maxsize = {"wmo": 5} else: chunks = self.chunks chunks_maxsize = self.chunks_maxsize @@ -573,26 +588,30 @@ def uri(self): for wmos in wmo_grps: urls.append( Fetch_wmo( - WMO=wmos, CYC=self.CYC, ds=self.dataset_id, parallel=False, fs=self.fs, server=self.server, + WMO=wmos, + CYC=self.CYC, + ds=self.dataset_id, + parallel=False, + fs=self.fs, + server=self.server, ).get_url() ) return urls class Fetch_box(ErddapArgoDataFetcher): - """ Manage access to Argo data through Ifremer ERDDAP for: an ocean rectangle - """ + """Manage access to Argo data through Ifremer ERDDAP for: an ocean rectangle""" def init(self, box: list, **kw): - """ Create Argo data loader - - Parameters - ---------- - box : list(float, float, float, float, float, float, str, str) - The box domain to load all Argo data for: - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max] - or: - box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max] + """Create Argo data loader + + Parameters + ---------- + box : list(float, float, float, float, float, float, str, str) + The box domain to load all Argo data for: + box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max] + or: + box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max] """ self.BOX = box.copy() @@ -606,7 +625,7 @@ def init(self, box: list, **kw): return self def define_constraints(self): - """ Define request constraints """ + """Define request constraints""" self.erddap.constraints = {"longitude>=": self.BOX[0]} self.erddap.constraints.update({"longitude<=": self.BOX[1]}) self.erddap.constraints.update({"latitude>=": self.BOX[2]}) @@ -620,7 +639,7 @@ def define_constraints(self): @property def uri(self): - """ List of files to load for a request + """List of files to load for a request Returns ------- @@ -635,5 +654,9 @@ def uri(self): boxes = self.Chunker.fit_transform() urls = [] for box in boxes: - urls.append(Fetch_box(box=box, ds=self.dataset_id, fs=self.fs, server=self.server).get_url()) + urls.append( + Fetch_box( + box=box, ds=self.dataset_id, fs=self.fs, server=self.server + ).get_url() + ) return urls diff --git a/argopy/data_fetchers/erddap_refdata.py b/argopy/data_fetchers/erddap_refdata.py new file mode 100644 index 00000000..98f48ebd --- /dev/null +++ b/argopy/data_fetchers/erddap_refdata.py @@ -0,0 +1,207 @@ +""" +Fetcher to retrieve CTD reference data from Ifremer erddap +""" +import xarray as xr +from .erddap_data import ErddapArgoDataFetcher +from argopy.options import OPTIONS +from argopy.utilities import Chunker +from argopy.stores import httpstore_erddap_auth +import logging + +# Load erddapy according to available version (breaking changes in v0.8.0) +try: + from erddapy import ERDDAP + from erddapy.utilities import parse_dates, quote_string_constraints +except: # noqa: E722 + # >= v0.8.0 + from erddapy.erddapy import ERDDAP + from erddapy.erddapy import _quote_string_constraints as 
quote_string_constraints + from erddapy.erddapy import parse_dates + + # Soon ! https://github.com/ioos/erddapy + + +log = logging.getLogger("argopy.erddap.refdata") + +access_points = ["box"] +exit_formats = ["xarray"] +dataset_ids = ["ref-ctd"] # First is default +api_server = OPTIONS["erddap"] # API root url +api_server_check = ( + OPTIONS["erddap"] + "/info/ArgoFloats/index.json" +) # URL to check if the API is alive + + +class ErddapREFDataFetcher(ErddapArgoDataFetcher): + """Manage access to Argo CTD reference data through Ifremer ERDDAP""" + + def __init__(self, **kwargs): + kwargs["ds"] = "ref-ctd" + super().__init__(**kwargs) + kw = kwargs + [ + kw.pop(p) + for p in [ + "ds", + "cache", + "cachedir", + "parallel", + "parallel_method", + "progress", + "chunks", + "chunks_maxsize", + "api_timeout", + "box", + ] + if p in kw + ] + login_page = "%s/login.html" % self.server.rstrip("/") + self.fs = httpstore_erddap_auth( + login=login_page, auto=False, **{**kw, **self.store_opts} + ) + + def __repr__(self): + summary = [super().__repr__()] + summary.append( + "Performances: cache=%s, parallel=%s" + % (str(self.fs.cache), str(self.parallel_method)) + ) + summary.append("User mode: %s" % "expert") + summary.append("Dataset: %s" % self.dataset_id) + return "\n".join(summary) + + def _add_attributes(self, this): # noqa: C901 + """Add variables attributes not return by erddap requests + + This is hard coded, but should be retrieved from an API somewhere + """ + this = super()._add_attributes(this) + + if "DIRECTION" in this.data_vars: + this["DIRECTION"].attrs[ + "comment" + ] = "Set to 'A' for all CTD stations by default" + + if "PLATFORM_NUMBER" in this.data_vars: + this["PLATFORM_NUMBER"].attrs["long_name"] = "Fake unique identifier" + this["PLATFORM_NUMBER"].attrs[ + "comment" + ] = "This was inferred from EXPOCODE and is not a real WMO" + + if "CYCLE_NUMBER" in this.data_vars: + this["CYCLE_NUMBER"].attrs["long_name"] = "Station number" + this["CYCLE_NUMBER"].attrs[ + "comment" + ] = "This was computed using unique TIME for each EXPOCODE" + this["CYCLE_NUMBER"].attrs["convention"] = "-" + + return this + + def _init_erddapy(self): + # Init erddapy + self.erddap = ERDDAP(server=str(self.server), protocol="tabledap") + self.erddap.response = "nc" + self.erddap.dataset_id = "Argo-ref-ctd" + return self + + @property + def _minimal_vlist(self): + """Return the minimal list of variables to retrieve measurements for""" + # vlist = super()._minimal_vlist + vlist = list() + + plist = ["latitude", "longitude", "time"] + [vlist.append(p) for p in plist] + plist = ["pres", "temp", "psal", "ptmp", "source", "qclevel"] + [vlist.append(p) for p in plist] + + return vlist + + def to_xarray(self, errors: str = "ignore"): # noqa: C901 + """Load CTD-Reference data and return a xarray.DataSet""" + + ds = super().to_xarray(errors=errors) + + ds = ds.rename({"SOURCE": "EXPOCODE"}) + ds["DIRECTION"] = xr.full_like(ds["EXPOCODE"], "A", dtype=str) + g = [] + for iplatform, grp in enumerate(ds.groupby("EXPOCODE")): + code, this_ds = grp + for istation, sub_grp in enumerate(this_ds.groupby("TIME")): + sub_grp[-1]["CYCLE_NUMBER"] = xr.full_like( + sub_grp[-1]["TIME"], istation, int + ) + sub_grp[-1]["PLATFORM_NUMBER"] = xr.full_like( + sub_grp[-1]["TIME"], iplatform + 900000, int + ) + g.append(sub_grp[-1]) + ds = xr.concat( + g, dim="N_POINTS", data_vars="minimal", coords="minimal", compat="override" + ) + + ds.attrs["DATA_ID"] = "ARGO_Reference_CTD" + ds.attrs["DOI"] = "-" + + # Cast data types and add 
variable attributes (not available in the csv download): + ds = self._add_attributes(ds) + ds = ds.argo.cast_types() + + return ds + + +class Fetch_box(ErddapREFDataFetcher): + """Manage access to Argo data through Ifremer ERDDAP for: an ocean rectangle""" + + def init(self, box: list, **kw): + """Create Argo data loader + + Parameters + ---------- + box : list(float, float, float, float, float, float, str, str) + The box domain to load all Argo data for: + box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max] + or: + box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max, datim_min, datim_max] + """ + self.BOX = box.copy() + self.definition = ( + "Ifremer erddap Argo CTD-REFERENCE data fetcher for a space/time region" + ) + return self + + def define_constraints(self): + """Define request constraints""" + self.erddap.constraints = {"longitude>=": self.BOX[0]} + self.erddap.constraints.update({"longitude<=": self.BOX[1]}) + self.erddap.constraints.update({"latitude>=": self.BOX[2]}) + self.erddap.constraints.update({"latitude<=": self.BOX[3]}) + self.erddap.constraints.update({"pres>=": self.BOX[4]}) + self.erddap.constraints.update({"pres<=": self.BOX[5]}) + if len(self.BOX) == 8: + self.erddap.constraints.update({"time>=": self.BOX[6]}) + self.erddap.constraints.update({"time<=": self.BOX[7]}) + return None + + @property + def uri(self): + """List of files to load for a request + + Returns + ------- + list(str) + """ + if not self.parallel: + return [self.get_url()] + else: + self.Chunker = Chunker( + {"box": self.BOX}, chunks=self.chunks, chunksize=self.chunks_maxsize + ) + boxes = self.Chunker.fit_transform() + urls = [] + for box in boxes: + urls.append( + Fetch_box( + box=box, ds=self.dataset_id, fs=self.fs, server=self.server + ).get_url() + ) + return urls diff --git a/argopy/errors.py b/argopy/errors.py index 17f32e03..a2f171e7 100644 --- a/argopy/errors.py +++ b/argopy/errors.py @@ -125,3 +125,14 @@ class ArgovisServerError(APIServerError): """Raise this when argopy is disrupted by an error due to the Erddap, not argopy machinery.""" pass + + +class ErddapHTTPUnauthorized(APIServerError): + """Raise when login to erddap fails""" + + pass + +class ErddapHTTPNotFound(APIServerError): + """Raise when erddap ressource is not found""" + + pass \ No newline at end of file diff --git a/argopy/options.py b/argopy/options.py index d95292f3..d7127359 100644 --- a/argopy/options.py +++ b/argopy/options.py @@ -13,6 +13,7 @@ from socket import gaierror from urllib.parse import urlparse + # Define a logger log = logging.getLogger("argopy.options") @@ -26,6 +27,8 @@ API_TIMEOUT = "api_timeout" TRUST_ENV = "trust_env" SERVER = "server" +USER = "user" +PASSWORD = "password" # Define the list of available options and default values: OPTIONS = { @@ -37,7 +40,9 @@ USER_LEVEL: "standard", API_TIMEOUT: 60, TRUST_ENV: False, - SERVER: None + SERVER: None, + USER: None, + PASSWORD: None, } # Define the list of possible values @@ -77,6 +82,8 @@ def validate_http(this_path): API_TIMEOUT: lambda x: isinstance(x, int) and x > 0, TRUST_ENV: lambda x: isinstance(x, bool), SERVER: lambda x: True, + USER: lambda x: isinstance(x, str), + PASSWORD: lambda x: isinstance(x, str), } diff --git a/argopy/stores/__init__.py b/argopy/stores/__init__.py index 1a8ab897..392cef22 100644 --- a/argopy/stores/__init__.py +++ b/argopy/stores/__init__.py @@ -1,5 +1,5 @@ from .argo_index import indexstore, indexfilter_wmo, indexfilter_box -from .filesystems import filestore, httpstore, memorystore, ftpstore +from 
.filesystems import filestore, httpstore, memorystore, ftpstore, httpstore_erddap, httpstore_erddap_auth from .argo_index_pa import indexstore_pyarrow as indexstore_pa from .argo_index_pd import indexstore_pandas as indexstore_pd @@ -15,6 +15,8 @@ "indexstore_pd", "filestore", "httpstore", + "httpstore_erddap", + "httpstore_erddap_auth", "ftpstore", "memorystore" ) diff --git a/argopy/stores/filesystems.py b/argopy/stores/filesystems.py index dfbade14..8864339c 100644 --- a/argopy/stores/filesystems.py +++ b/argopy/stores/filesystems.py @@ -13,14 +13,14 @@ import logging from packaging import version from typing import Union -from urllib.parse import urlparse +from urllib.parse import urlparse, parse_qs import concurrent.futures import multiprocessing from ..options import OPTIONS from ..errors import FileSystemHasNoCache, CacheFileNotFound, DataNotFound, \ - InvalidMethod + InvalidMethod, ErddapHTTPUnauthorized, ErddapHTTPNotFound from abc import ABC, abstractmethod from ..utilities import Registry, log_argopy_callerstack @@ -1108,3 +1108,111 @@ def drop_variables_not_in_all_datasets(ds_collection): return results else: raise DataNotFound(urls) + + +class httpstore_erddap_auth(httpstore): + + async def get_auth_client(self, **kwargs): + session = aiohttp.ClientSession(**kwargs) + + async with session.post(self._login_page, data=self._login_payload) as resp: + resp_query = dict(parse_qs(urlparse(str(resp.url)).query)) + + if resp.status == 404: + raise ErddapHTTPNotFound( + "Error %s: %s. This erddap server does not support log-in" % (resp.status, resp.reason)) + + elif resp.status == 200: + has_expected = 'message' in resp_query # only available when there is a form page response + if has_expected: + message = resp_query['message'][0] + if 'failed' in message: + raise ErddapHTTPUnauthorized("Error %i: %s (%s)" % (401, message, self._login_payload)) + else: + raise ErddapHTTPUnauthorized("This erddap server does not support log-in with a user/password") + + else: + log.debug('resp.status', resp.status) + log.debug('resp.reason', resp.reason) + log.debug('resp.headers', resp.headers) + log.debug('resp.url', urlparse(str(resp.url))) + log.debug('resp.url.query', resp_query) + data = await resp.read() + log.debug('data', data) + + return session + + def __init__(self, + cache: bool = False, + cachedir: str = "", + login: str = None, + payload: dict = {"user": None, "password": None}, + auto: bool = True, + **kwargs): + + if login is None: + raise ValueError("Invalid login url") + else: + self._login_page = login + + self._login_auto = auto # Should we try to log-in automatically at instanciation ? 
+ + self._login_payload = payload.copy() + if "user" in self._login_payload and self._login_payload['user'] is None: + self._login_payload['user'] = OPTIONS['user'] + if "password" in self._login_payload and self._login_payload['password'] is None: + self._login_payload['password'] = OPTIONS['password'] + + filesystem_kwargs = {**kwargs, **{"get_client": self.get_auth_client}} + super().__init__(cache=cache, cachedir=cachedir, **filesystem_kwargs) + + if auto: + assert isinstance(self.connect(), bool) + + def __repr__(self): + # summary = ["" % id(self)] + summary = [""] + summary.append("login page: %s" % self._login_page) + summary.append("login data: %s" % (self._login_payload)) + if hasattr(self, '_connected'): + summary.append("connected: %s" % (self._connected)) + else: + summary.append("connected: ?") + return "\n".join(summary) + + def connect(self): + try: + log.info("Try to log-in to '%s' page with %s data ..." % (self._login_page, self._login_payload)) + self.fs.info(self._login_page) + self._connected = True + except ErddapHTTPUnauthorized: + self._connected = False + except: #noqa: E722 + raise + return self._connected + + @property + def connected(self): + if not hasattr(self, '_connected'): + self.connect() + return self._connected + + +def httpstore_erddap(url: str = "", + cache: bool = False, + cachedir: str = "", + **kwargs): + + login_page = "%s/login.html" % url.rstrip("/") + login_store = httpstore_erddap_auth(cache=cache, cachedir=cachedir, login=login_page, auto=False, **kwargs) + try: + login_store.connect() + keep = True + except ErddapHTTPNotFound: + keep = False + pass + + if keep: + return login_store + else: + return httpstore(cache=cache, cachedir=cachedir, **kwargs) diff --git a/argopy/tests/test_fetchers_data_erddap.py b/argopy/tests/test_fetchers_data_erddap.py index 610db04e..d84cc918 100644 --- a/argopy/tests/test_fetchers_data_erddap.py +++ b/argopy/tests/test_fetchers_data_erddap.py @@ -1,5 +1,6 @@ import logging import numpy as np +import pandas as pd from argopy import DataFetcher as ArgoDataFetcher from argopy.utilities import is_list_of_strings From dda379faa057aa6ec5c2695e0087f0aab405c811 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Wed, 26 Apr 2023 10:40:00 +0200 Subject: [PATCH 3/5] misc --- argopy/data_fetchers/erddap_refdata.py | 2 +- argopy/stores/filesystems.py | 44 +++++++++++++++++++++----- argopy/utilities.py | 3 ++ 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/argopy/data_fetchers/erddap_refdata.py b/argopy/data_fetchers/erddap_refdata.py index 98f48ebd..c5c357c4 100644 --- a/argopy/data_fetchers/erddap_refdata.py +++ b/argopy/data_fetchers/erddap_refdata.py @@ -101,7 +101,7 @@ def _init_erddapy(self): # Init erddapy self.erddap = ERDDAP(server=str(self.server), protocol="tabledap") self.erddap.response = "nc" - self.erddap.dataset_id = "Argo-ref-ctd" + self.erddap.dataset_id = "Argo-ref-ctd-petit" return self @property diff --git a/argopy/stores/filesystems.py b/argopy/stores/filesystems.py index 8864339c..b9886112 100644 --- a/argopy/stores/filesystems.py +++ b/argopy/stores/filesystems.py @@ -1169,16 +1169,44 @@ def __init__(self, if auto: assert isinstance(self.connect(), bool) - def __repr__(self): - # summary = ["" % id(self)] - summary = [""] - summary.append("login page: %s" % self._login_page) - summary.append("login data: %s" % (self._login_payload)) + # def __repr__(self): + # # summary = ["" % id(self)] + # summary = [""] + # summary.append("login page: %s" % self._login_page) + # summary.append("login 
data: %s" % (self._login_payload)) + # if hasattr(self, '_connected'): + # summary.append("connected: %s" % (self._connected)) + # else: + # summary.append("connected: ?") + # return "\n".join(summary) + + def _repr_html_(self): + td_title = lambda title: '
%s
' % title # noqa: E731 + tr_title = lambda title: "%s" % td_title(title) # noqa: E731 + a_link = lambda url, txt: '%s' % (url, txt) + td_key = lambda prop: '%s' % str(prop) # noqa: E731 + td_val = lambda label: '%s' % str(label) # noqa: E731 + tr_tick = lambda key, value: '%s%s' % (td_key(key), td_val(value)) # noqa: E731 + td_vallink = lambda url, label: '%s' % a_link(url, label) # noqa: E731 + tr_ticklink = lambda key, url, value: '%s%s' % (td_key(key), td_vallink(url, value)) # noqa: E731 + + html = [] + html.append("") + html.append("") + html.append(tr_title("httpstore_erddap_auth")) + html.append("") + html.append("") + html.append(tr_ticklink("login page", self._login_page, self._login_page)) + html.append(tr_tick("login data", self._login_payload)) if hasattr(self, '_connected'): - summary.append("connected: %s" % (self._connected)) + html.append(tr_tick("connected", "✅" if self._connected else "⛔")) else: - summary.append("connected: ?") - return "\n".join(summary) + html.append(tr_tick("connected", "?")) + html.append("") + html.append("
") + + html = "\n".join(html) + return html def connect(self): try: diff --git a/argopy/utilities.py b/argopy/utilities.py index 75f29be7..ce9c1856 100644 --- a/argopy/utilities.py +++ b/argopy/utilities.py @@ -3460,6 +3460,9 @@ def cast_types(ds): # noqa: C901 "HISTORY_PARAMETER", "VERTICAL_SAMPLING_SCHEME", "FLOAT_SERIAL_NO", + "SOURCE", + "EXPOCODE", + "QCLEVEL", ] list_int = [ "PLATFORM_NUMBER", From 7d8d154dfb53516e8f8a9210254f95c8ea216648 Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Wed, 26 Apr 2023 11:12:09 +0200 Subject: [PATCH 4/5] Add minimal docs --- argopy/data_fetchers/erddap_data.py | 5 ++++- argopy/data_fetchers/erddap_refdata.py | 18 +++++++++++++++--- argopy/data_fetchers/proto.py | 1 + docs/api-hidden.rst | 4 ++++ docs/api.rst | 2 ++ docs/whats-new.rst | 10 +++++++++- 6 files changed, 35 insertions(+), 5 deletions(-) diff --git a/argopy/data_fetchers/erddap_data.py b/argopy/data_fetchers/erddap_data.py index 36789b2c..9f226012 100644 --- a/argopy/data_fetchers/erddap_data.py +++ b/argopy/data_fetchers/erddap_data.py @@ -82,7 +82,6 @@ def uri(self) -> list: ### # Methods that must not change ### - def __init__( self, ds: str = "", @@ -155,6 +154,7 @@ def __repr__(self): @property def server(self): + """URL of the Erddap server""" return self._server @server.setter @@ -500,18 +500,21 @@ def to_xarray(self, errors: str = "ignore"): # noqa: C901 return ds def filter_data_mode(self, ds, **kwargs): + """Apply xarray argo accessor filter_data_mode method""" ds = ds.argo.filter_data_mode(errors="ignore", **kwargs) if ds.argo._type == "point": ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) return ds def filter_qc(self, ds, **kwargs): + """Apply xarray argo accessor filter_qc method""" ds = ds.argo.filter_qc(**kwargs) if ds.argo._type == "point": ds["N_POINTS"] = np.arange(0, len(ds["N_POINTS"])) return ds def filter_variables(self, ds, mode="standard"): + """Filter variables according to user mode""" if mode == "standard": to_remove = sorted( list(set(list(ds.data_vars)) - set(list_standard_variables())) diff --git a/argopy/data_fetchers/erddap_refdata.py b/argopy/data_fetchers/erddap_refdata.py index c5c357c4..16257d73 100644 --- a/argopy/data_fetchers/erddap_refdata.py +++ b/argopy/data_fetchers/erddap_refdata.py @@ -4,7 +4,7 @@ import xarray as xr from .erddap_data import ErddapArgoDataFetcher from argopy.options import OPTIONS -from argopy.utilities import Chunker +from argopy.utilities import Chunker, doc_inherit from argopy.stores import httpstore_erddap_auth import logging @@ -33,9 +33,21 @@ class ErddapREFDataFetcher(ErddapArgoDataFetcher): - """Manage access to Argo CTD reference data through Ifremer ERDDAP""" + """Manage access to Argo CTD-reference data through Ifremer ERDDAP""" + # @doc_inherit def __init__(self, **kwargs): + """Instantiate an authenticated ERDDAP Argo data fetcher + + Parameters + ---------- + cache: bool (optional) + Cache data or not (default: False) + cachedir: str (optional) + Path to cache folder + api_timeout: int (optional) + Erddap request time out in seconds. Set to OPTIONS['api_timeout'] by default. 
+ """ kwargs["ds"] = "ref-ctd" super().__init__(**kwargs) kw = kwargs @@ -150,7 +162,7 @@ def to_xarray(self, errors: str = "ignore"): # noqa: C901 class Fetch_box(ErddapREFDataFetcher): - """Manage access to Argo data through Ifremer ERDDAP for: an ocean rectangle""" + """Manage access to Argo CTD-reference data through Ifremer ERDDAP for: an ocean rectangle""" def init(self, box: list, **kw): """Create Argo data loader diff --git a/argopy/data_fetchers/proto.py b/argopy/data_fetchers/proto.py index 79815284..7dbe8089 100644 --- a/argopy/data_fetchers/proto.py +++ b/argopy/data_fetchers/proto.py @@ -101,6 +101,7 @@ def sha(self) -> str: return hashlib.sha256(path.encode()).hexdigest() def dashboard(self, **kw): + """Return 3rd party dashboard for the access point""" if self.WMO is not None: if len(self.WMO) == 1 and self.CYC is not None and len(self.CYC) == 1: return dashboard(wmo=self.WMO[0], cyc=self.CYC[0], **kw) diff --git a/docs/api-hidden.rst b/docs/api-hidden.rst index 077bb648..672d665c 100644 --- a/docs/api-hidden.rst +++ b/docs/api-hidden.rst @@ -48,6 +48,10 @@ argopy.data_fetchers.argovis_data.Fetch_wmo argopy.data_fetchers.argovis_data.Fetch_box + argopy.data_fetchers.erddap_refdata.ErddapREFDataFetcher + argopy.data_fetchers.erddap_refdata.Fetch_box + argopy.data_fetchers.CTDRefDataFetcher + argopy.options.set_options argopy.tutorial.open_dataset diff --git a/docs/api.rst b/docs/api.rst index 55735834..08e733bf 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -89,6 +89,8 @@ Utilities for Argo related data TopoFetcher ArgoNVSReferenceTables OceanOPSDeployments + CTDRefDataFetcher + .. _Module Visualisation: diff --git a/docs/whats-new.rst b/docs/whats-new.rst index 9124cdd1..4eb84d53 100644 --- a/docs/whats-new.rst +++ b/docs/whats-new.rst @@ -13,7 +13,15 @@ v0.1.14 (XX Xxx. 2023) **Features and front-end API** -- **Argopy now provides authenticated access to the Argo reference database for DMQC**. Using login/password new **argopy* options, it is now possible to fetch the `Argo reference database `_, both CTD data and Argo CTD. +- **Argopy now provides authenticated access to the Argo reference database for DMQC**. Using user/password new **argopy** options, it is now possible to fetch the `Argo CTD reference database `_, with the :class:`CTDRefDataFetcher` class. (pr:`256`) by `G. Maze `_ + +.. 
code-block:: python + + from argopy import CTDRefDataFetcher + + with argopy.set_options(user="john_doe", password="***"): + f = CTDRefDataFetcher(box=[15, 30, -70, -60, 0, 5000.0]) + ds = f.to_xarray() **Internals** From 0153453fa95919d081d3f318126af9a71e3a096c Mon Sep 17 00:00:00 2001 From: Guillaume Maze Date: Wed, 26 Apr 2023 12:36:08 +0200 Subject: [PATCH 5/5] misc --- argopy/data_fetchers/erddap_data.py | 2 +- argopy/stores/filesystems.py | 16 ++++++++-------- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/argopy/data_fetchers/erddap_data.py b/argopy/data_fetchers/erddap_data.py index 9f226012..f84ebacc 100644 --- a/argopy/data_fetchers/erddap_data.py +++ b/argopy/data_fetchers/erddap_data.py @@ -131,7 +131,7 @@ def __init__( if not isinstance(parallel, bool): parallel_method = parallel parallel = True - if parallel_method not in ["thread"]: + if parallel_method not in ["thread", "seq"]: raise ValueError( "erddap only support multi-threading, use 'thread' instead of '%s'" % parallel_method diff --git a/argopy/stores/filesystems.py b/argopy/stores/filesystems.py index b9886112..d8f07118 100644 --- a/argopy/stores/filesystems.py +++ b/argopy/stores/filesystems.py @@ -321,7 +321,7 @@ def open_mfdataset(self, # noqa: C901 for url in urls} futures = concurrent.futures.as_completed(future_to_url) if progress: - futures = tqdm(futures, total=len(urls), disable='disable' in progress) + futures = tqdm(futures, total=len(urls), disable='disable' in [progress]) for future in futures: data = None @@ -353,7 +353,7 @@ def open_mfdataset(self, # noqa: C901 elif method in ['seq', 'sequential']: if progress: - urls = tqdm(urls, total=len(urls), disable='disable' in progress) + urls = tqdm(urls, total=len(urls), disable='disable' in [progress]) for url in urls: data = None @@ -628,7 +628,7 @@ def drop_variables_not_in_all_datasets(ds_collection): for url in urls} futures = concurrent.futures.as_completed(future_to_url) if progress: - futures = tqdm(futures, total=len(urls), disable='disable' in progress) + futures = tqdm(futures, total=len(urls), disable='disable' in [progress]) for future in futures: data = None @@ -665,7 +665,7 @@ def drop_variables_not_in_all_datasets(ds_collection): elif method in ['seq', 'sequential']: if progress: - urls = tqdm(urls, total=len(urls), disable='disable' in progress) + urls = tqdm(urls, total=len(urls), disable='disable' in [progress]) for url in urls: data = None @@ -827,7 +827,7 @@ def open_mfjson(self, # noqa: C901 preprocess=preprocess, url_follow=url_follow, *args, **kwargs): url for url in urls} futures = concurrent.futures.as_completed(future_to_url) if progress: - futures = tqdm(futures, total=len(urls), disable='disable' in progress) + futures = tqdm(futures, total=len(urls), disable='disable' in [progress]) for future in futures: data = None @@ -854,7 +854,7 @@ def open_mfjson(self, # noqa: C901 elif method in ['seq', 'sequential']: if progress: # log.debug("We asked for a progress bar !") - urls = tqdm(urls, total=len(urls), disable='disable' in progress) + urls = tqdm(urls, total=len(urls), disable='disable' in [progress]) for url in urls: data = None @@ -1029,7 +1029,7 @@ def drop_variables_not_in_all_datasets(ds_collection): preprocess_opts=preprocess_opts, *args, **kwargs): url for url in urls} futures = concurrent.futures.as_completed(future_to_url) if progress: - futures = tqdm(futures, total=len(urls), disable='disable' in progress) + futures = tqdm(futures, total=len(urls), disable='disable' in [progress]) for future in futures: 
                    data = None
@@ -1068,7 +1068,7 @@ def drop_variables_not_in_all_datasets(ds_collection):
 
         elif method in ['seq', 'sequential']:
             if progress:
-                urls = tqdm(urls, total=len(urls), disable='disable' in progress)
+                urls = tqdm(urls, total=len(urls), disable='disable' in [progress])
 
             for url in urls:
                 data = None
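
For reference, the end-to-end usage enabled by this series is sketched below. This is a minimal sketch assembled from the example added to docs/whats-new.rst in patch 4, with the `import argopy` that example implicitly relies on made explicit; it assumes valid DMQC credentials (the user/password values shown are placeholders) and network access to the Ifremer erddap server.

.. code-block:: python

    import argopy
    from argopy import CTDRefDataFetcher

    # The new 'user' and 'password' options are read by the authenticated
    # erddap store (httpstore_erddap_auth) that backs CTDRefDataFetcher.
    with argopy.set_options(user="john_doe", password="***"):
        # box = [lon_min, lon_max, lat_min, lat_max, pres_min, pres_max]
        f = CTDRefDataFetcher(box=[15, 30, -70, -60, 0, 5000.0])
        ds = f.to_xarray()

    print(ds)

Internally, httpstore_erddap_auth posts these credentials to the server's login.html page and raises ErddapHTTPUnauthorized when the log-in fails, while the httpstore_erddap factory falls back to a plain httpstore if the target server exposes no login page.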