Skip to content

Commit

Permalink
Merge pull request #439 from crim-ca/res-monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
fmigneault authored May 30, 2022
2 parents 5ade958 + a1f0bc0 commit 274d85e
Show file tree
Hide file tree
Showing 24 changed files with 650 additions and 53 deletions.
8 changes: 7 additions & 1 deletion CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,16 @@ Changes

Changes:
--------
- No change.
- Add statistics collection at the end of `Job` execution to obtain used memory from ``celery`` process and spaced
used by produced results.
- Add ``/jobs/{jobID}/statistics`` endpoint (and corresponding locations for ``/providers`` and ``/processes``) to
report any collected statistics following a `Job` execution.

Fixes:
------
- Fix `Job` ``Location`` header injected twice in ``get_job_submission_response`` causing header to have comma-separated
list of URI values failing retrieval by `CLI` when attempting to perform auto-monitoring of the submitted `Job`.
- Fix `CWL` runtime context setup to return monitored maximum RAM used by application under the `Process` if possible.
- Fix failing `Service` provider summary response in case of unresponsive (not accessible or parsable) URL endpoint
contents due to different errors raised by distinct versions of ``requests`` package.

Expand Down
3 changes: 1 addition & 2 deletions docs/source/configuration.rst
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ they are optional and which default value or operation is applied in each situat
- | ``weaver.wps_workdir = <directory-path>``
| (default: uses automatically generated temporary directory if none specified)
|
| Prefix where process job worker should execute the process from.
| Prefix where process :term:`Job` worker should execute the :term:`Process` from.
- | ``weaver.wps_restapi = true|false``
| (default: ``true``)
Expand Down Expand Up @@ -413,4 +413,3 @@ Starting the Application

- need to start ``gunicorn/pserve`` (example `Dockerfile-manager`_)
- need to start ``celery`` worker (example `Dockerfile-worker`_)

1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ mako
# esgf-compute-api (cwt) needs oauthlib but doesn't add it in their requirements
oauthlib
owslib==0.25.0
psutil
# FIXME: pymongo>=4 breaks with kombu corresponding to pinned Celery (https://github.com/crim-ca/weaver/issues/386)
pymongo>=3.12.0,<4
# pyproj>=2 employed by OWSLib, but make requirements stricter
Expand Down
7 changes: 7 additions & 0 deletions tests/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import os

from weaver import WEAVER_MODULE_DIR
from weaver.utils import load_file

RESOURCES_PATH = os.path.abspath(os.path.join(os.path.dirname(__file__), ""))
EXAMPLES_PATH = os.path.join(WEAVER_MODULE_DIR, "wps_restapi/examples")

GET_CAPABILITIES_TEMPLATE_URL = "{}?service=WPS&request=GetCapabilities&version=1.0.0"
DESCRIBE_PROCESS_TEMPLATE_URL = "{}?service=WPS&request=DescribeProcess&identifier={}&version=1.0.0"
Expand Down Expand Up @@ -38,6 +40,11 @@
)


def load_example(file_name):
file_path = os.path.join(EXAMPLES_PATH, file_name)
return load_file(file_path)


