diff --git a/CHANGES.rst b/CHANGES.rst index eab70a4c1..cac16c735 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -12,10 +12,16 @@ Changes Changes: -------- -- No change. +- Add statistics collection at the end of `Job` execution to obtain used memory from ``celery`` process and space + used by produced results. +- Add ``/jobs/{jobID}/statistics`` endpoint (and corresponding locations for ``/providers`` and ``/processes``) to + report any collected statistics following a `Job` execution. Fixes: ------ +- Fix `Job` ``Location`` header injected twice in ``get_job_submission_response`` causing header to have comma-separated + list of URI values failing retrieval by `CLI` when attempting to perform auto-monitoring of the submitted `Job`. +- Fix `CWL` runtime context setup to return monitored maximum RAM used by application under the `Process` if possible. - Fix failing `Service` provider summary response in case of unresponsive (not accessible or parsable) URL endpoint contents due to different errors raised by distinct versions of ``requests`` package. diff --git a/docs/source/configuration.rst b/docs/source/configuration.rst index dbee3dd7e..48dc3634b 100644 --- a/docs/source/configuration.rst +++ b/docs/source/configuration.rst @@ -175,7 +175,7 @@ they are optional and which default value or operation is applied in each situat - | ``weaver.wps_workdir = `` | (default: uses automatically generated temporary directory if none specified) | - | Prefix where process job worker should execute the process from. + | Prefix where process :term:`Job` worker should execute the :term:`Process` from. 
- | ``weaver.wps_restapi = true|false`` | (default: ``true``) @@ -413,4 +413,3 @@ Starting the Application - need to start ``gunicorn/pserve`` (example `Dockerfile-manager`_) - need to start ``celery`` worker (example `Dockerfile-worker`_) - diff --git a/requirements.txt b/requirements.txt index f3798fde3..21e202181 100644 --- a/requirements.txt +++ b/requirements.txt @@ -49,6 +49,7 @@ mako # esgf-compute-api (cwt) needs oauthlib but doesn't add it in their requirements oauthlib owslib==0.25.0 +psutil # FIXME: pymongo>=4 breaks with kombu corresponding to pinned Celery (https://github.com/crim-ca/weaver/issues/386) pymongo>=3.12.0,<4 # pyproj>=2 employed by OWSLib, but make requirements stricter diff --git a/tests/resources/__init__.py b/tests/resources/__init__.py index 1596c10d8..6a86e1b4a 100644 --- a/tests/resources/__init__.py +++ b/tests/resources/__init__.py @@ -1,8 +1,10 @@ import os +from weaver import WEAVER_MODULE_DIR from weaver.utils import load_file RESOURCES_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), "")) +EXAMPLES_PATH = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/examples") GET_CAPABILITIES_TEMPLATE_URL = "{}?service=WPS&request=GetCapabilities&version=1.0.0" DESCRIBE_PROCESS_TEMPLATE_URL = "{}?service=WPS&request=DescribeProcess&identifier={}&version=1.0.0" @@ -38,6 +40,11 @@ ) +def load_example(file_name): + file_path = os.path.join(EXAMPLES_PATH, file_name) + return load_file(file_path) + + def load_resource(file_name): file_path = os.path.join(RESOURCES_PATH, file_name) return load_file(file_path) diff --git a/tests/test_utils.py b/tests/test_utils.py index dc725a7d9..80b4d6a54 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -35,6 +35,7 @@ from weaver.status import JOB_STATUS_CATEGORIES, STATUS_PYWPS_IDS, STATUS_PYWPS_MAP, Status, StatusCompliant, map_status from weaver.utils import ( NullType, + apply_number_with_unit, assert_sane_name, bytes2str, fetch_file, @@ -50,6 +51,7 @@ make_dirs, null, parse_kvp, + 
parse_number_with_unit, parse_prefer_header_execute_mode, pass_http_error, request_extra, @@ -811,3 +813,61 @@ def test_parse_kvp(query, params, expected): def test_prefer_header_execute_mode(headers, support, expected): result = parse_prefer_header_execute_mode(headers, support) assert result == expected + + +@pytest.mark.parametrize("number,binary,unit,expect", [ + (1.234, False, "B", "1.234 B"), + (10_000_000, False, "B", "10.000 MB"), + (10_000_000, True, "B", "9.537 MiB"), + (10_000_000_000, False, "B", "10.000 GB"), + (10_737_418_240, True, "B", "10.000 GiB"), + (10_000_000_000, True, "B", "9.313 GiB"), + (10**25, False, "", "10.000 Y"), + (10**25, True, "B", "8.272 YiB"), + (10**28, False, "", "10000.000 Y"), # last unit goes over bound + (10**-28, False, "s", "0.000 ys"), # out of bound, cannot represent smaller + (-10 * 10**3, False, "s", "-10.000 ks"), # negative & positive power + (-0.001234, False, "s", "-1.234 ms"), # negative & reducing power + (0, False, "", "0.000"), + (0.000, True, "", "0.000 B"), +]) +def test_apply_number_with_unit(number, binary, unit, expect): + result = apply_number_with_unit(number, unit=unit, binary=binary) + assert result == expect + + +@pytest.mark.parametrize("number,binary,expect", [ + ("1 B", None, 1), + # note: 'k' lower + ("1k", False, 1_000), # normal + ("1kB", False, 1_000), # forced unmatched 'B' + ("1kB", None, 1_024), # auto from 'B' + ("1kB", True, 1_024), # forced but matches + # note: 'K' upper + ("1K", False, 1_000), # normal + ("1KB", False, 1_000), # forced unmatched 'B' + ("1KB", None, 1_024), # auto from 'B' + ("1KB", True, 1_024), # forced but matches + # normal + ("1KiB", True, 1_024), # forced but matches + ("1KiB", None, 1_024), # normal + ("1KiB", False, 1_000), # forced unmatched 'B' + ("1G", False, 1_000_000_000), # normal + ("1GB", False, 1_000_000_000), # forced unmatched 'B' + ("1GB", None, 1_073_741_824), # auto from 'B' + ("1GB", True, 1_073_741_824), # forced but matches + ("1GiB", True, 
1_073_741_824), # forced but matches + ("1GiB", None, 1_073_741_824), # normal + ("1GiB", False, 1_000_000_000), # forced unmatched 'B' + # rounding expected for binary (ie: 1 x 2^30 + 400 x 2^20 for accurate result) + # if not rounded, converting causes floating remainder (1.4 x 2^30 = 1503238553.6) + ("1.4GiB", True, 1_503_238_554), +]) +def test_parse_number_with_unit(number, binary, expect): + result = parse_number_with_unit(number, binary=binary) + assert result == expect + + +def test_parse_number_with_unit_error(): + with pytest.raises(ValueError): + parse_number_with_unit(123) # noqa diff --git a/tests/utils.py b/tests/utils.py index 02a5748af..538fa3d80 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -12,6 +12,7 @@ import os import re import subprocess +import sys import tempfile import uuid import warnings @@ -342,6 +343,8 @@ def run_command(command, trim=True, expect_error=False, entrypoint=None): command = [str(arg) for arg in command] if entrypoint is None: out, _ = subprocess.Popen(["which", "python"], universal_newlines=True, stdout=subprocess.PIPE).communicate() + if not out: + out = sys.executable # fallback for some systems that fail above call python_path = os.path.split(out)[0] debug_path = os.path.expandvars(os.environ["PATH"]) env = {"PATH": f"{python_path}:{debug_path}"} diff --git a/tests/wps_restapi/test_api.py b/tests/wps_restapi/test_api.py index 975ea7660..3c2b31990 100644 --- a/tests/wps_restapi/test_api.py +++ b/tests/wps_restapi/test_api.py @@ -10,7 +10,7 @@ from tests.utils import get_test_weaver_app, get_test_weaver_config from weaver.formats import ContentType -from weaver.utils import request_extra +from weaver.utils import get_header, request_extra from weaver.wps_restapi import swagger_definitions as sd @@ -44,6 +44,11 @@ def test_frontpage_format(self): resp = self.testapp.get(urlparse(path).path, expect_errors=True) # allow error for wps without queries else: resp = request_extra("GET", path, retries=3, retry_after=True, 
ssl_verify=False, allow_redirects=True) + user_agent = get_header("user-agent", resp.request.headers) + if resp.status_code == 403 and "python" in user_agent: + # some sites will explicitly block bots, retry with mocked user-agent simulating human user access + resp = request_extra("GET", path, headers={"User-Agent": "Mozilla"}, + retries=3, retry_after=True, ssl_verify=False, allow_redirects=True) code = resp.status_code test = f"({rel}) [{path}]" assert code in [200, 400], f"Reference link expected to be found, got [{code}] for {test}" diff --git a/tests/wps_restapi/test_jobs.py b/tests/wps_restapi/test_jobs.py index 1da9a390d..bf8e24378 100644 --- a/tests/wps_restapi/test_jobs.py +++ b/tests/wps_restapi/test_jobs.py @@ -18,6 +18,7 @@ import pytest from dateutil import parser as date_parser +from tests.resources import load_example from tests.utils import ( get_links, get_module_version, @@ -49,7 +50,7 @@ from typing import Iterable, List, Optional, Tuple, Union from weaver.status import AnyStatusType - from weaver.typedefs import JSON, AnyLogLevel, Number + from weaver.typedefs import JSON, AnyLogLevel, Number, Statistics from weaver.visibility import AnyVisibility @@ -172,7 +173,9 @@ def make_job(self, duration=None, # type: Optional[int] exceptions=None, # type: Optional[List[JSON]] logs=None, # type: Optional[List[Union[str, Tuple[str, AnyLogLevel, AnyStatusType, Number]]]] - ): + statistics=None, # type: Optional[Statistics] + add_info=True, # type: bool + ): # type: (...) 
-> Job if isinstance(created, str): created = date_parser.parse(created) job = self.job_store.save_job(task_id=task_id, process=process, service=service, is_workflow=False, @@ -192,8 +195,11 @@ def make_job(self, job.save_log(message=log_item) if exceptions is not None: job.exceptions = exceptions + if statistics is not None: + job.statistics = statistics job = self.job_store.update_job(job) - self.job_info.append(job) + if add_info: + self.job_info.append(job) return job def message_with_jobs_mapping(self, message="", indent=2): @@ -1391,11 +1397,15 @@ def test_job_results_errors(self): ]: for what in ["outputs", "results"]: path = f"/jobs/{job.id}/{what}" + case = ( + f"Failed using (Path: {path}, Status: {job.status}, Code: {code}, Job: {job}, " + f"Title: {title}, Error: {error_type}, Cause: {cause})" + ) resp = self.app.get(path, headers=self.json_headers, expect_errors=True) - assert resp.status_code == code, f"Failed using [{path}]" - assert resp.json["title"] == title - assert resp.json["cause"] == cause - assert resp.json["type"].endswith(error_type) # ignore http full reference, not always there + assert resp.status_code == code, case + assert resp.json["title"] == title, case + assert resp.json["cause"] == cause, case + assert resp.json["type"].endswith(error_type), case # ignore http full reference, not always there assert "links" in resp.json def test_jobs_inputs_outputs_validations(self): @@ -1548,3 +1558,27 @@ def test_job_logs_formats(self): assert "Start" in lines[0] assert "Process" in lines[1] assert "Complete" in lines[2] + + def test_job_statistics_missing(self): + job = self.job_info[0] + assert job.status == Status.SUCCEEDED, "invalid job status to run test" + path = f"/jobs/{job.id}/statistics" + resp = self.app.get(path, headers=self.json_headers, expect_errors=True) + assert resp.status_code == 404, "even if job is successful, expects not found if no statistics are available" + + def test_job_statistics_response(self): + stats = 
load_example("job_statistics.json") + job = self.make_job( + add_info=False, + task_id="2222-0000-0000-0000", process=self.process_public.identifier, service=None, + user_id=self.user_admin_id, status=Status.SUCCEEDED, progress=100, access=Visibility.PUBLIC, + statistics=stats + ) + try: + path = f"/jobs/{job.id}/statistics" + resp = self.app.get(path, headers=self.json_headers) + assert resp.status_code == 200 + assert resp.json == stats + finally: + if job: + self.job_store.delete_job(job.id) diff --git a/weaver/cli.py b/weaver/cli.py index 46a9f6918..df030f8b7 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -531,7 +531,7 @@ def _update_files(self, inputs, url=None): .. seealso:: - Headers dictionary limitation by :mod:`requests`: - https://docs.python-requests.org/en/latest/user/quickstart/#response-headers + https://requests.readthedocs.io/en/master/user/quickstart/#response-content - Headers formatting with multiple values must be provided by comma-separated values (:rfc:`7230#section-3.2.2`). - Multi Vault-Token parsing accomplished by :func:`weaver.vault.utils.parse_vault_token`. 
@@ -841,6 +841,7 @@ def monitor(self, remain = timeout = timeout or self.monitor_timeout delta = interval or self.monitor_interval LOGGER.info("Monitoring job [%s] for %ss at intervals of %ss.", job_id, timeout, delta) + LOGGER.debug("Job URL: [%s]", job_url) once = True while remain >= 0 or once: resp = request_extra("GET", job_url, headers=self._headers, settings=self._settings) diff --git a/weaver/datatype.py b/weaver/datatype.py index 4a20eed7c..fc5febd82 100644 --- a/weaver/datatype.py +++ b/weaver/datatype.py @@ -75,7 +75,8 @@ JSON, Link, Metadata, - QuoteProcessParameters + QuoteProcessParameters, + Statistics ) from weaver.visibility import AnyVisibility @@ -1021,6 +1022,21 @@ def progress(self, progress): raise ValueError(f"Value must be in range [0,100] for '{self.__name__}.progress'") self["progress"] = progress + @property + def statistics(self): + # type: () -> Optional[Statistics] + """ + Collected statistics about used memory and processing units if available. + """ + return self.get("statistics") + + @statistics.setter + def statistics(self, stats): + # type: (Statistics) -> None + if not isinstance(stats, dict): + raise TypeError(f"Type 'dict' is required for '{self.__name__}.statistics'") + self["statistics"] = stats + def _get_results(self): # type: () -> List[Optional[Dict[str, JSON]]] if self.get("results") is None: @@ -1052,13 +1068,13 @@ def _set_exceptions(self, exceptions): exceptions = property(_get_exceptions, _set_exceptions) def _get_logs(self): - # type: () -> List[Dict[str, str]] + # type: () -> List[str] if self.get("logs") is None: self["logs"] = [] return dict.__getitem__(self, "logs") def _set_logs(self, logs): - # type: (List[Dict[str, str]]) -> None + # type: (List[str]) -> None if not isinstance(logs, list): raise TypeError(f"Type 'list' is required for '{self.__name__}.logs'") self["logs"] = logs @@ -1198,6 +1214,8 @@ def links(self, container=None, self_link=None): "title": "Job outputs of successful process execution 
(extended outputs with metadata)."}, {"href": job_url + "/results", "rel": "http://www.opengis.net/def/rel/ogc/1.0/results", "title": "Job results of successful process execution (direct output values mapping)."}, + {"href": job_url + "/statistics", "rel": "statistics", # unofficial + "title": "Job statistics collected following process execution."}, ]) else: job_links.append({ @@ -1283,6 +1301,7 @@ def params(self): "updated": self.updated, "progress": self.progress, "results": self.results, + "statistics": self.statistics, "exceptions": self.exceptions, "logs": self.logs, "tags": self.tags, diff --git a/weaver/exceptions.py b/weaver/exceptions.py index fa7a9b455..3be62962c 100644 --- a/weaver/exceptions.py +++ b/weaver/exceptions.py @@ -190,6 +190,15 @@ class JobInvalidParameter(HTTPBadRequest, OWSInvalidParameterValue, JobException """ +class JobStatisticsNotFound(JobNotFound): + """ + Error related to statistics not available for a Job. + + Statistics could be unavailable due to incomplete execution, failed status, + or simply because it is an older result generated before this feature was introduced. + """ + + class JobRegistrationError(HTTPInternalServerError, OWSNoApplicableCode, JobException): """ Error related to a registration issue for a job. diff --git a/weaver/processes/constants.py b/weaver/processes/constants.py index 52a9e629d..befa1fcff 100644 --- a/weaver/processes/constants.py +++ b/weaver/processes/constants.py @@ -74,11 +74,13 @@ class OpenSearchField(Constants): # FIXME: convert to 'Constants' class CWL_REQUIREMENT_ENV_VAR = "EnvVarRequirement" CWL_REQUIREMENT_INIT_WORKDIR = "InitialWorkDirRequirement" +CWL_REQUIREMENT_RESOURCE = "ResourceRequirement" CWL_REQUIREMENT_SCATTER = "ScatterFeatureRequirement" CWL_REQUIREMENT_FEATURES = frozenset([ CWL_REQUIREMENT_ENV_VAR, CWL_REQUIREMENT_INIT_WORKDIR, + CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? 
(https://github.com/crim-ca/weaver/issues/138) # CWL_REQUIREMENT_SCATTER, # FIXME: see workflow test + fix https://github.com/crim-ca/weaver/issues/105 ]) """ @@ -154,6 +156,17 @@ class ProcessSchema(Constants): from weaver.typedefs import Literal # pylint: disable=invalid-name + CWL_RequirementNames = Literal[ + CWL_REQUIREMENT_APP_BUILTIN, + CWL_REQUIREMENT_APP_DOCKER, + CWL_REQUIREMENT_APP_DOCKER_GPU, + CWL_REQUIREMENT_APP_ESGF_CWT, + CWL_REQUIREMENT_APP_WPS1, + CWL_REQUIREMENT_ENV_VAR, + CWL_REQUIREMENT_INIT_WORKDIR, + CWL_REQUIREMENT_RESOURCE, + CWL_REQUIREMENT_SCATTER, + ] ProcessSchemaType = Literal[ProcessSchema.OGC, ProcessSchema.OLD] WPS_ComplexType = Literal[WPS_COMPLEX, WPS_COMPLEX_DATA, WPS_REFERENCE] WPS_DataType = Union[Literal[WPS_LITERAL, WPS_BOUNDINGBOX], WPS_ComplexType] diff --git a/weaver/processes/execution.py b/weaver/processes/execution.py index 05de0be6e..7f7172045 100644 --- a/weaver/processes/execution.py +++ b/weaver/processes/execution.py @@ -4,7 +4,9 @@ from typing import TYPE_CHECKING import colander +import psutil from celery.exceptions import TimeoutError as CeleryTaskTimeoutError +from celery.utils.debug import ps as get_celery_process from celery.utils.log import get_task_logger from owslib.util import clean_ows_url from owslib.wps import ComplexDataInput @@ -24,6 +26,7 @@ from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory, map_status from weaver.store.base import StoreJobs, StoreProcesses from weaver.utils import ( + apply_number_with_unit, as_int, fully_qualified_name, get_any_id, @@ -32,6 +35,7 @@ get_registry, get_settings, now, + parse_number_with_unit, parse_prefer_header_execute_mode, raise_on_xml_exception, wait_secs @@ -42,6 +46,7 @@ get_wps_client, get_wps_local_status_location, get_wps_output_context, + get_wps_output_dir, get_wps_output_path, get_wps_output_url, load_pywps_config @@ -54,14 +59,22 @@ from uuid import UUID from typing import Dict, List, Optional, Tuple, Union - from celery.task 
import Task + from celery.app.task import Task from pyramid.request import Request from pywps.inout.inputs import ComplexInput from weaver.datatype import Job from weaver.processes.convert import OWS_Input_Type, ProcessOWS from weaver.status import StatusType - from weaver.typedefs import AnyResponseType, CeleryResult, HeadersType, HeaderCookiesType, JSON, SettingsType + from weaver.typedefs import ( + JSON, + AnyResponseType, + CeleryResult, + HeadersType, + HeaderCookiesType, + SettingsType, + Statistics + ) from weaver.visibility import AnyVisibility @@ -84,7 +97,7 @@ class JobProgress(object): @app.task(bind=True) -def execute_process(self, job_id, wps_url, headers=None): +def execute_process(task, job_id, wps_url, headers=None): # type: (Task, UUID, str, Optional[HeadersType]) -> StatusType """ Celery task that executes the WPS process job monitoring as status updates (local and remote). @@ -93,10 +106,11 @@ def execute_process(self, job_id, wps_url, headers=None): LOGGER.debug("Job execute process called.") - # reset the connection because we are in a forked celery process + task_process = get_celery_process() + rss_start = task_process.memory_info().rss registry = get_registry(None) # local thread, whether locally or dispatched celery settings = get_settings(registry) - db = get_db(registry, reset_connection=True) + db = get_db(registry, reset_connection=True) # reset the connection because we are in a forked celery process store = db.get_store(StoreJobs) job = store.fetch_by_id(job_id) job.started = now() @@ -108,7 +122,7 @@ def execute_process(self, job_id, wps_url, headers=None): job.save_log(logger=task_logger, message="Job task setup initiated.") load_pywps_config(settings) job.progress = JobProgress.SETUP - job.task_id = self.request.id + job.task_id = task.request.id job.save_log(logger=task_logger, message="Job task setup completed.") job = store.update_job(job) @@ -250,6 +264,7 @@ def execute_process(self, job_id, wps_url, headers=None): if 
task_terminated and map_status(job.status) == Status.FAILED: job.status = Status.DISMISSED task_success = map_status(job.status) not in JOB_STATUS_CATEGORIES[StatusCategory.FAILED] + collect_statistics(task_process, settings, job, rss_start) if task_success: job.progress = JobProgress.EXECUTE_MONITOR_END job.status_message = f"Job {job.status}." @@ -271,6 +286,82 @@ def execute_process(self, job_id, wps_url, headers=None): return job.status +def collect_statistics(process, settings=None, job=None, rss_start=None): + # type: (Optional[psutil.Process], Optional[SettingsType], Optional[Job], Optional[int]) -> Optional[Statistics] + """ + Collect any available execution statistics and store them in the :term:`Job` if provided. + """ + try: + mem_used = None + if job: + mem_info = list(filter(lambda line: "cwltool" in line and "memory used" in line, job.logs)) + mem_used = None + if mem_info: + mem_info = mem_info[0].split(":")[-1].strip() + mem_used = parse_number_with_unit(mem_info, binary=True) + + stats = {} # type: JSON + if mem_used: + stats["application"] = { + # see: 'cwltool.job.JobBase.process_monitor', reported memory in logs uses 'rss' + "usedMemory": apply_number_with_unit(mem_used, binary=True), + "usedMemoryBytes": mem_used, + } + + rss = None + if process: + proc_info = process.memory_full_info() + rss = getattr(proc_info, "rss", 0) + uss = getattr(proc_info, "uss", 0) + vms = getattr(proc_info, "vms", 0) + stats["process"] = { + "rss": apply_number_with_unit(rss, binary=True), + "rssBytes": rss, + "uss": apply_number_with_unit(uss, binary=True), + "ussBytes": uss, + "vms": apply_number_with_unit(vms, binary=True), + "vmsBytes": vms, + } + fields = [("usedThreads", "num_threads"), ("usedCPU", "cpu_num"), ("usedHandles", "num_handles")] + for field, method in fields: + func = getattr(process, method, None) + stats["process"][field] = func() if func is not None else 0 + + if rss_start and rss: + # diff of RSS between start/end to consider only execution of 
the job steps + # this more accurately reports used memory by the execution itself, omitting celery worker's base memory + rss_diff = rss - rss_start + stats["process"]["usedMemory"] = apply_number_with_unit(rss_diff, binary=True) + stats["process"]["usedMemoryBytes"] = rss_diff + + total_size = 0 + if job: + stats["outputs"] = {} + for result in job.results: + res_ref = get_any_value(result, file=True) + if res_ref and isinstance(res_ref, str): + if res_ref.startswith(f"/{job.id}"): # pseudo-relative reference + out_dir = get_wps_output_dir(settings) + res_ref = os.path.join(out_dir, res_ref[1:]) + if os.path.isfile(res_ref): + res_stat = os.stat(res_ref) + res_id = get_any_id(result) + res_size = res_stat.st_size + stats["outputs"][res_id] = { + "size": apply_number_with_unit(res_size, binary=True), + "sizeBytes": res_size, + } + total_size += res_size + stats["process"]["totalSize"] = apply_number_with_unit(total_size, binary=True) + stats["process"]["totalSizeBytes"] = total_size + + if stats and job: + job.statistics = stats + return stats or None + except Exception as exc: + LOGGER.warning("Ignoring error that occurred during statistics collection [%s]", str(exc), exc_info=exc) + + def fetch_wps_process(job, wps_url, headers, settings): # type: (Job, str, HeadersType, SettingsType) -> ProcessOWS """ diff --git a/weaver/processes/wps_package.py b/weaver/processes/wps_package.py index e5f25f86e..8ad8048a2 100644 --- a/weaver/processes/wps_package.py +++ b/weaver/processes/wps_package.py @@ -58,6 +58,7 @@ CWL_REQUIREMENT_APP_TYPES, CWL_REQUIREMENT_APP_WPS1, CWL_REQUIREMENT_ENV_VAR, + CWL_REQUIREMENT_RESOURCE, CWL_REQUIREMENTS_SUPPORTED, WPS_INPUT, WPS_OUTPUT @@ -125,7 +126,9 @@ AnyValueType, CWL, CWL_AnyRequirements, + CWL_Requirement, CWL_RequirementsDict, + CWL_RequirementNames, CWL_RequirementsList, CWL_Results, CWL_ToolPathObjectType, @@ -590,14 +593,24 @@ def _generate_process_with_cwl_from_reference(reference): return cwl_package, process_info -def 
get_application_requirement(package): - # type: (CWL) -> Dict[str, Any] +def get_application_requirement(package, search=None, default=None, validate=True): + # type: (CWL, Optional[CWL_RequirementNames], Optional[Any], bool) -> Union[CWL_Requirement, Any] """ - Retrieve the principal requirement that allows mapping to the appropriate process implementation. - - Obtains the first item in `CWL` package ``requirements`` or ``hints`` that corresponds to a `Weaver`-specific - application type as defined in :py:data:`CWL_REQUIREMENT_APP_TYPES`. - + Retrieves a requirement or hint from the :term:`CWL` package definition. + + If no filter is specified (default), retrieve the *principal* requirement that allows mapping to the appropriate + :term:`Process` implementation. Obtains the first item in :term:`CWL` package ``requirements`` or ``hints`` + that corresponds to a `Weaver`-specific application type as defined in :py:data:`CWL_REQUIREMENT_APP_TYPES`. + If a filter is provided, this specific requirement or hint is looked for instead. + Regardless of the applied filter, only a unique item can be matched across requirements/hints containers, and + within a same container in case of listing representation to avoid ambiguity. When requirements/hints validation + is enabled, all requirements must also be defined amongst :data:`CWL_REQUIREMENTS_SUPPORTED` for the :term:`CWL` + package to be considered valid. + + :param package: CWL definition to parse. + :param search: Specific requirement/hint name to search and retrieve the definition if available. + :param default: Default value to return if no match was found. If ``None``, returns an empty ``{"class": ""}``. + :param validate: Validate supported requirements/hints definition while extracting requested one. :returns: dictionary that minimally has ``class`` field, and optionally other parameters from that requirement. 
""" # package can define requirements and/or hints, @@ -606,17 +619,22 @@ def get_application_requirement(package): reqs = package.get("requirements", {}) hints = package.get("hints", {}) all_hints = _get_package_requirements_as_class_list(reqs) + _get_package_requirements_as_class_list(hints) - app_hints = list(filter(lambda h: any(h["class"].endswith(t) for t in CWL_REQUIREMENT_APP_TYPES), all_hints)) + if search: + app_hints = list(filter(lambda h: h == search, all_hints)) + else: + app_hints = list(filter(lambda h: any(h["class"].endswith(t) for t in CWL_REQUIREMENT_APP_TYPES), all_hints)) if len(app_hints) > 1: raise ValueError( f"Package 'requirements' and/or 'hints' define too many conflicting values: {list(app_hints)}, " f"only one permitted amongst {list(CWL_REQUIREMENT_APP_TYPES)}." ) - requirement = app_hints[0] if app_hints else {"class": ""} + req_default = default if default is not None else {"class": ""} + requirement = app_hints[0] if app_hints else req_default - cwl_supported_reqs = list(CWL_REQUIREMENTS_SUPPORTED) - if not all(item.get("class") in cwl_supported_reqs for item in all_hints): - raise PackageTypeError(f"Invalid requirement, the requirements supported are {cwl_supported_reqs}") + if validate: + cwl_supported_reqs = list(CWL_REQUIREMENTS_SUPPORTED) + if not all(item.get("class") in cwl_supported_reqs for item in all_hints): + raise PackageTypeError(f"Invalid requirement, the requirements supported are {cwl_supported_reqs}") return requirement @@ -1053,6 +1071,7 @@ def setup_runtime(self): # cwltool will add additional unique characters after prefix paths cwl_workdir = os.path.join(wps_workdir, "cwltool_tmp_") cwl_outdir = os.path.join(wps_workdir, "cwltool_out_") + res_req = get_application_requirement(self.package, CWL_REQUIREMENT_RESOURCE, default={}, validate=False) runtime_params = { # force explicit staging if write needed (InitialWorkDirRequirement in CWL package) # protect input paths that can be re-used to avoid potential 
in-place modifications @@ -1065,7 +1084,13 @@ def setup_runtime(self): "tmp_outdir_prefix": cwl_outdir, # ask CWL to move tmp outdir results to the WPS process workdir (otherwise we loose them on cleanup) "outdir": self.workdir, - "debug": self.logger.isEnabledFor(logging.DEBUG) + "debug": self.logger.isEnabledFor(logging.DEBUG), + # when process is a docker image, memory monitoring information is obtained with CID file + # this file is only generated when the below command is explicitly None (not even when '') + "user_space_docker_cmd": None, + # if 'ResourceRequirement' is specified to limit RAM usage, below must be added to ensure it is applied + # but don't enable it otherwise, since some defaults are applied which could break existing processes + "strict_memory_limit": bool(res_req), } return runtime_params diff --git a/weaver/quotation/estimation.py b/weaver/quotation/estimation.py index 076da4812..bbb81a94f 100644 --- a/weaver/quotation/estimation.py +++ b/weaver/quotation/estimation.py @@ -14,7 +14,7 @@ from weaver.utils import fully_qualified_name, get_settings, request_extra, wait_secs if TYPE_CHECKING: - from celery.task import Task + from celery.app.task import Task from weaver.datatype import Process, Quote from weaver.quotation.status import AnyQuoteStatus diff --git a/weaver/store/base.py b/weaver/store/base.py index c2cc1f423..a578107bf 100644 --- a/weaver/store/base.py +++ b/weaver/store/base.py @@ -144,7 +144,7 @@ def update_job(self, job): @abc.abstractmethod def delete_job(self, job_id): - # type: (str) -> bool + # type: (AnyUUID) -> bool raise NotImplementedError @abc.abstractmethod diff --git a/weaver/store/mongodb.py b/weaver/store/mongodb.py index 1e0ebd604..6a5a6a684 100644 --- a/weaver/store/mongodb.py +++ b/weaver/store/mongodb.py @@ -648,12 +648,14 @@ def update_job(self, job): raise JobUpdateError(f"Failed to update specified job: '{job!s}'") def delete_job(self, job_id): - # type: (str) -> bool + # type: (AnyUUID) -> bool """ Removes 
job from `MongoDB` storage. """ - self.collection.delete_one({"id": job_id}) - return True + if isinstance(job_id, str): + job_id = uuid.UUID(job_id) + result = self.collection.delete_one({"id": job_id}) + return result.deleted_count == 1 def fetch_by_id(self, job_id): # type: (AnyUUID) -> Job diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 4fed0dab4..ee9edaca5 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -2,10 +2,14 @@ if TYPE_CHECKING: import os + import sys import typing import uuid from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Type, Union + + import psutil + if hasattr(typing, "TypedDict"): from typing import TypedDict # pylint: disable=E0611,no-name-in-module # Python >= 3.8 else: @@ -27,6 +31,28 @@ else: FileSystemPathType = str + MemoryInfo = Any + if sys.platform == "win32": + try: + MemoryInfo = psutil._psutil_windows._pfullmem # noqa: W0212 + except (AttributeError, ImportError, NameError): + pass + if MemoryInfo is Any: + try: + MemoryInfo = psutil._pslinux.pfullmem # noqa: W0212 + except (AttributeError, ImportError, NameError): + pass + if MemoryInfo is Any: + if TypedDict is Dict: + MemoryInfo = Dict + else: + MemoryInfo = TypedDict("MemoryInfo", { + "rss": int, + "uss": int, + "vms": int, + }, total=False) + TimesCPU = psutil._common.pcputimes # noqa: W0212 + from celery.app import Celery from celery.result import AsyncResult, EagerResult, GroupResult, ResultSet from owslib.wps import BoundingBoxDataInput, ComplexDataInput, Process as ProcessOWS, WPSExecution @@ -46,6 +72,7 @@ from webtest.response import TestResponse from werkzeug.wrappers import Request as WerkzeugRequest + from weaver.processes.constants import CWL_RequirementNames from weaver.processes.wps_process_base import WpsProcessInterface from weaver.datatype import Process from weaver.status import AnyStatusType @@ -61,9 +88,9 @@ _JSON: TypeAlias = "JSON" _JsonObjectItemAlias: TypeAlias = "_JsonObjectItem" 
_JsonListItemAlias: TypeAlias = "_JsonListItem" - _JsonObjectItem = Dict[str, Union[_JSON, _JsonListItemAlias]] - _JsonListItem = List[Union[AnyValueType, _JsonObjectItem, _JsonListItemAlias, _JSON]] - _JsonItem = Union[AnyValueType, _JsonObjectItem, _JsonListItem] + _JsonObjectItem = Dict[str, Union[_JSON, _JsonObjectItemAlias, _JsonListItemAlias]] + _JsonListItem = List[Union[AnyValueType, _JsonObjectItem, _JsonListItemAlias]] + _JsonItem = Union[AnyValueType, _JsonObjectItem, _JsonListItem, _JSON] JSON = Union[Dict[str, _JsonItem], List[_JsonItem], AnyValueType] Link = TypedDict("Link", { @@ -123,8 +150,10 @@ }, total=False) CWL_Inputs = Union[List[CWL_Input_Type], Dict[str, CWL_Input_Type]] CWL_Outputs = Union[List[CWL_Output_Type], Dict[str, CWL_Output_Type]] - CWL_Requirement = TypedDict("CWL_Requirement", {"class": str}, total=False) # includes 'hints' - CWL_RequirementsDict = Dict[str, Dict[str, str]] # {'': {: }} + + # 'requirements' includes 'hints' + CWL_Requirement = TypedDict("CWL_Requirement", {"class": CWL_RequirementNames}, total=False) # type: ignore + CWL_RequirementsDict = Dict[CWL_RequirementNames, Dict[str, str]] # {'': {: }} CWL_RequirementsList = List[CWL_Requirement] # [{'class': , : }] CWL_AnyRequirements = Union[CWL_RequirementsDict, CWL_RequirementsList] # results from CWL execution @@ -330,6 +359,36 @@ def __call__(self, message: str, progress: Number, status: AnyStatusType, *args: "outputs": JobOutputs, }) + # job execution statistics + ApplicationStatistics = TypedDict("ApplicationStatistics", { + "usedMemory": str, + "usedMemoryBytes": int, + }, total=True) + ProcessStatistics = TypedDict("ProcessStatistics", { + "rss": str, + "rssBytes": int, + "uss": str, + "ussBytes": int, + "vms": str, + "vmsBytes": int, + "usedThreads": int, + "usedCPU": int, + "usedHandles": int, + "usedMemory": str, + "usedMemoryBytes": int, + "totalSize": str, + "totalSizeBytes": int, + }, total=False) + OutputStatistics = TypedDict("OutputStatistics", { + 
"size": str, + "sizeBytes": int, + }, total=True) + Statistics = TypedDict("Statistics", { + "application": Optional[ApplicationStatistics], + "process": Optional[ProcessStatistics], + "outputs": Dict[str, OutputStatistics], + }, total=False) + CeleryResult = Union[AsyncResult, EagerResult, GroupResult, ResultSet] # simple/partial definitions of OpenAPI schema diff --git a/weaver/utils.py b/weaver/utils.py index 000a86206..9beb3d85c 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -89,6 +89,34 @@ FILE_NAME_QUOTE_PATTERN = re.compile(r"^\"?([\w\-.]+\.\w+)\"?$") # extension required, permissive extra quotes FILE_NAME_LOOSE_PATTERN = re.compile(r"^[\w\-.]+$") # no extension required + +class CaseInsensitive(str): + __str = None + + def __init__(self, _str): + # type: (str) -> None + self.__str = _str + super(CaseInsensitive, self).__init__() + + def __hash__(self): + return hash(self.__str) + + def __str__(self): + return self.__str + + def __repr__(self): + return f"CaseInsensitive({self.__str})" + + def __eq__(self, other): + # type: (Any) -> bool + return self.__str.casefold() == str(other).casefold() + + +NUMBER_PATTERN = re.compile(r"^(?P<number>[+-]?[0-9]+[.]?[0-9]*([e][+-]?[0-9]+)?)\s*(?P<unit>.*)$") +UNIT_SI_POWER_UP = [CaseInsensitive("k"), "M", "G", "T", "P", "E", "Z", "Y"] # allow upper 'K' often used +UNIT_SI_POWER_DOWN = ["m", "µ", "n", "p", "f", "a", "z", "y"] +UNIT_BIN_POWER = ["Ki", "Mi", "Gi", "Ti", "Pi", "Ei", "Zi", "Yi"] + UUID_PATTERN = re.compile(colander.UUID_REGEX, re.IGNORECASE) @@ -1755,3 +1783,91 @@ def generate_diff(val, ref, val_name="Test", ref_name="Reference"): val = val.splitlines() ref = ref.splitlines() return "\n".join(difflib.context_diff(val, ref, fromfile=val_name, tofile=ref_name)) + + +def apply_number_with_unit(number, unit="", binary=False, decimals=3): + # type: (Number, str, bool, int) -> str + """ + Apply the relevant unit and prefix factor to the specified number to create a human-readable value.
+ + :param number: Numeric value with no unit. + :param unit: Unit to be applied. Auto-resolved to 'B' if binary requested. Factor applied accordingly to number. + :param binary: Use binary multiplier (powers of 2) instead of SI decimal multipliers (powers of 10). + :param decimals: Number of decimals to preserve after unit is applied. + :return: String of the numeric value with appropriate unit. + """ + multiplier = 1024. if binary else 1000. + unit = "B" if binary else unit + factor = "" + ratio = 1.0 + negative = number < 0 + number = -1. * number if negative else number + if number == 0: + pass + elif number > multiplier: + for exp, factor in enumerate(UNIT_SI_POWER_UP, start=1): + if not (number / float(multiplier ** exp)) >= (multiplier - 1.): + ratio = float(multiplier ** -exp) + break + else: + ratio = float(multiplier ** -len(UNIT_SI_POWER_UP)) + elif number < multiplier: + for exp, factor in enumerate([""] + UNIT_SI_POWER_DOWN, start=0): + if (number * float(multiplier ** exp)) >= 1.: + ratio = float(multiplier ** exp) + break + else: + ratio = float(multiplier ** len(UNIT_SI_POWER_DOWN)) + factor = f"{factor}i" if factor and binary else factor + factor = f" {factor}" if factor or unit else "" + value = (-1. if negative else 1.) * number * ratio + return f"{value:.{decimals}f}{factor}{unit}" + + +def parse_number_with_unit(number, binary=None): + # type: (str, Optional[bool]) -> Number + """ + Parses a numeric value accompanied with a unit to generate the unit-less value without prefix factor. + + :param number: + Numerical value and unit. Unit is dissociated from value with first non-numerical match. + Unit is assumed to be present (not only the multiplier by itself). + This is important to avoid confusion (e.g.: ``m`` used for meters vs ``m`` prefix for "milli"). 
+ :param binary: + Force use (``True``) or non-use (``False``) of binary multiplier (powers of 2) instead of + SI decimal multipliers (powers of 10) for converting value (with applicable unit multiplier if available). + If unspecified (``None``), auto-detect from unit (e.g.: powers of 2 for ``MiB``, powers of 10 for ``MB``). + When unspecified, the ``B`` character is used to auto-detect if binary should apply, SI multipliers are + otherwise assumed. + :return: Literal value. + """ + try: + num = re.match(NUMBER_PATTERN, number) + grp = num.groupdict() + f_val = float(num["number"]) + unit = grp["unit"] + multiplier = 1 + as_bin = False + if unit: + as_bin = binary is None and unit[-1] == "B" + is_bin = unit[:2] in UNIT_BIN_POWER + is_num = unit[0] in UNIT_SI_POWER_UP + if is_bin: + factor = UNIT_BIN_POWER.index(unit[:2]) + 1 + elif is_num: + factor = UNIT_SI_POWER_UP.index(unit[:1]) + 1 + else: + factor = 0 + if binary or as_bin: + multiplier = 2 ** (factor * 10) + else: + multiplier = 10 ** (factor * 3) + f_val = f_val * multiplier + if binary or as_bin: + val = int(f_val + 0.5) # round up + else: + i_val = int(f_val) + val = i_val if i_val == f_val else f_val + except (AttributeError, KeyError, ValueError, TypeError): + raise ValueError(f"Invalid number with optional unit string could not be parsed: [{number!s}]") + return val diff --git a/weaver/wps_restapi/examples/job_statistics.json b/weaver/wps_restapi/examples/job_statistics.json new file mode 100644 index 000000000..366046bd9 --- /dev/null +++ b/weaver/wps_restapi/examples/job_statistics.json @@ -0,0 +1,27 @@ +{ + "application": { + "usedMemory": "3.000 MiB", + "usedMemoryBytes": 3145728 + }, + "process": { + "rss": "139.531 MiB", + "rssBytes": 146309120, + "uss": "84.535 MiB", + "ussBytes": 88641536, + "vms": "1.388 GiB", + "vmsBytes": 1490432000, + "usedThreads": 11, + "usedCPU": 5, + "usedHandles": 0, + "usedMemory": "13.734 MiB", + "usedMemoryBytes": 14401536, + "totalSize": "3.000 B", + 
"totalSizeBytes": 3 + }, + "outputs": { + "output": { + "size": "3.000 B", + "sizeBytes": 3 + } + } +} diff --git a/weaver/wps_restapi/jobs/__init__.py b/weaver/wps_restapi/jobs/__init__.py index bb19334fb..f76d00129 100644 --- a/weaver/wps_restapi/jobs/__init__.py +++ b/weaver/wps_restapi/jobs/__init__.py @@ -17,6 +17,7 @@ def includeme(config): config.add_route(**sd.service_api_route_info(sd.job_inputs_service, settings)) config.add_route(**sd.service_api_route_info(sd.job_exceptions_service, settings)) config.add_route(**sd.service_api_route_info(sd.job_logs_service, settings)) + config.add_route(**sd.service_api_route_info(sd.job_stats_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_job_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_jobs_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_results_service, settings)) @@ -24,6 +25,7 @@ def includeme(config): config.add_route(**sd.service_api_route_info(sd.provider_inputs_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_exceptions_service, settings)) config.add_route(**sd.service_api_route_info(sd.provider_logs_service, settings)) + config.add_route(**sd.service_api_route_info(sd.provider_stats_service, settings)) config.add_route(**sd.service_api_route_info(sd.process_jobs_service, settings)) config.add_route(**sd.service_api_route_info(sd.process_job_service, settings)) config.add_route(**sd.service_api_route_info(sd.process_results_service, settings)) @@ -31,6 +33,7 @@ def includeme(config): config.add_route(**sd.service_api_route_info(sd.process_inputs_service, settings)) config.add_route(**sd.service_api_route_info(sd.process_exceptions_service, settings)) config.add_route(**sd.service_api_route_info(sd.process_logs_service, settings)) + config.add_route(**sd.service_api_route_info(sd.process_stats_service, settings)) # backward compatibility routes (deprecated) 
config.add_route(**sd.service_api_route_info(sd.job_result_service, settings)) @@ -91,6 +94,12 @@ def includeme(config): request_method="GET", renderer=OutputFormat.JSON) config.add_view(j.get_job_logs, route_name=sd.process_logs_service.name, request_method="GET", renderer=OutputFormat.JSON) + config.add_view(j.get_job_stats, route_name=sd.job_stats_service.name, + request_method="GET", renderer=OutputFormat.JSON) + config.add_view(j.get_job_stats, route_name=sd.provider_stats_service.name, + request_method="GET", renderer=OutputFormat.JSON) + config.add_view(j.get_job_stats, route_name=sd.process_stats_service.name, + request_method="GET", renderer=OutputFormat.JSON) config.add_view(j.redirect_job_result, route_name=sd.job_result_service.name, request_method="GET", renderer=OutputFormat.JSON) config.add_view(j.redirect_job_result, route_name=sd.process_result_service.name, diff --git a/weaver/wps_restapi/jobs/jobs.py b/weaver/wps_restapi/jobs/jobs.py index e05baa3c9..21948df65 100644 --- a/weaver/wps_restapi/jobs/jobs.py +++ b/weaver/wps_restapi/jobs/jobs.py @@ -7,9 +7,10 @@ from notify import encrypt_email from weaver.database import get_db from weaver.datatype import Job -from weaver.exceptions import JobNotFound, log_unhandled_exceptions +from weaver.exceptions import JobNotFound, JobStatisticsNotFound, log_unhandled_exceptions from weaver.formats import ContentType, OutputFormat, add_content_type_charset, guess_target_format, repr_json from weaver.processes.convert import convert_input_values_schema, convert_output_params_schema +from weaver.status import JOB_STATUS_CATEGORIES, Status, StatusCategory from weaver.store.base import StoreJobs from weaver.utils import get_settings from weaver.wps_restapi import swagger_definitions as sd @@ -276,10 +277,10 @@ def get_job_results(request): @sd.provider_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROVIDERS], renderer=OutputFormat.JSON, schema=sd.ProviderExceptionsEndpoint(), 
response_schemas=sd.get_prov_exceptions_responses) -@sd.job_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS], renderer=OutputFormat.JSON, - schema=sd.JobExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses) @sd.process_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS, sd.TAG_PROCESSES], renderer=OutputFormat.JSON, schema=sd.ProcessExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses) +@sd.job_exceptions_service.get(tags=[sd.TAG_JOBS, sd.TAG_EXCEPTIONS], renderer=OutputFormat.JSON, + schema=sd.JobExceptionsEndpoint(), response_schemas=sd.get_exceptions_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_exceptions(request): # type: (PyramidRequest) -> AnyResponseType @@ -294,10 +295,10 @@ def get_job_exceptions(request): @sd.provider_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS, sd.TAG_PROVIDERS], renderer=OutputFormat.JSON, schema=sd.ProviderLogsEndpoint(), response_schemas=sd.get_prov_logs_responses) -@sd.job_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS], renderer=OutputFormat.JSON, - schema=sd.JobLogsEndpoint(), response_schemas=sd.get_logs_responses) @sd.process_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS, sd.TAG_PROCESSES], renderer=OutputFormat.JSON, schema=sd.ProcessLogsEndpoint(), response_schemas=sd.get_logs_responses) +@sd.job_logs_service.get(tags=[sd.TAG_JOBS, sd.TAG_LOGS], renderer=OutputFormat.JSON, + schema=sd.JobLogsEndpoint(), response_schemas=sd.get_logs_responses) @log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) def get_job_logs(request): # type: (PyramidRequest) -> AnyResponseType @@ -318,6 +319,41 @@ def get_job_logs(request): return HTTPOk(json=logs) +@sd.provider_stats_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATISTICS, sd.TAG_PROVIDERS], renderer=OutputFormat.JSON, + schema=sd.ProviderJobStatisticsEndpoint(), response_schemas=sd.get_prov_stats_responses) 
+@sd.process_stats_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATISTICS, sd.TAG_PROCESSES], renderer=OutputFormat.JSON, + schema=sd.ProcessJobStatisticsEndpoint(), response_schemas=sd.get_stats_responses) +@sd.job_stats_service.get(tags=[sd.TAG_JOBS, sd.TAG_STATISTICS], renderer=OutputFormat.JSON, + schema=sd.JobStatisticsEndpoint(), response_schemas=sd.get_stats_responses) +@log_unhandled_exceptions(logger=LOGGER, message=sd.InternalServerErrorResponseSchema.description) +def get_job_stats(request): + # type: (PyramidRequest) -> AnyResponseType + """ + Retrieve the statistics of a job. + """ + job = get_job(request) + raise_job_dismissed(job, request) + if job.status not in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED] or job.status != Status.SUCCEEDED: + raise JobStatisticsNotFound(json={ + "title": "NoJobStatistics", + "type": "no-job-statistics", # unofficial + "detail": "Job statistics are only available for completed and successful jobs.", + "status": JobStatisticsNotFound.code, + "cause": {"status": job.status}, + }) + stats = job.statistics + if not stats: # backward compatibility for existing jobs before feature was added + raise JobStatisticsNotFound(json={ + "title": "NoJobStatistics", + "type": "no-job-statistics", # unofficial + "detail": "Job statistics were not collected for this execution.", + "status": JobStatisticsNotFound.code, + "cause": "Empty statistics." 
+ }) + body = sd.JobStatisticsSchema().deserialize(stats) + return HTTPOk(json=body) + + @sd.provider_result_service.get(tags=[sd.TAG_JOBS, sd.TAG_RESULTS, sd.TAG_PROVIDERS, sd.TAG_DEPRECATED], renderer=OutputFormat.JSON, schema=sd.ProviderResultEndpoint(), response_schemas=sd.get_result_redirect_responses) diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index ba36ec1f0..ca987cdfc 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -39,7 +39,6 @@ get_any_id, get_any_value, get_file_headers, - get_header, get_path_kvp, get_settings, get_weaver_url, @@ -487,7 +486,6 @@ def get_job_submission_response(body, headers, error=False): :func:`weaver.processes.execution.submit_job_handler` """ status = map_status(body.get("status")) - location = get_header("location", headers) if status in JOB_STATUS_CATEGORIES[StatusCategory.FINISHED]: if error: http_class = HTTPBadRequest @@ -498,11 +496,11 @@ def get_job_submission_response(body, headers, error=False): body = sd.CompletedJobStatusSchema().deserialize(body) body["description"] = http_desc - return http_class(location=location, json=body, headers=headers) + return http_class(json=body, headers=headers) body["description"] = sd.CreatedLaunchJobResponse.description body = sd.CreatedJobStatusSchema().deserialize(body) - return HTTPCreated(location=location, json=body, headers=headers) + return HTTPCreated(json=body, headers=headers) def validate_service_process(request): diff --git a/weaver/wps_restapi/swagger_definitions.py b/weaver/wps_restapi/swagger_definitions.py index 0bdde125d..f47fc3405 100644 --- a/weaver/wps_restapi/swagger_definitions.py +++ b/weaver/wps_restapi/swagger_definitions.py @@ -197,6 +197,7 @@ TAG_RESULTS = "Results" TAG_EXCEPTIONS = "Exceptions" TAG_LOGS = "Logs" +TAG_STATISTICS = "Statistics" TAG_VAULT = "Vault" TAG_WPS = "WPS" TAG_DEPRECATED = "Deprecated Endpoints" @@ -226,6 +227,7 @@ job_outputs_service = Service(name="job_outputs", 
path=job_service.path + "/outputs") job_inputs_service = Service(name="job_inputs", path=job_service.path + "/inputs") job_logs_service = Service(name="job_logs", path=job_service.path + "/logs") +job_stats_service = Service(name="job_stats", path=job_service.path + "/statistics") processes_service = Service(name="processes", path="/processes") process_service = Service(name="process", path=processes_service.path + "/{process_id}") @@ -241,6 +243,7 @@ process_outputs_service = Service(name="process_outputs", path=process_service.path + job_outputs_service.path) process_exceptions_service = Service(name="process_exceptions", path=process_service.path + job_exceptions_service.path) process_logs_service = Service(name="process_logs", path=process_service.path + job_logs_service.path) +process_stats_service = Service(name="process_stats", path=process_service.path + job_stats_service.path) process_execution_service = Service(name="process_execution", path=process_service.path + "/execution") providers_service = Service(name="providers", path="/providers") @@ -253,6 +256,7 @@ provider_inputs_service = Service(name="provider_inputs", path=provider_service.path + process_inputs_service.path) provider_outputs_service = Service(name="provider_outputs", path=provider_service.path + process_outputs_service.path) provider_logs_service = Service(name="provider_logs", path=provider_service.path + process_logs_service.path) +provider_stats_service = Service(name="provider_stats", path=provider_service.path + process_stats_service.path) provider_exceptions_service = Service(name="provider_exceptions", path=provider_service.path + process_exceptions_service.path) provider_execution_service = Service(name="provider_execution", path=provider_service.path + "/execution") @@ -2633,6 +2637,18 @@ class ProcessLogsEndpoint(ProcessPath, JobPath): header = RequestHeaders() +class JobStatisticsEndpoint(JobPath): + header = RequestHeaders() + + +class ProcessJobStatisticsEndpoint(ProcessPath, 
JobPath): + header = RequestHeaders() + + +class ProviderJobStatisticsEndpoint(ProviderPath, ProcessPath, JobPath): + header = RequestHeaders() + + ################################################################## # These classes define schemas for requests that feature a body ################################################################## @@ -3969,6 +3985,46 @@ class JobLogsSchema(ExtendedSequenceSchema): log = ExtendedSchemaNode(String()) +class ApplicationStatisticsSchema(ExtendedMappingSchema): + mem = ExtendedSchemaNode(String(), name="usedMemory", example="10 MiB") + mem_bytes = ExtendedSchemaNode(Integer(), name="usedMemoryBytes", example=10485760) + + +class ProcessStatisticsSchema(ExtendedMappingSchema): + rss = ExtendedSchemaNode(String(), name="rss", example="140 MiB") + rss_bytes = ExtendedSchemaNode(Integer(), name="rssBytes", example=146800640) + uss = ExtendedSchemaNode(String(), name="uss", example="80 MiB") + uss_bytes = ExtendedSchemaNode(Integer(), name="ussBytes", example=83886080) + vms = ExtendedSchemaNode(String(), name="vms", example="1.4 GiB") + vms_bytes = ExtendedSchemaNode(Integer(), name="vmsBytes", example=1503238554) + used_threads = ExtendedSchemaNode(Integer(), name="usedThreads", example=10) + used_cpu = ExtendedSchemaNode(Integer(), name="usedCPU", example=2) + used_handles = ExtendedSchemaNode(Integer(), name="usedHandles", example=0) + mem = ExtendedSchemaNode(String(), name="usedMemory", example="10 MiB", + description="RSS memory employed by the job execution omitting worker memory.") + mem_bytes = ExtendedSchemaNode(Integer(), name="usedMemoryBytes", example=10485760, + description="RSS memory employed by the job execution omitting worker memory.") + total_size = ExtendedSchemaNode(String(), name="totalSize", example="10 MiB", + description="Total size to store job output files.") + total_size_bytes = ExtendedSchemaNode(Integer(), name="totalSizeBytes", example=10485760, + description="Total size to store job output 
files.") + + +class OutputStatisticsSchema(ExtendedMappingSchema): + size = ExtendedSchemaNode(String(), name="size", example="5 MiB") + size_bytes = ExtendedSchemaNode(Integer(), name="sizeBytes", example=5242880) + + +class OutputStatisticsMap(ExtendedMappingSchema): + output = OutputStatisticsSchema(variable="{output-id}", description="Spaced used by this output file.") + + +class JobStatisticsSchema(ExtendedMappingSchema): + application = ApplicationStatisticsSchema(missing=drop) + process = ProcessStatisticsSchema(missing=drop) + outputs = OutputStatisticsMap(missing=drop) + + class FrontpageParameterSchema(ExtendedMappingSchema): name = ExtendedSchemaNode(String(), example="api") enabled = ExtendedSchemaNode(Boolean(), example=True) @@ -4741,6 +4797,11 @@ class OkGetJobLogsResponse(ExtendedMappingSchema): body = JobLogsSchema() +class OkGetJobStatsResponse(ExtendedMappingSchema): + header = ResponseHeaders() + body = JobStatisticsSchema() + + class VaultFileID(UUID): description = "Vault file identifier." example = "78977deb-28af-46f3-876b-cdd272742678" @@ -5155,6 +5216,22 @@ class GoneVaultFileDownloadResponse(ExtendedMappingSchema): get_prov_logs_responses.update({ "403": ForbiddenProviderLocalResponseSchema(), }) +get_stats_responses = { + "200": OkGetJobStatsResponse(description="success", examples={ + "JobStatistics": { + "summary": "Job statistics collected following process execution.", + "value": EXAMPLES["job_statistics.json"], + } + }), + "400": InvalidJobResponseSchema(), + "404": NotFoundJobResponseSchema(), + "410": GoneJobResponseSchema(), + "500": InternalServerErrorResponseSchema(), +} +get_prov_stats_responses = copy(get_stats_responses) +get_prov_stats_responses.update({ + "403": ForbiddenProviderLocalResponseSchema(), +}) get_quote_list_responses = { "200": OkGetQuoteListResponse(description="success"), "500": InternalServerErrorResponseSchema(),