From 13eb85c4f09408183469d152d8b1cb25f9bbddbb Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 15:51:36 +0100 Subject: [PATCH 01/12] Add `_event_loop.close` to fixture, to avoid warning from recent pytest-asyncio --- tests/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/conftest.py b/tests/conftest.py index 657c0b5d82..9b0d781cb5 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -33,6 +33,7 @@ def event_loop(): _event_loop = asyncio.new_event_loop() _event_loop.set_debug(True) yield _event_loop + _event_loop.close() @pytest.fixture(scope="session") From 96f98d5bbb65ec1d33c84e8f508e3b6fb839f464 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 15:53:29 +0100 Subject: [PATCH 02/12] Add `_thread.join()` in tests that open a new thread --- tests/test_backend_slurm_shutdown.py | 1 + tests/test_full_workflow.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/test_backend_slurm_shutdown.py b/tests/test_backend_slurm_shutdown.py index c684153032..5dd40e7652 100644 --- a/tests/test_backend_slurm_shutdown.py +++ b/tests/test_backend_slurm_shutdown.py @@ -186,3 +186,4 @@ def fun_sleep(dummy): list(res) debug(e.value) assert "shutdown" in str(e.value) + _thread.join() diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index 2a6f80127e..0d59659e68 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -655,6 +655,7 @@ async def test_failing_workflow_JobExecutionError( str(wftask0_id): "done", str(wftask1_id): "failed", } + _thread.join() async def test_non_python_task( From 9fa7003776e4eee635e9f373adc4f381228c0f46 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 15:54:27 +0100 Subject: [PATCH 03/12] Update CHANGELOG --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md 
index 9888b215bb..72355137a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,6 +34,7 @@ * Testing: * Improve `test_full_workflow.py` (\#971). * Update `pytest-asyncio` to v0.21 (\#1008). + * Mitigate issues related to [error in CI](https://github.com/fractal-analytics-platform/fractal-server/issues/954) (\#1012). # 1.3.14 (do not use!) From 8ec410b8fba8c4f2bd7d067b460d77285188656b Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:06:09 +0100 Subject: [PATCH 04/12] Skip test_failing_workflow_JobExecutionError --- tests/test_full_workflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index 0d59659e68..1d6a78de8e 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -522,6 +522,7 @@ def _auxiliary_run(slurm_user, sleep_time): loop.close() +@pytest.mark.skip(reason="This test is possibly causing issue #954") @pytest.mark.parametrize("backend", ["slurm"]) @pytest.mark.slow async def test_failing_workflow_JobExecutionError( From c3f5a78291c33ab04cf35d83d73341dde31d192c Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:17:33 +0100 Subject: [PATCH 05/12] Remove `slow` pytest marker --- tests/test_full_workflow.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index 1d6a78de8e..099efc8055 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -33,7 +33,6 @@ backends_available = list(_backends.keys()) -@pytest.mark.slow @pytest.mark.parametrize("backend", backends_available) async def test_full_workflow( db, @@ -231,7 +230,6 @@ async def test_full_workflow( assert "index" in list(output_dataset_json["meta"].keys()) -@pytest.mark.slow @pytest.mark.parametrize("backend", backends_available) async def test_failing_workflow_UnknownError( client, @@ -332,7 +330,6 @@ async 
def test_failing_workflow_UnknownError( print(job_status_data["log"]) -@pytest.mark.slow @pytest.mark.parametrize("backend", backends_available) @pytest.mark.parametrize("failing_task", ["parallel", "non_parallel"]) async def test_failing_workflow_TaskExecutionError( @@ -524,7 +521,6 @@ def _auxiliary_run(slurm_user, sleep_time): @pytest.mark.skip(reason="This test is possibly causing issue #954") @pytest.mark.parametrize("backend", ["slurm"]) -@pytest.mark.slow async def test_failing_workflow_JobExecutionError( backend, client, From 75c7bd065430f8840282f1a187e7390ab533fc56 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:20:46 +0100 Subject: [PATCH 06/12] Remove `pytest_configure` --- tests/conftest.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9b0d781cb5..a7f2db5d14 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -21,13 +21,6 @@ def check_basetemp(tpath: Path): ) -def pytest_configure(config): - """ - See https://docs.pytest.org/en/stable/how-to/mark.html#registering-marks - """ - config.addinivalue_line("markers", "slow: marks tests as slow") - - @pytest.fixture(scope="session") def event_loop(): _event_loop = asyncio.new_event_loop() From 5562112286523adbc85fefb7e382a4e3e345264b Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:21:03 +0100 Subject: [PATCH 07/12] Re-enable `test_failing_workflow_JobExecutionError` --- tests/test_full_workflow.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index 099efc8055..d947964e0b 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -519,7 +519,6 @@ def _auxiliary_run(slurm_user, sleep_time): loop.close() -@pytest.mark.skip(reason="This test is possibly causing issue #954") @pytest.mark.parametrize("backend", ["slurm"]) async def 
test_failing_workflow_JobExecutionError( backend, From f7307302f5ee5f000a662ae2bc825daee7120e1d Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:43:10 +0100 Subject: [PATCH 08/12] Skip failing test --- tests/test_full_workflow.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index d947964e0b..e6d5e259c7 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -519,6 +519,7 @@ def _auxiliary_run(slurm_user, sleep_time): loop.close() +@pytest.mark.skip @pytest.mark.parametrize("backend", ["slurm"]) async def test_failing_workflow_JobExecutionError( backend, From 22ba341bb07d2b2aa56ce561fba74e9fd9763f38 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 16:45:13 +0100 Subject: [PATCH 09/12] Make github-CI tests verbose --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 770ec1ae2d..00d96c9eee 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,7 +66,7 @@ jobs: poetry install --with dev --without docs --no-interaction -E slurm -E gunicorn $DB - name: Test with pytest - run: poetry run coverage run --concurrency=thread,greenlet,multiprocessing -m pytest + run: poetry run coverage run --concurrency=thread,greenlet,multiprocessing -m pytest -s --log-cli-level info - name: Upload coverage data uses: actions/upload-artifact@v3 From 7e71d95bebeb88d6c6bc1e7c7ed050b01efb2d2e Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:18:42 +0100 Subject: [PATCH 10/12] Avoid explicit use of threads and event loops in tests, in favor of Popen (ref #954) --- tests/test_backend_slurm_shutdown.py | 62 ++++++++---------------- tests/test_full_workflow.py | 71 +++++++++++----------------- 2 files 
changed, 48 insertions(+), 85 deletions(-) diff --git a/tests/test_backend_slurm_shutdown.py b/tests/test_backend_slurm_shutdown.py index 5dd40e7652..c70008b701 100644 --- a/tests/test_backend_slurm_shutdown.py +++ b/tests/test_backend_slurm_shutdown.py @@ -1,8 +1,6 @@ -import asyncio -import logging -import threading +import shlex +import subprocess import time -from pathlib import Path import pytest from devtools import debug @@ -114,33 +112,6 @@ def _sleep_and_return(sleep_time): raise e -async def _write_shutdown_file(shutdown_file: Path, sleep_time): - # The _auxiliary_scancel and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - logging.warning(f"[_write_shutdown_file] run START {time.perf_counter()=}") - # Wait `scancel_sleep_time` seconds, to let the SLURM job pass from PENDING - # to RUNNING - time.sleep(sleep_time) - - debug(run_squeue()) - # Scancel all jobs of the current SLURM user - logging.warning(f"[_write_shutdown_file] run WRITE {time.perf_counter()=}") - # Trigger shutdown - with shutdown_file.open("w") as f: - f.write("") - assert shutdown_file.exists() - logging.warning(f"[_write_shutdown_file] run END {time.perf_counter()=}") - - -def _auxiliary_run(shutdown_file: Path, sleep_time): - # The _write_shutdown_file and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(_write_shutdown_file(shutdown_file, sleep_time)) - loop.close() - - def test_indirect_shutdown_during_map( monkey_slurm, monkey_slurm_user, @@ -154,18 +125,23 @@ def fun_sleep(dummy): shutdown_file = tmp_path / "shutdown" - # NOTE: the executor.map call below is blocking. 
For this reason, we call - # the scancel function from a different thread, so that we can make it - # happen during the workflow execution The following block is based on - # https://stackoverflow.com/a/59645689/19085332 - + # NOTE: the executor.map call below is blocking. For this reason, we write + # the shutdown file from a subprocess.Popen, so that we can make it happen + # during the execution. shutdown_sleep_time = 2 - logging.warning(f"PRE THREAD START {time.perf_counter()=}") - _thread = threading.Thread( - target=_auxiliary_run, args=(shutdown_file, shutdown_sleep_time) + tmp_script = (tmp_path / "script.sh").as_posix() + debug(tmp_script) + with open(tmp_script, "w") as f: + f.write(f"sleep {shutdown_sleep_time}\n") + f.write(f"cat NOTHING > {shutdown_file.as_posix()}\n") + + tmp_stdout = open((tmp_path / "stdout").as_posix(), "w") + tmp_stderr = open((tmp_path / "stderr").as_posix(), "w") + subprocess.Popen( + shlex.split(f"bash {tmp_script}"), + stdout=tmp_stdout, + stderr=tmp_stderr, ) - _thread.start() - logging.warning(f"POST THREAD START {time.perf_counter()=}") with FractalSlurmExecutor( slurm_user=monkey_slurm_user, @@ -186,4 +162,6 @@ def fun_sleep(dummy): list(res) debug(e.value) assert "shutdown" in str(e.value) - _thread.join() + + tmp_stdout.close() + tmp_stderr.close() diff --git a/tests/test_full_workflow.py b/tests/test_full_workflow.py index e6d5e259c7..d8b3eea33d 100644 --- a/tests/test_full_workflow.py +++ b/tests/test_full_workflow.py @@ -13,18 +13,15 @@ Institute for Biomedical Research and Pelkmans Lab from the University of Zurich. 
""" -import asyncio import glob -import logging import os -import threading -import time +import shlex +import subprocess from pathlib import Path import pytest from devtools import debug -from .fixtures_slurm import scancel_all_jobs_of_a_slurm_user from fractal_server.app.runner import _backends PREFIX = "/api/v1" @@ -47,7 +44,6 @@ async def test_full_workflow( request, override_settings_factory, ): - override_settings_factory( FRACTAL_RUNNER_BACKEND=backend, FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path / f"artifacts-{backend}", @@ -347,7 +343,6 @@ async def test_failing_workflow_TaskExecutionError( override_settings_factory, resource_factory, ): - override_settings_factory( FRACTAL_RUNNER_BACKEND=backend, FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path @@ -497,29 +492,6 @@ async def test_failing_workflow_TaskExecutionError( assert "index" in list(output_dataset_json["meta"].keys()) -async def _auxiliary_scancel(slurm_user, sleep_time): - # The _auxiliary_scancel and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - logging.warning(f"[scancel_thread] run START {time.perf_counter()=}") - # Wait `scancel_sleep_time` seconds, to let the SLURM job pass from PENDING - # to RUNNING - time.sleep(sleep_time) - # Scancel all jobs of the current SLURM user - logging.warning(f"[scancel_thread] run SCANCEL {time.perf_counter()=}") - scancel_all_jobs_of_a_slurm_user(slurm_user=slurm_user, show_squeue=True) - logging.warning(f"[scancel_thread] run END {time.perf_counter()=}") - - -def _auxiliary_run(slurm_user, sleep_time): - # The _auxiliary_scancel and _auxiliary_run functions are used as in - # https://stackoverflow.com/a/59645689/19085332 - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - loop.run_until_complete(_auxiliary_scancel(slurm_user, sleep_time)) - loop.close() - - -@pytest.mark.skip @pytest.mark.parametrize("backend", ["slurm"]) async def test_failing_workflow_JobExecutionError( backend, @@ -538,8 +510,8 @@ 
async def test_failing_workflow_JobExecutionError( relink_python_interpreter, cfut_jobs_finished, resource_factory, + tmp_path, ): - override_settings_factory( FRACTAL_RUNNER_BACKEND=backend, FRACTAL_RUNNER_WORKING_BASE_DIR=tmp777_path @@ -600,19 +572,31 @@ async def test_failing_workflow_JobExecutionError( debug(wftask1_id) # NOTE: the client.post call below is blocking, due to the way we are - # running tests. For this reason, we call the scancel functionfrom a - # different thread, so that we can make it happen during the workflow - # execution - # The following block is based on - # https://stackoverflow.com/a/59645689/19085332 + # running tests. For this reason, we call the scancel function from a + # subprocess.Popen, so that we can make it happen during the + # execution. scancel_sleep_time = 10 slurm_user = monkey_slurm_user - logging.warning(f"PRE THREAD START {time.perf_counter()=}") - _thread = threading.Thread( - target=_auxiliary_run, args=(slurm_user, scancel_sleep_time) + + tmp_script = (tmp_path / "script.sh").as_posix() + debug(tmp_script) + with open(tmp_script, "w") as f: + f.write(f"sleep {scancel_sleep_time}\n") + f.write( + ( + f"sudo --non-interactive -u {slurm_user} " + f"scancel -u {slurm_user} -v" + "\n" + ) + ) + + tmp_stdout = open((tmp_path / "stdout").as_posix(), "w") + tmp_stderr = open((tmp_path / "stderr").as_posix(), "w") + subprocess.Popen( + shlex.split(f"bash {tmp_script}"), + stdout=tmp_stdout, + stderr=tmp_stderr, ) - _thread.start() - logging.warning(f"POST THREAD START {time.perf_counter()=}") # Re-submit the modified workflow res_second_apply = await client.post( @@ -652,7 +636,9 @@ async def test_failing_workflow_JobExecutionError( str(wftask0_id): "done", str(wftask1_id): "failed", } - _thread.join() + + tmp_stdout.close() + tmp_stderr.close() async def test_non_python_task( @@ -923,7 +909,6 @@ async def test_non_executable_task_command( user_kwargs = {} async with MockCurrentUser(persist=True, 
user_kwargs=user_kwargs) as user: - # Create task task = await task_factory( name="invalid-task-command", From 81152aed38c89c60214f6a8b57d8dae92e636108 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:20:07 +0100 Subject: [PATCH 11/12] Revert github-CI verbosity --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 00d96c9eee..770ec1ae2d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,7 +66,7 @@ jobs: poetry install --with dev --without docs --no-interaction -E slurm -E gunicorn $DB - name: Test with pytest - run: poetry run coverage run --concurrency=thread,greenlet,multiprocessing -m pytest -s --log-cli-level info + run: poetry run coverage run --concurrency=thread,greenlet,multiprocessing -m pytest - name: Upload coverage data uses: actions/upload-artifact@v3 From 00fd71ca873344229568001d08bbd92a2dc682d9 Mon Sep 17 00:00:00 2001 From: Tommaso Comparin <3862206+tcompa@users.noreply.github.com> Date: Tue, 21 Nov 2023 17:32:25 +0100 Subject: [PATCH 12/12] Update CHANGELOG --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 72355137a3..08e4a5fcb4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,7 @@ * Testing: * Improve `test_full_workflow.py` (\#971). * Update `pytest-asyncio` to v0.21 (\#1008). - * Mitigate issues related to [error in CI](https://github.com/fractal-analytics-platform/fractal-server/issues/954) (\#1012). + * Fix CI issue related to event loop and asyncpg (\#1012). # 1.3.14 (do not use!)