Skip to content

Commit

Permalink
Issue #683/#681 align additional/job_options argument in create_job, …
Browse files Browse the repository at this point in the history
…download, ...
  • Loading branch information
soxofaan committed Dec 10, 2024
1 parent a811bff commit 0076152
Show file tree
Hide file tree
Showing 5 changed files with 130 additions and 21 deletions.
57 changes: 50 additions & 7 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1644,6 +1644,8 @@ def upload_file(
def _build_request_with_process_graph(
self,
process_graph: Union[dict, FlatGraphableMixin, str, Path, List[FlatGraphableMixin]],
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
**kwargs,
) -> dict:
"""
Expand All @@ -1655,6 +1657,13 @@ def _build_request_with_process_graph(
if any(c != self for c in connections):
raise OpenEoClientException(f"Mixing different connections: {self} and {connections}.")
result = kwargs

if additional:
result.update(additional)
if job_options is not None:
assert "job_options" not in result
result["job_options"] = job_options

process_graph = as_flat_graph(process_graph)
if "process_graph" not in process_graph:
process_graph = {"process_graph": process_graph}
Expand Down Expand Up @@ -1702,6 +1711,8 @@ def download(
timeout: Optional[int] = None,
validate: Optional[bool] = None,
chunk_size: int = DEFAULT_DOWNLOAD_CHUNK_SIZE,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> Union[None, bytes]:
"""
Downloads the result of a process graph synchronously,
Expand All @@ -1715,8 +1726,16 @@ def download(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param chunk_size: chunk size for streaming response.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
pg_with_metadata = self._build_request_with_process_graph(process_graph=graph)
pg_with_metadata = self._build_request_with_process_graph(
process_graph=graph, additional=additional, job_options=job_options
)
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
response = self.post(
path="/result",
Expand All @@ -1740,6 +1759,8 @@ def execute(
timeout: Optional[int] = None,
validate: Optional[bool] = None,
auto_decode: bool = True,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> Union[dict, requests.Response]:
"""
Execute a process graph synchronously and return the result. If the result is a JSON object, it will be parsed.
Expand All @@ -1749,10 +1770,18 @@ def execute(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_decode: Boolean flag to enable/disable automatic JSON decoding of the response. Defaults to True.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: parsed JSON response as a dict if auto_decode is True, otherwise response object
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
pg_with_metadata = self._build_request_with_process_graph(process_graph=process_graph)
pg_with_metadata = self._build_request_with_process_graph(
process_graph=process_graph, additional=additional, job_options=job_options
)
self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
response = self.post(
path="/result",
Expand All @@ -1779,6 +1808,7 @@ def create_job(
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
) -> BatchJob:
"""
Expand All @@ -1795,23 +1825,27 @@ def create_job(
:param plan: The billing plan to process and charge the job with
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param additional: additional job options to pass to the backend
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:return: Created job
.. versionchanged:: 0.35.0
Add :ref:`multi-result support <multi-result-process-graphs>`.
.. versionadded:: 0.36.0
Added argument ``job_options``.
"""
# TODO move all this (BatchJob factory) logic to BatchJob?

pg_with_metadata = self._build_request_with_process_graph(
process_graph=process_graph,
additional=additional,
job_options=job_options,
**dict_no_none(title=title, description=description, plan=plan, budget=budget)
)
if additional:
# TODO: get rid of this non-standard field? https://github.com/Open-EO/openeo-api/issues/276
pg_with_metadata["job_options"] = additional

self._preflight_validation(pg_with_metadata=pg_with_metadata, validate=validate)
response = self.post("/jobs", json=pg_with_metadata, expected_status=201)
Expand Down Expand Up @@ -1871,9 +1905,12 @@ def load_disk_collection(
def as_curl(
self,
data: Union[dict, DataCube, FlatGraphableMixin],
*,
path="/result",
method="POST",
obfuscate_auth: bool = False,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> str:
"""
Build curl command to evaluate given process graph or data cube
Expand All @@ -1891,14 +1928,20 @@ def as_curl(
or ``"/jobs"`` for batch jobs
:param method: HTTP method to use (typically ``"POST"``)
:param obfuscate_auth: don't show actual bearer token
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: curl command as a string
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
cmd = ["curl", "-i", "-X", method]
cmd += ["-H", "Content-Type: application/json"]
if isinstance(self.auth, BearerAuth):
cmd += ["-H", f"Authorization: Bearer {'...' if obfuscate_auth else self.auth.bearer}"]
pg_with_metadata = self._build_request_with_process_graph(data)
pg_with_metadata = self._build_request_with_process_graph(data, additional=additional, job_options=job_options)
if path == "/validation":
pg_with_metadata = pg_with_metadata["process"]
post_json = json.dumps(pg_with_metadata, separators=(",", ":"))
Expand Down
35 changes: 30 additions & 5 deletions openeo/rest/datacube.py
Original file line number Diff line number Diff line change
Expand Up @@ -2329,6 +2329,8 @@ def download(
*,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> Union[None, bytes]:
"""
Execute synchronously and download the raster data cube, e.g. as GeoTIFF.
Expand All @@ -2342,11 +2344,17 @@ def download(
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:return: None if the result is stored to disk, or a bytes object returned by the backend.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added arguments ``additional`` and ``job_options``.
"""
# TODO #278 centralize download/create_job/execute_job logic in DataCube, VectorCube, MlModel, ...
cube = self
Expand All @@ -2359,7 +2367,9 @@ def download(
default_format=self._DEFAULT_RASTER_FORMAT,
method="DataCube.download()",
)
return self._connection.download(cube.flat_graph(), outputfile, validate=validate)
return self._connection.download(
cube.flat_graph(), outputfile, validate=validate, additional=additional, job_options=job_options
)

def validate(self) -> List[dict]:
"""
Expand Down Expand Up @@ -2463,6 +2473,7 @@ def execute_batch(
print: typing.Callable[[str], None] = print,
max_poll_interval: float = 60,
connection_retry_interval: float = 30,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
Expand All @@ -2477,13 +2488,18 @@ def execute_batch(
:param outputfile: The path of a file to which a result can be written
:param out_format: (optional) File format to use for the job result.
:param job_options:
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
.. versionchanged:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added argument ``additional``.
"""
# TODO: start showing deprecation warnings about these inconsistent argument names
if "format" in format_options and not out_format:
Expand All @@ -2506,6 +2522,7 @@ def execute_batch(
description=description,
plan=plan,
budget=budget,
additional=additional,
job_options=job_options,
validate=validate,
auto_add_save_result=False,
Expand All @@ -2523,6 +2540,7 @@ def create_job(
description: Optional[str] = None,
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
auto_add_save_result: bool = True,
Expand All @@ -2543,15 +2561,20 @@ def create_job(
:param plan: The billing plan to process and charge the job with
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param job_options: custom job options.
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param validate: Optional toggle to enable/prevent validation of the process graphs before execution
(overruling the connection's ``auto_validate`` setting).
:param auto_add_save_result: Automatically add a ``save_result`` node to the process graph if there is none yet.
:return: Created job.
.. versionchanged:: 0.32.0
.. versionadded:: 0.32.0
Added ``auto_add_save_result`` option
.. versionadded:: 0.36.0
Added ``additional`` argument.
"""
# TODO: add option to also automatically start the job?
# TODO: avoid using all kwargs as format_options
Expand All @@ -2572,7 +2595,8 @@ def create_job(
plan=plan,
budget=budget,
validate=validate,
additional=job_options,
additional=additional,
job_options=job_options,
)

send_job = legacy_alias(create_job, name="send_job", since="0.10.0")
Expand Down Expand Up @@ -2617,6 +2641,7 @@ def execute(self, *, validate: Optional[bool] = None, auto_decode: bool = True)
:return: parsed JSON response as a dict if auto_decode is True, otherwise response object
"""
# TODO: deprecated this. It's ill-defined how to "execute" a data cube without downloading it.
return self._connection.execute(self.flat_graph(), validate=validate, auto_decode=auto_decode)

@staticmethod
Expand Down
29 changes: 25 additions & 4 deletions openeo/rest/mlmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def execute_batch(
print=print,
max_poll_interval=60,
connection_retry_interval=30,
job_options=None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> BatchJob:
"""
Evaluate the process graph by creating a batch job, and retrieving the results when it is finished.
Expand All @@ -81,8 +82,21 @@ def execute_batch(
:param outputfile: The path of a file to which a result can be written
:param out_format: (optional) Format of the job result.
:param format_options: String Parameters for the job result format
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
.. versionadded:: 0.36.0
Added argument ``additional``.
"""
job = self.create_job(title=title, description=description, plan=plan, budget=budget, job_options=job_options)
job = self.create_job(
title=title,
description=description,
plan=plan,
budget=budget,
additional=additional,
job_options=job_options,
)
return job.run_synchronous(
# TODO #135 support multi file result sets too
outputfile=outputfile,
Expand All @@ -96,6 +110,7 @@ def create_job(
description: Optional[str] = None,
plan: Optional[str] = None,
budget: Optional[float] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
) -> BatchJob:
"""
Expand All @@ -106,9 +121,14 @@ def create_job(
:param plan: The billing plan to process and charge the job with
:param budget: Maximum budget to be spent on executing the job.
Note that some backends do not honor this limit.
:param job_options: A dictionary containing (custom) job options
:param additional: additional (top-level) properties to set in the request body
:param job_options: dictionary of job options to pass to the backend
(under top-level property "job_options")
:param format_options: String Parameters for the job result format
:return: Created job.
.. versionadded:: 0.36.0
Added argument ``additional``.
"""
# TODO: centralize `create_job` for `DataCube`, `VectorCube`, `MlModel`, ...
pg = self
Expand All @@ -121,5 +141,6 @@ def create_job(
description=description,
plan=plan,
budget=budget,
additional=job_options,
additional=additional,
job_options=job_options,
)
13 changes: 11 additions & 2 deletions openeo/rest/multiresult.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,16 @@ def create_job(
*,
title: Optional[str] = None,
description: Optional[str] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
) -> BatchJob:
return self._connection.create_job(
process_graph=self._multi_leaf_graph,
title=title,
description=description,
additional=job_options,
additional=additional,
job_options=job_options,
validate=validate,
)

Expand All @@ -95,8 +97,15 @@ def execute_batch(
*,
title: Optional[str] = None,
description: Optional[str] = None,
additional: Optional[dict] = None,
job_options: Optional[dict] = None,
validate: Optional[bool] = None,
) -> BatchJob:
job = self.create_job(title=title, description=description, job_options=job_options, validate=validate)
job = self.create_job(
title=title,
description=description,
additional=additional,
job_options=job_options,
validate=validate,
)
return job.run_synchronous()
Loading

0 comments on commit 0076152

Please sign in to comment.