diff --git a/CHANGELOG.md b/CHANGELOG.md index 9888b215bb..08e4a5fcb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ * Testing: * Improve `test_full_workflow.py` (\#971). * Update `pytest-asyncio` to v0.21 (\#1008). + * Fix CI issue related to event loop and asyncpg (\#1012). # 1.3.14 (do not use!) diff --git a/tests/conftest.py b/tests/conftest.py index 657c0b5d82..a7f2db5d14 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,18 +21,12 @@ def check_basetemp(tpath: Path): ) -def pytest_configure(config): - """ - See https://docs.pytest.org/en/stable/how-to/mark.html#registering-marks - """ - config.addinivalue_line("markers", "slow: marks tests as slow") - - @pytest.fixture(scope="session") def event_loop(): _event_loop = asyncio.new_event_loop() _event_loop.set_debug(True) yield _event_loop + _event_loop.close() @pytest.fixture(scope="session") diff --git a/tests/test_backend_slurm_shutdown.py b/tests/test_backend_slurm_shutdown.py index c684153032..c70008b701 100644 --- a/tests/test_backend_slurm_shutdown.py +++ b/tests/test_backend_slurm_shutdown.py @@ -1,8 +1,6 @@ -import asyncio -import logging -import threading +import shlex +import subprocess import time -from pathlib import Path import pytest from devtools import debug @@ -114,33 +112,6 @@ def _sleep_and_return(sleep_time): raise e -async def _write_shutdown_file(shutdown_file: Path, sleep_time): - # The _auxiliary_scancel and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - logging.warning(f"[_write_shutdown_file] run START {time.perf_counter()=}") - # Wait `scancel_sleep_time` seconds, to let the SLURM job pass from PENDING - # to RUNNING - time.sleep(sleep_time) - - debug(run_squeue()) - # Scancel all jobs of the current SLURM user - logging.warning(f"[_write_shutdown_file] run WRITE {time.perf_counter()=}") - # Trigger shutdown - with shutdown_file.open("w") as f: - f.write("") - assert shutdown_file.exists() - logging.warning(f"[_write_shutdown_file] run END {time.perf_counter()=}") - - -def _auxiliary_run(shutdown_file: Path, sleep_time): - # The _write_shutdown_file and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(_write_shutdown_file(shutdown_file, sleep_time)) - loop.close() - - def test_indirect_shutdown_during_map( monkey_slurm, monkey_slurm_user, @@ -154,18 +125,23 @@ def fun_sleep(dummy): shutdown_file = tmp_path / "shutdown" - # NOTE: the executor.map call below is blocking. For this reason, we call - # the scancel function from a different thread, so that we can make it - # happen during the workflow execution The following block is based on - # https://stackoverflow.com/a/59645689/19085332 - + # NOTE: the executor.map call below is blocking. For this reason, we write + # the shutdown file from a subprocess.Popen, so that we can make it happen + # during the execution. shutdown_sleep_time = 2 - logging.warning(f"PRE THREAD START {time.perf_counter()=}") - _thread = threading.Thread( - target=_auxiliary_run, args=(shutdown_file, shutdown_sleep_time) + tmp_script = (tmp_path / "script.sh").as_posix() + debug(tmp_script) + with open(tmp_script, "w") as f: + f.write(f"sleep {shutdown_sleep_time}\n") + f.write(f"cat NOTHING > {shutdown_file.as_posix()}\n") + + tmp_stdout = open((tmp_path / "stdout").as_posix(), "w") + tmp_stderr = open((tmp_path / "stderr").as_posix(), "w") + subprocess.Popen( + shlex.split(f"bash {tmp_script}"), + stdout=tmp_stdout, + stderr=tmp_stderr, ) - _thread.start() - logging.warning(f"POST THREAD START {time.perf_counter()=}") with FractalSlurmExecutor( slurm_user=monkey_slurm_user, @@ -186,3 +162,6 @@ def fun_sleep(dummy): list(res) debug(e.value) assert "shutdown" in str(e.value) + + tmp_stdout.close() + tmp_stderr.close() diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index 2a6f80127e..d8b3eea33d 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -13,18 +13,15 @@ Institute for Biomedical Research and Pelkmans Lab from the University of Zurich. """ -import asyncio import glob -import logging import os -import threading -import time +import shlex +import subprocess from pathlib import Path import pytest from devtools import debug -from .fixtures_slurm import scancel_all_jobs_of_a_slurm_user from fractal_server.app.runner import _backends PREFIX = "/api/v1" @@ -33,7 +30,6 @@ backends_available = list(_backends.keys()) -@pytest.mark.slow @pytest.mark.parametrize("backend", backends_available) async def test_full_workflow( db, @@ -48,7 +44,6 @@ async def test_full_workflow( request, override_settings_factory, ): - override_settings_factory( FRACTAL_RUNNER_BACKEND=backend, FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / f"artifacts-{backend}", @@ -231,7 +226,6 @@ async def test_full_workflow( assert "index" in list(output_dataset_json["meta"].keys()) -@pytest.mark.slow @pytest.mark.parametrize("backend", backends_available) async def test_failing_workflow_UnknownError( client, @@ -332,7 +326,6 @@ async def test_failing_workflow_UnknownError( print(job_status_data["log"]) -@pytest.mark.slow @pytest.mark.parametrize("backend", backends_available) @pytest.mark.parametrize("failing_task", ["parallel", "non_parallel"]) async def test_failing_workflow_TaskExecutionError( @@ -350,7 +343,6 @@ async def test_failing_workflow_TaskExecutionError( override_settings_factory, resource_factory, ): - override_settings_factory( FRACTAL_RUNNER_BACKEND=backend, FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path @@ -500,30 +492,7 @@ async def test_failing_workflow_TaskExecutionError( assert "index" in list(output_dataset_json["meta"].keys()) -async def _auxiliary_scancel(slurm_user, sleep_time): - # The _auxiliary_scancel and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - logging.warning(f"[scancel_thread] run START {time.perf_counter()=}") - # Wait `scancel_sleep_time` seconds, to let the SLURM job pass from PENDING - # to RUNNING - time.sleep(sleep_time) - # Scancel all jobs of the current SLURM user - logging.warning(f"[scancel_thread] run SCANCEL {time.perf_counter()=}") - scancel_all_jobs_of_a_slurm_user(slurm_user=slurm_user, show_squeue=True) - logging.warning(f"[scancel_thread] run END {time.perf_counter()=}") - - -def _auxiliary_run(slurm_user, sleep_time): - # The _auxiliary_scancel and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(_auxiliary_scancel(slurm_user, sleep_time)) - loop.close() - - @pytest.mark.parametrize("backend", ["slurm"]) -@pytest.mark.slow async def test_failing_workflow_JobExecutionError( backend, client, @@ -541,8 +510,8 @@ async def test_failing_workflow_JobExecutionError( relink_python_interpreter, cfut_jobs_finished, resource_factory, + tmp_path, ): - override_settings_factory( FRACTAL_RUNNER_BACKEND=backend, FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path @@ -603,19 +572,31 @@ async def test_failing_workflow_JobExecutionError( debug(wftask1_id) # NOTE: the client.post call below is blocking, due to the way we are - # running tests. For this reason, we call the scancel functionfrom a - # different thread, so that we can make it happen during the workflow - # execution - # The following block is based on - # https://stackoverflow.com/a/59645689/19085332 + # running tests. For this reason, we call the scancel function from a + # from a subprocess.Popen, so that we can make it happen during the + # execution. scancel_sleep_time = 10 slurm_user = monkey_slurm_user - logging.warning(f"PRE THREAD START {time.perf_counter()=}") - _thread = threading.Thread( - target=_auxiliary_run, args=(slurm_user, scancel_sleep_time) + + tmp_script = (tmp_path / "script.sh").as_posix() + debug(tmp_script) + with open(tmp_script, "w") as f: + f.write(f"sleep {scancel_sleep_time}\n") + f.write( + ( + f"sudo --non-interactive -u {slurm_user} " + f"scancel -u {slurm_user} -v" + "\n" + ) + ) + + tmp_stdout = open((tmp_path / "stdout").as_posix(), "w") + tmp_stderr = open((tmp_path / "stderr").as_posix(), "w") + subprocess.Popen( + shlex.split(f"bash {tmp_script}"), + stdout=tmp_stdout, + stderr=tmp_stderr, ) - _thread.start() - logging.warning(f"POST THREAD START {time.perf_counter()=}") # Re-submit the modified workflow res_second_apply = await client.post( @@ -656,6 +637,9 @@ async def test_failing_workflow_JobExecutionError( str(wftask1_id): "failed", } + tmp_stdout.close() + tmp_stderr.close() + async def test_non_python_task( client, @@ -925,7 +909,6 @@ async def test_non_executable_task_command( user_kwargs = {} async with MockCurrentUser(persist=True, user_kwargs=user_kwargs) as user: - # Create task task = await task_factory( name="invalid-task-command",