def load_resource(file_name):
file_path = os.path.join(RESOURCES_PATH, file_name)
return load_file(file_path)
60 changes: 60 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from weaver.status import JOB_STATUS_CATEGORIES, STATUS_PYWPS_IDS, STATUS_PYWPS_MAP, Status, StatusCompliant, map_status
from weaver.utils import (
NullType,
apply_number_with_unit,
assert_sane_name,
bytes2str,
fetch_file,
Expand All @@ -50,6 +51,7 @@
make_dirs,
null,
parse_kvp,
parse_number_with_unit,
parse_prefer_header_execute_mode,
pass_http_error,
request_extra,
Expand Down Expand Up @@ -811,3 +813,61 @@ def test_parse_kvp(query, params, expected):
def test_prefer_header_execute_mode(headers, support, expected):
result = parse_prefer_header_execute_mode(headers, support)
assert result == expected


@pytest.mark.parametrize("number,binary,unit,expect", [
(1.234, False, "B", "1.234 B"),
(10_000_000, False, "B", "10.000 MB"),
(10_000_000, True, "B", "9.537 MiB"),
(10_000_000_000, False, "B", "10.000 GB"),
(10_737_418_240, True, "B", "10.000 GiB"),
(10_000_000_000, True, "B", "9.313 GiB"),
(10**25, False, "", "10.000 Y"),
(10**25, True, "B", "8.272 YiB"),
(10**28, False, "", "10000.000 Y"), # last unit goes over bound
(10**-28, False, "s", "0.000 ys"), # out of bound, cannot represent smaller
(-10 * 10**3, False, "s", "-10.000 ks"), # negative & positive power
(-0.001234, False, "s", "-1.234 ms"), # negative & reducing power
(0, False, "", "0.000"),
(0.000, True, "", "0.000 B"),
])
def test_apply_number_with_unit(number, binary, unit, expect):
result = apply_number_with_unit(number, unit=unit, binary=binary)
assert result == expect


@pytest.mark.parametrize("number,binary,expect", [
("1 B", None, 1),
# note: 'k' lower
("1k", False, 1_000), # normal
("1kB", False, 1_000), # forced unmatched 'B'
("1kB", None, 1_024), # auto from 'B'
("1kB", True, 1_024), # forced but matches
# note: 'K' upper
("1K", False, 1_000), # normal
("1KB", False, 1_000), # forced unmatched 'B'
("1KB", None, 1_024), # auto from 'B'
("1KB", True, 1_024), # forced but matches
# normal
("1KiB", True, 1_024), # forced but matches
("1KiB", None, 1_024), # normal
("1KiB", False, 1_000), # forced unmatched 'B'
("1G", False, 1_000_000_000), # normal
("1GB", False, 1_000_000_000), # forced unmatched 'B'
("1GB", None, 1_073_741_824), # auto from 'B'
("1GB", True, 1_073_741_824), # forced but matches
("1GiB", True, 1_073_741_824), # forced but matches
("1GiB", None, 1_073_741_824), # normal
("1GiB", False, 1_000_000_000), # forced unmatched 'B'
# rounding expected for binary (ie: 1 x 2^30 + 400 x 2^20 for accurate result)
# if not rounded, converting causes floating remainder (1.4 x 2^30 = 1503238553.6)
("1.4GiB", True, 1_503_238_554),
])
def test_parse_number_with_unit(number, binary, expect):
result = parse_number_with_unit(number, binary=binary)
assert result == expect


def test_parse_number_with_unit_error():
with pytest.raises(ValueError):
parse_number_with_unit(123) # noqa
3 changes: 3 additions & 0 deletions tests/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import os
import re
import subprocess
import sys
import tempfile
import uuid
import warnings
Expand Down Expand Up @@ -342,6 +343,8 @@ def run_command(command, trim=True, expect_error=False, entrypoint=None):
command = [str(arg) for arg in command]
if entrypoint is None:
out, _ = subprocess.Popen(["which", "python"], universal_newlines=True, stdout=subprocess.PIPE).communicate()
if not out:
out = sys.executable # fallback for some systems that fail above call
python_path = os.path.split(out)[0]
debug_path = os.path.expandvars(os.environ["PATH"])
env = {"PATH": f"{python_path}:{debug_path}"}
Expand Down
7 changes: 6 additions & 1 deletion tests/wps_restapi/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from tests.utils import get_test_weaver_app, get_test_weaver_config
from weaver.formats import ContentType
from weaver.utils import request_extra
from weaver.utils import get_header, request_extra
from weaver.wps_restapi import swagger_definitions as sd


Expand Down Expand Up @@ -44,6 +44,11 @@ def test_frontpage_format(self):
resp = self.testapp.get(urlparse(path).path, expect_errors=True) # allow error for wps without queries
else:
resp = request_extra("GET", path, retries=3, retry_after=True, ssl_verify=False, allow_redirects=True)
user_agent = get_header("user-agent", resp.request.headers)
if resp.status_code == 403 and "python" in user_agent:
# some sites will explicitly block bots, retry with mocked user-agent simulating human user access
resp = request_extra("GET", path, headers={"User-Agent": "Mozilla"},
retries=3, retry_after=True, ssl_verify=False, allow_redirects=True)
code = resp.status_code
test = f"({rel}) [{path}]"
assert code in [200, 400], f"Reference link expected to be found, got [{code}] for {test}"
Expand Down
48 changes: 41 additions & 7 deletions tests/wps_restapi/test_jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import pytest
from dateutil import parser as date_parser

from tests.resources import load_example
from tests.utils import (
get_links,
get_module_version,
Expand Down Expand Up @@ -49,7 +50,7 @@
from typing import Iterable, List, Optional, Tuple, Union

from weaver.status import AnyStatusType
from weaver.typedefs import JSON, AnyLogLevel, Number
from weaver.typedefs import JSON, AnyLogLevel, Number, Statistics
from weaver.visibility import AnyVisibility


Expand Down Expand Up @@ -172,7 +173,9 @@ def make_job(self,
duration=None, # type: Optional[int]
exceptions=None, # type: Optional[List[JSON]]
logs=None, # type: Optional[List[Union[str, Tuple[str, AnyLogLevel, AnyStatusType, Number]]]]
):
statistics=None, # type: Optional[Statistics]
add_info=True, # type: bool
): # type: (...) -> Job
if isinstance(created, str):
created = date_parser.parse(created)
job = self.job_store.save_job(task_id=task_id, process=process, service=service, is_workflow=False,
Expand All @@ -192,8 +195,11 @@ def make_job(self,
job.save_log(message=log_item)
if exceptions is not None:
job.exceptions = exceptions
if statistics is not None:
job.statistics = statistics
job = self.job_store.update_job(job)
self.job_info.append(job)
if add_info:
self.job_info.append(job)
return job

def message_with_jobs_mapping(self, message="", indent=2):
Expand Down Expand Up @@ -1391,11 +1397,15 @@ def test_job_results_errors(self):
]:
for what in ["outputs", "results"]:
path = f"/jobs/{job.id}/{what}"
case = (
f"Failed using (Path: {path}, Status: {job.status}, Code: {code}, Job: {job}, "
f"Title: {title}, Error: {error_type}, Cause: {cause})"
)
resp = self.app.get(path, headers=self.json_headers, expect_errors=True)
assert resp.status_code == code, f"Failed using [{path}]"
assert resp.json["title"] == title
assert resp.json["cause"] == cause
assert resp.json["type"].endswith(error_type) # ignore http full reference, not always there
assert resp.status_code == code, case
assert resp.json["title"] == title, case
assert resp.json["cause"] == cause, case
assert resp.json["type"].endswith(error_type), case # ignore http full reference, not always there
assert "links" in resp.json

def test_jobs_inputs_outputs_validations(self):
Expand Down Expand Up @@ -1548,3 +1558,27 @@ def test_job_logs_formats(self):
assert "Start" in lines[0]
assert "Process" in lines[1]
assert "Complete" in lines[2]

def test_job_statistics_missing(self):
job = self.job_info[0]
assert job.status == Status.SUCCEEDED, "invalid job status to run test"
path = f"/jobs/{job.id}/statistics"
resp = self.app.get(path, headers=self.json_headers, expect_errors=True)
assert resp.status_code == 404, "even if job is successful, expects not found if no statistics are available"

def test_job_statistics_response(self):
stats = load_example("job_statistics.json")
job = self.make_job(
add_info=False,
task_id="2222-0000-0000-0000", process=self.process_public.identifier, service=None,
user_id=self.user_admin_id, status=Status.SUCCEEDED, progress=100, access=Visibility.PUBLIC,
statistics=stats
)
try:
path = f"/jobs/{job.id}/statistics"
resp = self.app.get(path, headers=self.json_headers)
assert resp.status_code == 200
assert resp.json == stats
finally:
if job:
self.job_store.delete_job(job.id)
3 changes: 2 additions & 1 deletion weaver/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ def _update_files(self, inputs, url=None):
.. seealso::
- Headers dictionary limitation by :mod:`requests`:
https://docs.python-requests.org/en/latest/user/quickstart/#response-headers
https://requests.readthedocs.io/en/master/user/quickstart/#response-content
- Headers formatting with multiple values must be provided by comma-separated values
(:rfc:`7230#section-3.2.2`).
- Multi Vault-Token parsing accomplished by :func:`weaver.vault.utils.parse_vault_token`.
Expand Down Expand Up @@ -841,6 +841,7 @@ def monitor(self,
remain = timeout = timeout or self.monitor_timeout
delta = interval or self.monitor_interval
LOGGER.info("Monitoring job [%s] for %ss at intervals of %ss.", job_id, timeout, delta)
LOGGER.debug("Job URL: [%s]", job_url)
once = True
while remain >= 0 or once:
resp = request_extra("GET", job_url, headers=self._headers, settings=self._settings)
Expand Down
25 changes: 22 additions & 3 deletions weaver/datatype.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@
JSON,
Link,
Metadata,
QuoteProcessParameters
QuoteProcessParameters,
Statistics
)
from weaver.visibility import AnyVisibility

Expand Down Expand Up @@ -1021,6 +1022,21 @@ def progress(self, progress):
raise ValueError(f"Value must be in range [0,100] for '{self.__name__}.progress'")
self["progress"] = progress

@property
def statistics(self):
# type: () -> Optional[Statistics]
"""
Collected statistics about used memory and processing units if available.
"""
return self.get("statistics")

@statistics.setter
def statistics(self, stats):
# type: (Statistics) -> None
if not isinstance(stats, dict):
raise TypeError(f"Type 'dict' is required for '{self.__name__}.statistics'")
self["statistics"] = stats

def _get_results(self):
# type: () -> List[Optional[Dict[str, JSON]]]
if self.get("results") is None:
Expand Down Expand Up @@ -1052,13 +1068,13 @@ def _set_exceptions(self, exceptions):
exceptions = property(_get_exceptions, _set_exceptions)

def _get_logs(self):
# type: () -> List[Dict[str, str]]
# type: () -> List[str]
if self.get("logs") is None:
self["logs"] = []
return dict.__getitem__(self, "logs")

def _set_logs(self, logs):
# type: (List[Dict[str, str]]) -> None
# type: (List[str]) -> None
if not isinstance(logs, list):
raise TypeError(f"Type 'list' is required for '{self.__name__}.logs'")
self["logs"] = logs
Expand Down Expand Up @@ -1198,6 +1214,8 @@ def links(self, container=None, self_link=None):
"title": "Job outputs of successful process execution (extended outputs with metadata)."},
{"href": job_url + "/results", "rel": "http://www.opengis.net/def/rel/ogc/1.0/results",
"title": "Job results of successful process execution (direct output values mapping)."},
{"href": job_url + "/statistics", "rel": "statistics", # unofficial
"title": "Job statistics collected following process execution."},
])
else:
job_links.append({
Expand Down Expand Up @@ -1283,6 +1301,7 @@ def params(self):
"updated": self.updated,
"progress": self.progress,
"results": self.results,
"statistics": self.statistics,
"exceptions": self.exceptions,
"logs": self.logs,
"tags": self.tags,
Expand Down
9 changes: 9 additions & 0 deletions weaver/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,15 @@ class JobInvalidParameter(HTTPBadRequest, OWSInvalidParameterValue, JobException
"""


class JobStatisticsNotFound(JobNotFound):
"""
Error related to statistics not available for a Job.
Statistics could be unavailable due to incomplete execution, failed status,
or simply because it is an older result generated before this feature was introduced.
"""


class JobRegistrationError(HTTPInternalServerError, OWSNoApplicableCode, JobException):
"""
Error related to a registration issue for a job.
Expand Down
13 changes: 13 additions & 0 deletions weaver/processes/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,13 @@ class OpenSearchField(Constants):
# FIXME: convert to 'Constants' class
CWL_REQUIREMENT_ENV_VAR = "EnvVarRequirement"
CWL_REQUIREMENT_INIT_WORKDIR = "InitialWorkDirRequirement"
CWL_REQUIREMENT_RESOURCE = "ResourceRequirement"
CWL_REQUIREMENT_SCATTER = "ScatterFeatureRequirement"

CWL_REQUIREMENT_FEATURES = frozenset([
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_RESOURCE, # FIXME: perform pre-check on job submit? (https://github.com/crim-ca/weaver/issues/138)
# CWL_REQUIREMENT_SCATTER, # FIXME: see workflow test + fix https://github.com/crim-ca/weaver/issues/105
])
"""
Expand Down Expand Up @@ -154,6 +156,17 @@ class ProcessSchema(Constants):
from weaver.typedefs import Literal

# pylint: disable=invalid-name
CWL_RequirementNames = Literal[
CWL_REQUIREMENT_APP_BUILTIN,
CWL_REQUIREMENT_APP_DOCKER,
CWL_REQUIREMENT_APP_DOCKER_GPU,
CWL_REQUIREMENT_APP_ESGF_CWT,
CWL_REQUIREMENT_APP_WPS1,
CWL_REQUIREMENT_ENV_VAR,
CWL_REQUIREMENT_INIT_WORKDIR,
CWL_REQUIREMENT_RESOURCE,
CWL_REQUIREMENT_SCATTER,
]
ProcessSchemaType = Literal[ProcessSchema.OGC, ProcessSchema.OLD]
WPS_ComplexType = Literal[WPS_COMPLEX, WPS_COMPLEX_DATA, WPS_REFERENCE]
WPS_DataType = Union[Literal[WPS_LITERAL, WPS_BOUNDINGBOX], WPS_ComplexType]
Loading

0 comments on commit 274d85e

Please sign in to comment.