diff --git a/pythonFiles/tests/pytestadapter/helpers.py b/pythonFiles/tests/pytestadapter/helpers.py index b078439f6eac..e84c46037c2f 100644 --- a/pythonFiles/tests/pytestadapter/helpers.py +++ b/pythonFiles/tests/pytestadapter/helpers.py @@ -10,6 +10,7 @@ import socket import subprocess import sys +import threading import uuid from typing import Any, Dict, List, Optional, Union @@ -17,20 +18,6 @@ from typing_extensions import TypedDict -@contextlib.contextmanager -def test_output_file(root: pathlib.Path, ext: str = ".txt"): - """Creates a temporary python file with a random name.""" - basename = ( - "".join(random.choice("abcdefghijklmnopqrstuvwxyz") for _ in range(9)) + ext - ) - fullpath = root / basename - try: - fullpath.write_text("", encoding="utf-8") - yield fullpath - finally: - os.unlink(str(fullpath)) - - def create_server( host: str = "127.0.0.1", port: int = 0, @@ -116,31 +103,51 @@ def runner(args: List[str]) -> Optional[Dict[str, Any]]: "-p", "vscode_pytest", ] + args + listener: socket.socket = create_server() + _, port = listener.getsockname() + listener.listen() + + env = os.environ.copy() + env.update( + { + "TEST_UUID": str(uuid.uuid4()), + "TEST_PORT": str(port), + "PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent), + } + ) + + result: list = [] + t1: threading.Thread = threading.Thread( + target=_listen_on_socket, args=(listener, result) + ) + t1.start() + + t2 = threading.Thread( + target=lambda proc_args, proc_env, proc_cwd: subprocess.run( + proc_args, env=proc_env, cwd=proc_cwd + ), + args=(process_args, env, TEST_DATA_PATH), + ) + t2.start() + + t1.join() + t2.join() + + return process_rpc_json(result[0]) if result else None + - with test_output_file(TEST_DATA_PATH) as output_path: - env = os.environ.copy() - env.update( - { - "TEST_UUID": str(uuid.uuid4()), - "TEST_PORT": str(12345), # port is not used for tests - "PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent), - "TEST_OUTPUT_FILE": os.fspath(output_path), - } - ) - - result = subprocess.run( - process_args, - env=env, - cwd=os.fspath(TEST_DATA_PATH), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - if result.returncode != 0: - print("Subprocess Run failed with:") - print(result.stdout.decode(encoding="utf-8")) - print(result.stderr.decode(encoding="utf-8")) - - return process_rpc_json(output_path.read_text(encoding="utf-8")) +def _listen_on_socket(listener: socket.socket, result: List[str]): + """Listen on the socket for the JSON data from the server. + Created as a seperate function for clarity in threading. + """ + sock, (other_host, other_port) = listener.accept() + all_data: list = [] + while True: + data: bytes = sock.recv(1024 * 1024) + if not data: + break + all_data.append(data.decode("utf-8")) + result.append("".join(all_data)) def find_test_line_number(test_name: str, test_file_path) -> str: diff --git a/pythonFiles/vscode_pytest/__init__.py b/pythonFiles/vscode_pytest/__init__.py index 6063e4113d55..39330f4e8a38 100644 --- a/pythonFiles/vscode_pytest/__init__.py +++ b/pythonFiles/vscode_pytest/__init__.py @@ -437,19 +437,13 @@ def execution_post( Request-uuid: {testuuid} {data}""" - test_output_file: Optional[str] = os.getenv("TEST_OUTPUT_FILE", None) - if test_output_file == "stdout": - print(request) - elif test_output_file: - pathlib.Path(test_output_file).write_text(request, encoding="utf-8") - else: - try: - with socket_manager.SocketManager(addr) as s: - if s.socket is not None: - s.socket.sendall(request.encode("utf-8")) - except Exception as e: - print(f"Plugin error connection error[vscode-pytest]: {e}") - print(f"[vscode-pytest] data: {request}") + try: + with socket_manager.SocketManager(addr) as s: + if s.socket is not None: + s.socket.sendall(request.encode("utf-8")) + except Exception as e: + print(f"Plugin error connection error[vscode-pytest]: {e}") + print(f"[vscode-pytest] data: {request}") def post_response(cwd: str, session_node: TestNode) -> None: @@ -477,16 +471,10 @@ def post_response(cwd: str, session_node: TestNode) -> None: Request-uuid: {testuuid} {data}""" - test_output_file: Optional[str] = os.getenv("TEST_OUTPUT_FILE", None) - if test_output_file == "stdout": - print(request) - elif test_output_file: - pathlib.Path(test_output_file).write_text(request, encoding="utf-8") - else: - try: - with socket_manager.SocketManager(addr) as s: - if s.socket is not None: - s.socket.sendall(request.encode("utf-8")) - except Exception as e: - print(f"Plugin error connection error[vscode-pytest]: {e}") - print(f"[vscode-pytest] data: {request}") + try: + with socket_manager.SocketManager(addr) as s: + if s.socket is not None: + s.socket.sendall(request.encode("utf-8")) + except Exception as e: + print(f"Plugin error connection error[vscode-pytest]: {e}") + print(f"[vscode-pytest] data: {request}")