From 28c9d77e2739e0bd00485cd18ef61f1999b3c4cd Mon Sep 17 00:00:00 2001 From: Francis Charette Migneault Date: Thu, 3 Oct 2024 04:44:03 -0400 Subject: [PATCH] [wip] more updates for multipart inline convert of data/link representation --- tests/functional/test_wps_package.py | 100 ++++++++++++++++++--------- weaver/cli.py | 20 +++--- weaver/typedefs.py | 5 +- weaver/utils.py | 23 +++++- weaver/wps_restapi/jobs/utils.py | 30 +++++++- 5 files changed, 127 insertions(+), 51 deletions(-) diff --git a/tests/functional/test_wps_package.py b/tests/functional/test_wps_package.py index 4a5cc1831..84a8826bd 100644 --- a/tests/functional/test_wps_package.py +++ b/tests/functional/test_wps_package.py @@ -3568,6 +3568,34 @@ def remove_result_multipart_variable(results): results = re.sub(r"Last-Modified: .*\r\n", "", results) return results.strip() + @staticmethod + def fix_result_multipart_indent(results): + # type: (str) -> str + """ + Remove indented whitespace from multipart literal contents. + + This behaves similarly to :func:`inspect.cleandoc`, but handles cases were the nested part contents could + themselves contain newlines, leading to inconsistent indents for some lines when injected by string formating, + and causing :func:`inspect.cleandoc` to fail removing any indent. + + Also, automatically applies ``\r\n`` characters correction which are critical in parsing multipart contents. + This is done to consider that literal newlines will include or not the ``\r`` depending on the OS running tests. + + .. warning:: + This should be used only for literal test string (i.e.: expected value) for comparison against the result. + Result contents obtained from the response should be compared as-is, without any fix for strict checks. + """ + if results.startswith("\n "): + results = results[1:] + res_dedent = results.lstrip() + res_indent = len(results) - len(res_dedent) + res_spaces = " " * res_indent + res_dedent = res_dedent.replace(f"\n{res_spaces}", "\r\n") # indented line + res_dedent = res_dedent.replace(f"\n\r\n", "\r\n\r\n") # empty line (header/body separator) + res_dedent = res_dedent.replace("\r\r", "\r") # in case windows + res_dedent = res_dedent.rstrip("\n ") # last line often indented less because of closing multiline string + return res_dedent + def test_execute_single_output_prefer_header_return_representation_literal(self): proc = "EchoResultsTester" p_id = self.fully_qualified_test_process_name(proc) @@ -3605,8 +3633,10 @@ def test_execute_single_output_prefer_header_return_representation_literal(self) assert results.text == "test" outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) - assert outputs.json == { - "output_data": {"value": "test"}, + assert outputs.json["outputs"] == { + "output_data": { + "value": "test" + }, } def test_execute_single_output_prefer_header_return_representation_complex(self): @@ -3648,7 +3678,7 @@ def test_execute_single_output_prefer_header_return_representation_complex(self) assert results.text == output_json outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) - assert outputs.json == { + assert outputs.json["outputs"] == { "output_json": { "href": f"{out_url}/{job_id}/output_json/result.json", "type": ContentType.APP_JSON, @@ -3692,8 +3722,10 @@ def test_execute_single_output_prefer_header_return_minimal_literal(self): assert results.text == "test" outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) - assert outputs.json == { - "output_data": "test", + assert outputs.json["outputs"] == { + "output_data": { + "value": "test" + }, } def test_execute_single_output_prefer_header_return_minimal_complex(self): @@ -3744,7 +3776,7 @@ def test_execute_single_output_prefer_header_return_minimal_complex(self): ), "Filtered outputs should not be found in results response links." outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) - assert outputs.json == { + assert outputs.json["outputs"] == { "output_json": { "href": f"{out_url}/{job_id}/output_json/result.json", "type": ContentType.APP_JSON, @@ -3790,7 +3822,9 @@ def test_execute_single_output_response_raw_value_literal(self): outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) assert outputs.json["outputs"] == { - "output_data": "test", + "output_data": { + "value": "test" + }, } def test_execute_single_output_response_raw_value_complex(self): @@ -3889,7 +3923,9 @@ def test_execute_single_output_response_raw_reference_literal(self): outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) assert outputs.content_type.startswith(ContentType.APP_JSON) assert outputs.json["outputs"] == { - "output_data": "test", + "output_data": { + "value": "test" + }, } def test_execute_single_output_response_raw_reference_complex(self): @@ -4004,7 +4040,7 @@ def test_execute_single_output_multipart_accept_data(self): assert ContentType.MULTIPART_MIXED in results.content_type boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] output_json = repr_json({"data": "test"}, separators=(",", ":"), force_string=True) - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Type: {ContentType.APP_JSON} Content-Location: {out_url}/{job_id}/output_json/result.json @@ -4012,7 +4048,7 @@ def test_execute_single_output_multipart_accept_data(self): {output_json} --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results_text == results_body outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) @@ -4077,7 +4113,7 @@ def test_execute_single_output_multipart_accept_link(self): results = resp assert ContentType.MULTIPART_MIXED in results.content_type boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Disposition: attachment; name="output_json"; filename="result.json" Content-Type: {ContentType.APP_JSON} @@ -4086,7 +4122,7 @@ def test_execute_single_output_multipart_accept_link(self): Content-Length: 0 --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results_text == results_body outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) @@ -4151,7 +4187,7 @@ def test_execute_single_output_multipart_accept_alt_format(self): assert ContentType.MULTIPART_MIXED in results.content_type boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] output_json_as_yaml = yaml.safe_dump({"data": "test"}) - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Type: {ContentType.APP_YAML} Content-ID: @@ -4159,7 +4195,7 @@ def test_execute_single_output_multipart_accept_alt_format(self): {output_json_as_yaml} --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results_text == results_body @@ -4182,7 +4218,7 @@ def test_execute_single_output_multipart_accept_alt_format(self): # FIXME: implement (https://github.com/crim-ca/weaver/pull/548) @pytest.mark.xfail(reason="not implemented") - def test_execute_single_output_response_document_alt_format(self): + def test_execute_single_output_response_document_alt_format_yaml(self): proc = "EchoResultsTester" p_id = self.fully_qualified_test_process_name(proc) body = self.retrieve_payload(proc, "deploy", local=True) @@ -4227,7 +4263,7 @@ def test_execute_single_output_response_document_alt_format(self): assert ContentType.MULTIPART_MIXED in results.content_type boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] output_json_as_yaml = yaml.safe_dump({"data": "test"}) - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Type: {ContentType.APP_YAML} Content-ID: @@ -4235,7 +4271,7 @@ def test_execute_single_output_response_document_alt_format(self): {output_json_as_yaml} --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results_text == results_body @@ -4461,7 +4497,7 @@ def test_execute_multi_output_multipart_accept(self, multipart_header): results = self.app.get(f"/jobs/{job_id}/results") boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Disposition: attachment; name="output_data" Content-Type: {ContentType.TEXT_PLAIN} @@ -4477,7 +4513,7 @@ def test_execute_multi_output_multipart_accept(self, multipart_header): Content-Length: 0 --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results_text == results_body @@ -4611,10 +4647,10 @@ def test_execute_multi_output_prefer_header_return_representation(self): out_url = get_wps_output_url(self.settings) results = self.app.get(f"/jobs/{job_id}/results") boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] - output_json = repr_json({"data": "test"}, separators=(",", ":"), force_string=True) - results_body = inspect.cleandoc(f""" + output_json = repr_json({"data": "test"}, indent=None, separators=(",", ":"), force_string=True) + results_body = self.fix_result_multipart_indent(f""" --{boundary} - Content-Disposition: attachment; filename="output_data.txt"; name="output_data" + Content-Disposition: attachment; name="output_data"; filename="output_data.txt" Content-Type: {ContentType.TEXT_PLAIN} Content-ID: Content-Length: 4 @@ -4629,7 +4665,7 @@ def test_execute_multi_output_prefer_header_return_representation(self): {output_json} --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results_text == results_body @@ -4683,7 +4719,7 @@ def test_execute_multi_output_response_raw_value(self): results = self.app.get(f"/jobs/{job_id}/results") boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] output_json = repr_json({"data": "test"}, separators=(",", ":"), force_string=True) - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Disposition: attachment; name="output_data" Content-Type: {ContentType.TEXT_PLAIN} @@ -4700,7 +4736,7 @@ def test_execute_multi_output_response_raw_value(self): {output_json} --{boundary}-- - """).replace("\n", "\r\n") + """) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results.text == results_body outputs = self.app.get(f"/jobs/{job_id}/outputs", params={"schema": JobInputsOutputsSchema.OGC_STRICT}) @@ -4838,7 +4874,7 @@ def test_execute_multi_output_response_raw_reference_accept_multipart(self): results = self.app.get(f"/jobs/{job_id}/results") boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] - results_body = inspect.cleandoc(f""" + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Disposition: attachment; name="output_data"; filename="output_data.txt" Content-Type: {ContentType.TEXT_PLAIN} @@ -4854,7 +4890,7 @@ def test_execute_multi_output_response_raw_reference_accept_multipart(self): Content-Length: 0 --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results_text == results_body @@ -4908,8 +4944,8 @@ def test_execute_multi_output_response_raw_mixed(self): out_url = get_wps_output_url(self.settings) results = self.app.get(f"/jobs/{job_id}/results") boundary = parse_kvp(results.headers["Content-Type"])["boundary"][0] - output_json = repr_json({"data": "test"}, separators=(",", ":"), force_string=True) - results_body = inspect.cleandoc(f""" + output_json = repr_json({"data": "test"}, indent=None, separators=(",", ":"), force_string=True) + results_body = self.fix_result_multipart_indent(f""" --{boundary} Content-Disposition: attachment; name="output_data" Content-Type: {ContentType.TEXT_PLAIN} @@ -4933,7 +4969,7 @@ def test_execute_multi_output_response_raw_mixed(self): {output_json} --{boundary}-- - """).replace("\n", "\r\n") + """) results_text = self.remove_result_multipart_variable(results.text) assert results.content_type.startswith(ContentType.MULTIPART_MIXED) assert results_text == results_body @@ -5060,11 +5096,11 @@ def test_execute_multi_output_prefer_header_return_minimal_override_transmission out_url = get_wps_output_url(self.settings) results = self.app.get(f"/jobs/{job_id}/results") results_json = self.remove_result_format(results.json) - output_json = repr_json({"data": "test"}, separators=(",", ":"), force_string=True) + output_json = repr_json({"data": "test"}, indent=None, separators=(",", ":"), force_string=True) assert results.content_type.startswith(ContentType.APP_JSON) assert results_json == { "output_data": { - "href": f"{out_url}/{job_id}/output_text/result.txt", + "href": f"{out_url}/{job_id}/output_data/output_data.txt", "type": ContentType.TEXT_PLAIN, }, "output_json": { diff --git a/weaver/cli.py b/weaver/cli.py index 687449bdc..3d1b910c2 100644 --- a/weaver/cli.py +++ b/weaver/cli.py @@ -51,7 +51,7 @@ import_target, load_file, null, - parse_kvp, + parse_link_header, request_extra, setup_loggers ) @@ -1691,23 +1691,21 @@ def _download_references(self, outputs, out_links, out_dir, job_id, auth=None): # download links from headers LOGGER.debug("%s outputs in results link headers.", "Processing" if len(out_links) else "No") for _, link_header in ResponseHeaders(out_links).items(): - link, params = link_header.split(";", 1) - href = link.strip("<>") - params = parse_kvp(params, multi_value_sep=None, accumulate_keys=False) - ctype = (params.get("type") or [None])[0] # type: str - rel = params["rel"][0].split(".") + link = parse_link_header(link_header) + rel = link["rel"].rsplit(".", 1) output = rel[0] is_array = len(rel) > 1 and str.isnumeric(rel[1]) - ref_path = fetch_reference(href, out_dir, auth=auth, + ref_path = fetch_reference(link["href"], out_dir, auth=auth, out_method=OutputMethod.COPY, out_listing=False) - value = {"href": href, "type": ctype, "path": ref_path, "source": "link"} # type: ExecutionResultObjectRef + link = cast("ExecutionResultObjectRef", link) + link.update({"path": ref_path, "source": "link"}) if output in outputs: if isinstance(outputs[output], dict): # in case 'rel="."' was not employed - outputs[output] = [outputs[output], value] + outputs[output] = [outputs[output], link] else: - outputs[output].append(value) + outputs[output].append(link) else: - outputs[output] = [value] if is_array else value + outputs[output] = [link] if is_array else link return outputs def results(self, diff --git a/weaver/typedefs.py b/weaver/typedefs.py index 58dc1c66f..0a3304fa6 100644 --- a/weaver/typedefs.py +++ b/weaver/typedefs.py @@ -113,7 +113,7 @@ JSON = Union[Dict[str, Union[_JSON, _JsonItem]], List[Union[_JSON, _JsonItem]], AnyValueType] Link = TypedDict("Link", { - "title": str, + "title": NotRequired[str], "rel": Required[str], "href": Required[str], "hreflang": NotRequired[str], @@ -349,7 +349,8 @@ class CWL_SchemaName(Protocol): WPS_OutputAsRefMediaType = Tuple[str, Optional[bool], Optional[str]] # (output_id, as_ref, mime_type) WPS_OutputRequested = Union[WPS_OutputAsRef, WPS_OutputAsRefMediaType] - KVP_Item = Union[ValueType, Sequence[ValueType]] + KVP_Value = Optional[str] + KVP_Item = Union[KVP_Value, Sequence[KVP_Value]] KVP_Container = Union[Sequence[Tuple[str, KVP_Item]], Dict[str, KVP_Item]] KVP = Dict[str, List[KVP_Item]] diff --git a/weaver/utils.py b/weaver/utils.py index 2c8b0067c..7fbc8794f 100644 --- a/weaver/utils.py +++ b/weaver/utils.py @@ -1321,9 +1321,9 @@ def get_href_headers( if os.path.splitext(path)[-1] in ["", "."]: f_ext = get_extension(f_type, dot=True) path = f"{path}{f_ext}" - content_disposition_params = f"filename=\"{os.path.basename(path)}\"" - if content_name: - content_disposition_params += f"; name=\"{content_name}\"" + # set name, then filename, to align with order employed by requests-toolbelt multipart class + content_disposition_params = f"name=\"{content_name}\"; "if content_name else "" + content_disposition_params += f"filename=\"{os.path.basename(path)}\"" headers["Content-Disposition"] = f"{content_disposition_type}; {content_disposition_params}" f_current = get_file_header_datetime(now()) headers["Date"] = f_current @@ -1371,6 +1371,23 @@ def make_link_header( return link +def parse_link_header(link_header): + # type: (str) -> Link + """ + Parses the parameters of the ``Link`` header. + """ + url, params = link_header.split(";", 1) + href = url.strip("<>") + params = parse_kvp(params, multi_value_sep=None, accumulate_keys=False) + ctype = (params.pop("type", None) or [None])[0] + rel = str(params.pop("rel")[0]) + link = {"href": href, "rel": rel} # type: Link + if ctype and isinstance(ctype, str): + link["type"] = ctype + link.update({param: value[0] for param, value in params.items() if value}) + return link + + def get_base_url(url): # type: (str) -> str """ diff --git a/weaver/wps_restapi/jobs/utils.py b/weaver/wps_restapi/jobs/utils.py index 6f1421aab..cbaa4f8ba 100644 --- a/weaver/wps_restapi/jobs/utils.py +++ b/weaver/wps_restapi/jobs/utils.py @@ -54,6 +54,7 @@ get_weaver_url, is_uuid, make_link_header, + parse_link_header ) from weaver.visibility import Visibility from weaver.wps.utils import get_wps_output_dir, get_wps_output_url, map_wps_output_location @@ -582,6 +583,17 @@ def get_job_results_response( raise_job_dismissed(job, container) raise_job_bad_status(job, container) + # FIXME: if 'return=representation' (any type) without 'transmissionMode' override -> force 'transmissionMode=value' + # (see 'test_execute_multi_output_prefer_header_return_representation') + + # FIXME: if value is JSON with 'response=document' also JSON, auto-load value from ref to embed in body + # - test_execute_single_output_response_document_default_format_json_special + + # FIXME: apply converters (https://github.com/crim-ca/weaver/pull/548) + # - test_execute_single_output_response_document_alt_format_json + # - test_execute_single_output_response_document_alt_format_yaml + # - test_execute_single_output_multipart_accept_alt_format + # when 'response=document', ignore 'transmissionMode=value|reference', respect it when 'response=raw' # resolution of 'transmissionMode' for document representation will be done by its own handler function # See: @@ -653,11 +665,23 @@ def get_job_results_response( # multipart response if ( - len(results) > 1 or + (len(results) + len(refs)) > 1 or (isinstance(out_data, list) and len(out_data) > 1) or is_accept_multipart ): - return get_job_results_multipart(job, results, headers=headers, container=container) + # backtrack link references that were generated if 'Accept: multipart/*' was omitted + # while using 'response=raw' leading to at least 1 by-value output + # (must force multipart with empty-part for links to respect OGC API - Processes v1.0) + # https://docs.ogc.org/is/18-062r2/18-062r2.html#req_core_process-execute-sync-raw-mixed-multi + for ref in refs: + ref_link = parse_link_header(ref[-1]) + results[ref_link["rel"]] = ref_link + # attempt sort by original results ordering to generate multipart contents consistently + out_order = list(convert_output_params_schema(job.results, JobInputsOutputsSchema.OGC)) + res_order = {out_id: results[out_id] for out_id in out_order if out_id in results} + res_array = sorted(set(results) - set(res_order)) # in case of 'out.idx' employed for arrays + res_order.update({out_id: results[out_id] for out_id in res_array}) # if missing link arrays + return get_job_results_multipart(job, res_order, headers=headers, container=container) # single value only out_data = out_data[0] if isinstance(out_data, list) else out_data @@ -726,7 +750,7 @@ def generate_or_resolve_result( url = os.path.join(out_url, url[1:]) loc = map_wps_output_location(url, settings, exists=True, url=False) else: - typ = result.get("mediaType") or ContentType.TEXT_PLAIN + typ = get_field(result, "mime_type", search_variations=True, default=ContentType.TEXT_PLAIN) if not url: out_dir = get_wps_output_dir(settings)