diff --git a/pythonFiles/tests/pytestadapter/helpers.py b/pythonFiles/tests/pytestadapter/helpers.py index e84c46037c2f..013e4bb31fca 100644 --- a/pythonFiles/tests/pytestadapter/helpers.py +++ b/pythonFiles/tests/pytestadapter/helpers.py @@ -1,18 +1,15 @@ # Copyright (c) Microsoft Corporation. All rights reserved. # Licensed under the MIT License. - -import contextlib import io import json import os import pathlib -import random import socket import subprocess import sys import threading import uuid -from typing import Any, Dict, List, Optional, Union +from typing import Any, Dict, List, Optional, Tuple TEST_DATA_PATH = pathlib.Path(__file__).parent / ".data" from typing_extensions import TypedDict @@ -70,31 +67,29 @@ def _new_sock() -> socket.socket: ) -def process_rpc_json(data: str) -> Dict[str, Any]: +def process_rpc_message(data: str) -> Tuple[Dict[str, Any], str]: """Process the JSON data which comes from the server which runs the pytest discovery.""" str_stream: io.StringIO = io.StringIO(data) length: int = 0 - while True: line: str = str_stream.readline() if CONTENT_LENGTH.lower() in line.lower(): length = int(line[len(CONTENT_LENGTH) :]) break - if not line or line.isspace(): raise ValueError("Header does not contain Content-Length") - while True: line: str = str_stream.readline() if not line or line.isspace(): break raw_json: str = str_stream.read(length) - return json.loads(raw_json) + dict_json: Dict[str, Any] = json.loads(raw_json) + return dict_json, str_stream.read() -def runner(args: List[str]) -> Optional[Dict[str, Any]]: +def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]: """Run the pytest discovery and return the JSON data from the server.""" process_args: List[str] = [ sys.executable, @@ -133,7 +128,19 @@ def runner(args: List[str]) -> Optional[Dict[str, Any]]: t1.join() t2.join() - return process_rpc_json(result[0]) if result else None + a = process_rpc_json(result[0]) + return a if result else None + + +def process_rpc_json(data: str) -> List[Dict[str, Any]]: + """Process the JSON data which comes from the server which runs the pytest discovery.""" + json_messages = [] + remaining = data + while remaining: + json_data, remaining = process_rpc_message(remaining) + json_messages.append(json_data) + + return json_messages def _listen_on_socket(listener: socket.socket, result: List[str]): diff --git a/pythonFiles/tests/pytestadapter/test_discovery.py b/pythonFiles/tests/pytestadapter/test_discovery.py index bb6e7255704e..ab7abb508153 100644 --- a/pythonFiles/tests/pytestadapter/test_discovery.py +++ b/pythonFiles/tests/pytestadapter/test_discovery.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. import os import shutil +from typing import Any, Dict, List, Optional import pytest @@ -28,12 +29,15 @@ def test_syntax_error(tmp_path): temp_dir.mkdir() p = temp_dir / "error_syntax_discovery.py" shutil.copyfile(file_path, p) - actual = runner(["--collect-only", os.fspath(p)]) - assert actual - assert all(item in actual for item in ("status", "cwd", "error")) - assert actual["status"] == "error" - assert actual["cwd"] == os.fspath(TEST_DATA_PATH) - assert len(actual["error"]) == 2 + actual_list: Optional[List[Dict[str, Any]]] = runner( + ["--collect-only", os.fspath(p)] + ) + assert actual_list + for actual in actual_list: + assert all(item in actual for item in ("status", "cwd", "error")) + assert actual["status"] == "error" + assert actual["cwd"] == os.fspath(TEST_DATA_PATH) + assert len(actual["error"]) == 2 def test_parameterized_error_collect(): @@ -42,12 +46,15 @@ def test_parameterized_error_collect(): The json should still be returned but the errors list should be present. """ file_path_str = "error_parametrize_discovery.py" - actual = runner(["--collect-only", file_path_str]) - assert actual - assert all(item in actual for item in ("status", "cwd", "error")) - assert actual["status"] == "error" - assert actual["cwd"] == os.fspath(TEST_DATA_PATH) - assert len(actual["error"]) == 2 + actual_list: Optional[List[Dict[str, Any]]] = runner( + ["--collect-only", file_path_str] + ) + assert actual_list + for actual in actual_list: + assert all(item in actual for item in ("status", "cwd", "error")) + assert actual["status"] == "error" + assert actual["cwd"] == os.fspath(TEST_DATA_PATH) + assert len(actual["error"]) == 2 @pytest.mark.parametrize( @@ -98,14 +105,15 @@ def test_pytest_collect(file, expected_const): file -- a string with the file or folder to run pytest discovery on. expected_const -- the expected output from running pytest discovery on the file. """ - actual = runner( + actual_list: Optional[List[Dict[str, Any]]] = runner( [ "--collect-only", os.fspath(TEST_DATA_PATH / file), ] ) - assert actual - assert all(item in actual for item in ("status", "cwd", "tests")) - assert actual["status"] == "success" - assert actual["cwd"] == os.fspath(TEST_DATA_PATH) - assert actual["tests"] == expected_const + assert actual_list + for actual in actual_list: + assert all(item in actual for item in ("status", "cwd", "tests")) + assert actual["status"] == "success" + assert actual["cwd"] == os.fspath(TEST_DATA_PATH) + assert actual["tests"] == expected_const diff --git a/pythonFiles/tests/pytestadapter/test_execution.py b/pythonFiles/tests/pytestadapter/test_execution.py index 8613deb96098..d54e4e758d35 100644 --- a/pythonFiles/tests/pytestadapter/test_execution.py +++ b/pythonFiles/tests/pytestadapter/test_execution.py @@ -2,6 +2,7 @@ # Licensed under the MIT License. import os import shutil +from typing import Any, Dict, List, Optional import pytest from tests.pytestadapter import expected_execution_test_output @@ -13,7 +14,7 @@ def test_syntax_error_execution(tmp_path): """Test pytest execution on a file that has a syntax error. Copies the contents of a .txt file to a .py file in the temporary directory - to then run pytest exeuction on. + to then run pytest execution on. The json should still be returned but the errors list should be present. @@ -28,12 +29,15 @@ def test_syntax_error_execution(tmp_path): temp_dir.mkdir() p = temp_dir / "error_syntax_discovery.py" shutil.copyfile(file_path, p) - actual = runner(["error_syntax_discover.py::test_function"]) - assert actual - assert all(item in actual for item in ("status", "cwd", "error")) - assert actual["status"] == "error" - assert actual["cwd"] == os.fspath(TEST_DATA_PATH) - assert len(actual["error"]) == 1 + actual_list: Optional[List[Dict[str, Any]]] = runner( + ["error_syntax_discover.py::test_function"] + ) + assert actual_list + for actual in actual_list: + assert all(item in actual for item in ("status", "cwd", "error")) + assert actual["status"] == "error" + assert actual["cwd"] == os.fspath(TEST_DATA_PATH) + assert len(actual["error"]) == 1 def test_bad_id_error_execution(): @@ -41,12 +45,13 @@ def test_bad_id_error_execution(): The json should still be returned but the errors list should be present. """ - actual = runner(["not/a/real::test_id"]) - assert actual - assert all(item in actual for item in ("status", "cwd", "error")) - assert actual["status"] == "error" - assert actual["cwd"] == os.fspath(TEST_DATA_PATH) - assert len(actual["error"]) == 1 + actual_list: Optional[List[Dict[str, Any]]] = runner(["not/a/real::test_id"]) + assert actual_list + for actual in actual_list: + assert all(item in actual for item in ("status", "cwd", "error")) + assert actual["status"] == "error" + assert actual["cwd"] == os.fspath(TEST_DATA_PATH) + assert len(actual["error"]) == 1 @pytest.mark.parametrize( @@ -153,13 +158,14 @@ def test_pytest_execution(test_ids, expected_const): expected_const -- a dictionary of the expected output from running pytest discovery on the files. """ args = test_ids - actual = runner(args) - assert actual - assert all(item in actual for item in ("status", "cwd", "result")) - assert actual["status"] == "success" - assert actual["cwd"] == os.fspath(TEST_DATA_PATH) - result_data = actual["result"] - for key in result_data: - if result_data[key]["outcome"] == "failure": - result_data[key]["message"] = "ERROR MESSAGE" - assert result_data == expected_const + actual_list: Optional[List[Dict[str, Any]]] = runner(args) + assert actual_list + for actual in actual_list: + assert all(item in actual for item in ("status", "cwd", "result")) + assert actual["status"] == "success" + assert actual["cwd"] == os.fspath(TEST_DATA_PATH) + result_data = actual["result"] + for key in result_data: + if result_data[key]["outcome"] == "failure": + result_data[key]["message"] = "ERROR MESSAGE" + assert result_data == expected_const