Skip to content

Commit

Permalink
Merge branch 'master' into stac_jobdb
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 6, 2024
2 parents 949e7a9 + a811bff commit 4487ab7
Show file tree
Hide file tree
Showing 14 changed files with 279 additions and 81 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added

- Automatically use `load_url` when providing a URL as geometries to `DataCube.aggregate_spatial()`, `DataCube.mask_polygon()`, etc. ([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))
- Allow specifying `limit` when listing batch jobs with `Connection.list_jobs()` ([#677](https://github.com/Open-EO/openeo-python-client/issues/677))

### Changed

Expand All @@ -18,12 +19,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
this is not translated automatically anymore to deprecated, non-standard `read_vector` usage.
Instead, if it is a local GeoJSON file, the GeoJSON data will be loaded directly client-side.
([#104](https://github.com/Open-EO/openeo-python-client/issues/104), [#457](https://github.com/Open-EO/openeo-python-client/issues/457))
- Move `read()` method from general `JobDatabaseInterface` to more specific `FullDataFrameJobDatabase` ([#680](https://github.com/Open-EO/openeo-python-client/issues/680))

### Removed

### Fixed

- `load_stac`: use fallback temporal dimension when no "cube:dimensions" in STAC Collection ([#666](https://github.com/Open-EO/openeo-python-client/issues/666))
- Fix usage of `Parameter.spatial_extent()` with `load_collection` and `filter_bbox` ([#676](https://github.com/Open-EO/openeo-python-client/issues/676))

## [0.35.0] - 2024-11-19

Expand Down
2 changes: 1 addition & 1 deletion docs/udp.rst
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ Some useful parameter helpers (class methods of the :py:class:`~openeo.api.proce
- :py:meth:`Parameter.geojson() <openeo.api.process.Parameter.geojson>` to create
a parameter for specifying a GeoJSON geometry.
- :py:meth:`Parameter.spatial_extent() <openeo.api.process.Parameter.spatial_extent>` to create
a spatial_extent parameter that is exactly the same as the corresponding parameter in `load_collection` and `load_stac`.
a spatial_extent parameter that is exactly the same as the corresponding parameter in ``load_collection`` and ``load_stac``.



Expand Down
58 changes: 47 additions & 11 deletions openeo/api/process.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import textwrap
import warnings
from typing import List, Optional, Union

Expand Down Expand Up @@ -279,23 +280,15 @@ def bounding_box(
}
return cls(name=name, description=description, schema=schema, **kwargs)

_spatial_extent_description = """Limits the data to process to the specified bounding box or polygons.
For raster data, the process loads the pixel into the data cube if the point at the pixel center intersects with the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC).
For vector data, the process loads the geometry into the data cube if the geometry is fully within the bounding box or any of the polygons (as defined in the Simple Features standard by the OGC). Empty geometries may only be in the data cube if no spatial extent has been provided.
Empty geometries are ignored.
Set this parameter to null to set no limit for the spatial extent. """

@classmethod
def spatial_extent(
cls,
name: str = "spatial_extent",
description: str = _spatial_extent_description,
description: Optional[str] = None,
**kwargs,
) -> Parameter:
"""
Helper to easily create a 'spatial_extent' parameter, which is compatible with the 'load_collection' argument of
Helper to easily create a 'spatial_extent' parameter, which is compatible with the ``load_collection`` argument of
the same name. This allows to conveniently create user-defined processes that can be applied to a bounding box and vector data
for spatial filtering. It is also possible for users to set to null, and define spatial filtering using other processes.
Expand All @@ -307,6 +300,26 @@ def spatial_extent(
.. versionadded:: 0.32.0
"""
if description is None:
description = textwrap.dedent(
"""
Limits the data to process to the specified bounding box or polygons.
For raster data, the process loads the pixel into the data cube if the point
at the pixel center intersects with the bounding box or any of the polygons
(as defined in the Simple Features standard by the OGC).
For vector data, the process loads the geometry into the data cube if the geometry
is fully within the bounding box or any of the polygons (as defined in the
Simple Features standard by the OGC). Empty geometries may only be in the
data cube if no spatial extent has been provided.
Empty geometries are ignored.
Set this parameter to null to set no limit for the spatial extent.
"""
).strip()

schema = [
{
"title": "Bounding Box",
Expand Down Expand Up @@ -410,7 +423,7 @@ def geojson(cls, name: str, description: str = "Geometries specified as GeoJSON
@classmethod
def temporal_interval(
cls,
name: str,
name: str = "temporal_extent",
description: str = "Temporal extent specified as two-element array with start and end date/date-time.",
**kwargs,
) -> Parameter:
Expand Down Expand Up @@ -441,3 +454,26 @@ def temporal_interval(
},
}
return cls(name=name, description=description, schema=schema, **kwargs)


def schema_supports(schema: Union[dict, List[dict]], type: str, subtype: Optional[str] = None) -> bool:
    """Helper to check if parameter schema supports given type/subtype"""
    # TODO: support checking item type in arrays
    if isinstance(schema, list):
        # A union of schemas: supported as soon as any sub-schema matches.
        return any(schema_supports(s, type=type, subtype=subtype) for s in schema)
    if not isinstance(schema, dict):
        raise ValueError(schema)
    declared_type = schema.get("type")
    if isinstance(declared_type, str):
        type_matches = declared_type == type
    elif isinstance(declared_type, list):
        type_matches = type in declared_type
    else:
        # Missing or unexpected "type" field: can not evaluate support.
        raise ValueError(declared_type)
    if not type_matches:
        return False
    # Only check "subtype" when one was requested.
    return not subtype or schema.get("subtype") == subtype
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

_log = logging.getLogger(__name__)


class _Backend(NamedTuple):
"""Container for backend info/settings"""

Expand Down Expand Up @@ -70,7 +71,6 @@ def exists(self) -> bool:
"""Does the job database already exist, to read job data from?"""
...


@abc.abstractmethod
def persist(self, df: pd.DataFrame):
"""
Expand Down Expand Up @@ -370,9 +370,9 @@ def start_job_thread(self, start_job: Callable[[], BatchJob], job_db: JobDatabas

# Resume from existing db
_log.info(f"Resuming `run_jobs` from existing {job_db}")
df = job_db.read()

self._stop_thread = False

def run_loop():

# TODO: support user-provided `stats`
Expand Down Expand Up @@ -816,6 +816,15 @@ def initialize_from_df(self, df: pd.DataFrame, *, on_exists: str = "error"):
# Return self to allow chaining with constructor.
return self

@abc.abstractmethod
def read(self) -> pd.DataFrame:
    """
    Read job data from the database as pandas DataFrame.

    Concrete subclasses (e.g. CSV or Parquet backed databases)
    must implement the actual loading logic.

    :return: loaded job data.
    """
    ...

@property
def df(self) -> pd.DataFrame:
if self._df is None:
Expand Down Expand Up @@ -862,6 +871,7 @@ class CsvJobDatabase(FullDataFrameJobDatabase):
.. versionadded:: 0.31.0
"""

def __init__(self, path: Union[str, Path]):
super().__init__()
self.path = Path(path)
Expand Down Expand Up @@ -918,6 +928,7 @@ class ParquetJobDatabase(FullDataFrameJobDatabase):
.. versionadded:: 0.31.0
"""

def __init__(self, path: Union[str, Path]):
super().__init__()
self.path = Path(path)
Expand All @@ -940,6 +951,7 @@ def read(self) -> pd.DataFrame:
metadata = pyarrow.parquet.read_metadata(self.path)
if b"geo" in metadata.metadata:
import geopandas

return geopandas.read_parquet(self.path)
else:
return pd.read_parquet(self.path)
Expand Down Expand Up @@ -1051,6 +1063,7 @@ class ProcessBasedJobCreator:
`feedback and suggestions for improvement <https://github.com/Open-EO/openeo-python-client/issues>`_.
"""

def __init__(
self,
*,
Expand Down Expand Up @@ -1083,7 +1096,6 @@ def _get_process_definition(self, connection: Connection) -> Process:
f"Unsupported process definition source udp_id={self._process_id!r} namespace={self._namespace!r}"
)


def start_job(self, row: pd.Series, connection: Connection, **_) -> BatchJob:
"""
Implementation of the ``start_job`` callable interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ def __init__(
self.base_url = stac_root_url
self.bulk_size = 500



def exists(self) -> bool:
return len([c.id for c in self.client.get_collections() if c.id == self.collection_id]) > 0

Expand Down Expand Up @@ -153,14 +151,13 @@ def item_from(self, series: pd.Series) -> pystac.Item:
return item

def count_by_status(self, statuses: Iterable[str] = ()) -> dict:
items = self.get_by_status(statuses,max=200)
items = self.get_by_status(statuses, max=200)
if items is None:
return {k: 0 for k in statuses}
else:
return items["status"].value_counts().to_dict()

def get_by_status(self, statuses: Iterable[str], max=None) -> pd.DataFrame:

if isinstance(statuses, str):
statuses = {statuses}
statuses = set(statuses)
Expand Down Expand Up @@ -198,7 +195,6 @@ def handle_row(series):
item = self.item_from(series)
all_items.append(item)


df.apply(handle_row, axis=1)

self._upload_items_bulk(self.collection_id, all_items)
Expand Down Expand Up @@ -269,13 +265,13 @@ def _create_collection(self, collection: Collection) -> dict:
default_auth = {
"_auth": {
"read": ["anonymous"],
"write": ["stac-openeo-admin", "stac-openeo-editor"]
"write": ["stac-openeo-admin", "stac-openeo-editor"],
}
}

coll_dict.update(default_auth)

response = requests.post(self.join_url("collections"), auth=self._auth,json=coll_dict)
response = requests.post(self.join_url("collections"), auth=self._auth, json=coll_dict)
_check_response_status(response, _EXPECTED_STATUS_POST)

return response.json()
Expand All @@ -287,6 +283,7 @@ def _create_collection(self, collection: Collection) -> dict:
requests.status_codes.codes.accepted,
]


def _check_response_status(response: requests.Response, expected_status_codes: List[int], raise_exc: bool = False):
if response.status_code not in expected_status_codes:
message = (
Expand Down
38 changes: 31 additions & 7 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ def request(
method: str,
path: str,
*,
params: Optional[dict] = None,
headers: Optional[dict] = None,
auth: Optional[AuthBase] = None,
check_error: bool = True,
Expand All @@ -159,13 +160,21 @@ def request(
auth = auth or (self.auth if not self._is_external(url) else None)
slow_response_threshold = kwargs.pop("slow_response_threshold", self.slow_response_threshold)
if _log.isEnabledFor(logging.DEBUG):
_log.debug("Request `{m} {u}` with headers {h}, auth {a}, kwargs {k}".format(
m=method.upper(), u=url, h=headers and headers.keys(), a=type(auth).__name__, k=list(kwargs.keys()))
_log.debug(
"Request `{m} {u}` with params {p}, headers {h}, auth {a}, kwargs {k}".format(
m=method.upper(),
u=url,
p=params,
h=headers and headers.keys(),
a=type(auth).__name__,
k=list(kwargs.keys()),
)
)
with ContextTimer() as timer:
resp = self.session.request(
method=method,
url=url,
params=params,
headers=self._merged_headers(headers),
auth=auth,
timeout=kwargs.pop("timeout", self.default_timeout),
Expand Down Expand Up @@ -227,16 +236,25 @@ def _raise_api_error(self, response: requests.Response):

raise OpenEoApiPlainError(message=text, http_status_code=status_code, error_message=error_message)

def get(self, path: str, stream: bool = False, auth: Optional[AuthBase] = None, **kwargs) -> Response:
def get(
self,
path: str,
*,
params: Optional[dict] = None,
stream: bool = False,
auth: Optional[AuthBase] = None,
**kwargs,
) -> Response:
"""
Do GET request to REST API.
:param path: API path (without root url)
:param params: Additional query parameters
:param stream: True if the get request should be streamed, else False
:param auth: optional custom authentication to use instead of the default one
:return: response: Response
"""
return self.request("get", path=path, stream=stream, auth=auth, **kwargs)
return self.request("get", path=path, params=params, stream=stream, auth=auth, **kwargs)

def post(self, path: str, json: Optional[dict] = None, **kwargs) -> Response:
"""
Expand Down Expand Up @@ -1047,18 +1065,24 @@ def describe_process(self, id: str, namespace: Optional[str] = None) -> dict:

raise OpenEoClientException("Process does not exist.")

def list_jobs(self) -> List[dict]:
def list_jobs(self, limit: Union[int, None] = None) -> List[dict]:
"""
Lists all jobs of the authenticated user.
:param limit: maximum number of jobs to return. Setting this limit enables pagination.
:return: job_list: Dict of all jobs of the user.
.. versionadded:: 0.36.0
Added ``limit`` argument
"""
# TODO: Parse the result so that there get Job classes returned?
resp = self.get('/jobs', expected_status=200).json()
# TODO: Parse the result so that Job classes returned?
resp = self.get("/jobs", params={"limit": limit}, expected_status=200).json()
if resp.get("federation:missing"):
_log.warning("Partial user job listing due to missing federation components: {c}".format(
c=",".join(resp["federation:missing"])
))
# TODO: when pagination is enabled: how to expose link to next page?
jobs = resp["jobs"]
return VisualList("data-table", data=jobs, parameters={'columns': 'jobs'})

Expand Down
12 changes: 6 additions & 6 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import shapely.geometry.base
from shapely.geometry import MultiPolygon, Polygon, mapping

from openeo.api.process import Parameter
from openeo.api.process import Parameter, schema_supports
from openeo.dates import get_temporal_extent
from openeo.internal.documentation import openeo_process
from openeo.internal.graph_building import PGNode, ReduceNode, _FromNodeMixin
Expand Down Expand Up @@ -182,10 +182,10 @@ def load_collection(
temporal_extent = cls._get_temporal_extent(extent=temporal_extent)

if isinstance(spatial_extent, Parameter):
if spatial_extent.schema.get("type") != "object":
if not schema_supports(spatial_extent.schema, type="object"):
warnings.warn(
"Unexpected parameterized `spatial_extent` in `load_collection`:"
f" expected schema with type 'object' but got {spatial_extent.schema!r}."
f" expected schema compatible with type 'object' but got {spatial_extent.schema!r}."
)
arguments = {
'id': collection_id,
Expand Down Expand Up @@ -481,7 +481,7 @@ def filter_bbox(
crs: Optional[Union[int, str]] = None,
base: Optional[float] = None,
height: Optional[float] = None,
bbox: Optional[Sequence[float]] = None,
bbox: Union[Sequence[float], Parameter, None] = None,
) -> DataCube:
"""
Limits the data cube to the specified bounding box.
Expand Down Expand Up @@ -555,10 +555,10 @@ def filter_bbox(
raise ValueError(args)

if isinstance(bbox, Parameter):
if bbox.schema.get("type") != "object":
if not schema_supports(bbox.schema, type="object"):
warnings.warn(
"Unexpected parameterized `extent` in `filter_bbox`:"
f" expected schema with type 'object' but got {bbox.schema!r}."
f" expected schema compatible with type 'object' but got {bbox.schema!r}."
)
extent = bbox
else:
Expand Down
Loading

0 comments on commit 4487ab7

Please sign in to comment.